Author: psmith
Date: Thu Feb 22 17:50:56 2007
New Revision: 94
Added:
branches/home/psmith/restructure/src/utils/concurrent-queue.lisp
- copied, changed from r93, branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
Removed:
branches/home/psmith/restructure/src/compat/concurrent-queue.lisp
Modified:
branches/home/psmith/restructure/src/compat/nio-compat-package.lisp
branches/home/psmith/restructure/src/compat/nio-compat.asd
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
branches/home/psmith/restructure/src/utils/nio-utils-package.lisp
branches/home/psmith/restructure/src/utils/nio-utils.asd
branches/home/psmith/restructure/src/utils/utils.lisp
Log:
moved threadsafe queue and added more tests.
Modified: branches/home/psmith/restructure/src/compat/nio-compat-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/compat/nio-compat-package.lisp (original)
+++ branches/home/psmith/restructure/src/compat/nio-compat-package.lisp Thu Feb 22 17:50:56 2007
@@ -30,7 +30,6 @@
;; errno.lisp
get-errno +ERRNO_EAGAIN+ perror
-
- ;;concurrent-queue
- concurrent-queue add take
+ ;;threading
+ with-mutex make-mutex
))
Modified: branches/home/psmith/restructure/src/compat/nio-compat.asd
==============================================================================
--- branches/home/psmith/restructure/src/compat/nio-compat.asd (original)
+++ branches/home/psmith/restructure/src/compat/nio-compat.asd Thu Feb 22 17:50:56 2007
@@ -6,7 +6,7 @@
:components ((:file "nio-compat-package")
(:file "errno" :depends-on ("nio-compat-package"))
- (:file "concurrent-queue" :depends-on ("nio-compat-package"))
+ (:file "threading" :depends-on ("nio-compat-package"))
)
:depends-on (:cffi))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Thu Feb 22 17:50:56 2007
@@ -34,7 +34,7 @@
t)
;TODO thread safety
-(defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue)
+(defparameter +connected-sockets-queue+ (nio-utils:concurrent-queue)
"List of node objects that are to be connected to")
;loop over hashtable
@@ -155,7 +155,7 @@
;add outgoing sockets to event queue
#+nio-debug2 (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
- (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
+ (loop for node = (nio-utils:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
#+nio-debug (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
(push node *nodes-list*))
@@ -192,4 +192,4 @@
(format t "Connect failed!!~A ~%" (get-errno)))))
(defun add-connection(node)
- (nio-compat:add +connected-sockets-queue+ node))
\ No newline at end of file
+ (nio-utils:add +connected-sockets-queue+ node))
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd Thu Feb 22 17:50:56 2007
@@ -10,4 +10,4 @@
(:file "yarpc-client-state-machine" :depends-on ("yarpc-packet-factory"))
)
- :depends-on (:nio :nio-sm :nio-compat))
\ No newline at end of file
+ :depends-on (:nio :nio-sm :nio-utils))
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Thu Feb 22 17:50:56 2007
@@ -33,7 +33,7 @@
;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution
;;
(defclass yarpc-client-state-machine (state-machine)
- ((job-queue :initform (nio-compat:concurrent-queue)
+ ((job-queue :initform (nio-utils:concurrent-queue)
:accessor job-queue
:documentation "The queue used to hand off work from an external thread to the io thread")
(request-map :initform (make-hash-table)
@@ -74,7 +74,7 @@
(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
#+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
- (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
+ (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil)))
(when ttd
#+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
(destructuring-bind (job call-string) ttd
@@ -94,4 +94,4 @@
;Execute the call-string on the remote node and call callback with the result
(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
#+nio-debug (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback)
- (nio-compat:add (job-queue sm) (list (remote-job callback) call-string)))
+ (nio-utils:add (job-queue sm) (list (remote-job callback) call-string)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Thu Feb 22 17:50:56 2007
@@ -34,11 +34,11 @@
;;
(defclass yarpc-state-machine (state-machine)
(
- (result-queue :initform (nio-compat:concurrent-queue)
+ (result-queue :initform (nio-utils:concurrent-queue)
:accessor result-queue
:documentation "The queue used to return results from an external thread to the nio thread")))
-(defparameter job-queue (nio-compat:concurrent-queue)
+(defparameter job-queue (nio-utils:concurrent-queue)
"The queue used to hand off work from the NIO thread to an external thread for execution")
(defparameter yarpc-pf (yarpc-packet-factory))
@@ -60,16 +60,16 @@
(defun run-job(&key (blocking t))
#+nio-debug (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
- (let ((server-job (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)))
+ (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking)))
(when server-job
(destructuring-bind (job request-id result-queue) server-job
#+nio-debug (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
- (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
+ (nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
(defmethod process-outgoing-packet((sm yarpc-state-machine))
#+nio-debug2 (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
- (let ((server-job (nio-compat:take (result-queue sm) :blocking-call nil)))
+ (let ((server-job (nio-utils:take (result-queue sm) :blocking-call nil)))
(when server-job
(destructuring-bind (request-id result) server-job
#+nio-debug (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
@@ -78,7 +78,7 @@
;Process a call method packet by placing it in the job-queue
(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
#+nio-debug (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
- (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
+ (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
(when +process-jobs-inline+ (run-job :blocking nil)))
Copied: branches/home/psmith/restructure/src/utils/concurrent-queue.lisp (from r93, branches/home/psmith/restructure/src/compat/concurrent-queue.lisp)
==============================================================================
--- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp (original)
+++ branches/home/psmith/restructure/src/utils/concurrent-queue.lisp Thu Feb 22 17:50:56 2007
@@ -25,16 +25,19 @@
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|#
-(in-package :nio-compat)
+(in-package :nio-utils)
(declaim (optimize (debug 3) (speed 3) (space 0)))
;Implements a threadsafe queue where readers wait for elements of a FIFO queue to appear using a waitqueue
;Modified from sbcl manual example
+
(defclass concurrent-queue()
((buffer-queue :initform (sb-thread:make-waitqueue)
:reader buffer-queue)
+; (buffer-queue-mutex :initform (sb-thread:make-mutex :name "buffer queue mutex")
+; :reader buffer-queue-mutex)
(buffer-lock :initform (sb-thread:make-mutex :name "buffer lock")
:reader buffer-lock)
(buffer :initform nil
@@ -43,47 +46,80 @@
(defun concurrent-queue()
(make-instance 'concurrent-queue))
+
+
(defmacro pop-elt(a-buffer loc)
`(if ,a-buffer
(let ((head (car ,a-buffer)))
(setf ,a-buffer (cdr ,a-buffer))
-#+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc)
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A,~A) read ~A at ~A~%" sb-thread:*current-thread* (length (buffer queue)) head ,loc)
head)
nil))
+
;Do an (optionally blocking) remove of the element at the head of this queue
(defmethod take ((queue concurrent-queue) &key (blocking-call t))
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue))
(sb-thread:with-mutex ((buffer-lock queue))
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue))
;if its there, pop it
(let ((ret (pop-elt (buffer queue) "1sttry")))
(if (or ret (not blocking-call))
ret
(progn
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*)
(sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue))
+#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*)
(pop-elt (buffer queue) "2ndtry"))))))
;Append the element to the tail of this queue
(defmethod add ((queue concurrent-queue) elt)
-#+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
+#+nio-debug (format-threadsafe t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
(sb-thread:with-mutex ((buffer-lock queue))
(setf (buffer queue) (append (buffer queue) (list elt)) )
- (sb-thread:condition-notify (buffer-queue queue))))
-
+ (sb-thread:condition-broadcast (buffer-queue queue))))
(defun test-writer(queue)
- (loop for i from 0 to 999 do
- (sleep 0.1)
+ (loop for i from 0 to 100 do
+; (sleep (random 0.1))
+ (format-threadsafe t "Adding ~A~%" i)
(add queue i)))
-(defun test-reader(queue)
+(defun test-reader(queue results)
+ (format-threadsafe t "Started reader ~A~%" sb-thread:*current-thread*)
(loop
- (format t "reader on ~A got elt ~A~%"
- sb-thread:*current-thread* (take queue))))
+ (let ((elt (take queue)))
+ (push elt results)
+ (format-threadsafe t "reader on ~A got elt ~A~%"
+ sb-thread:*current-thread*
+ results))))
+
+(defparameter *results1* (list 999999))
+(defparameter *results2* (list 888888))
(defun test-queue()
(let ((queue (make-instance 'concurrent-queue)))
(sb-thread:make-thread #'(lambda()(test-writer queue)))
- (sleep 10)
+; (sleep 10)
+ (let ((t1 (sb-thread:make-thread #'(lambda()(test-reader queue *results1*)))))
+ ;(t2 (sb-thread:make-thread #'(lambda()(test-reader queue *results2*)))))
+ (sleep 5) ;;wait for it to probably complete
+ (format-threadsafe t "t1 got: ~A~%" *results1*)
+ (format-threadsafe t "t2 got: ~A~%" *results2*)
+ (sb-thread:destroy-thread t1)
+; (sb-thread:destroy-thread t2)
+)
+ (sb-thread:with-mutex ((buffer-lock queue))
+ (assert (eql (length (buffer queue)) 0)))))
+
+(defun test-queue2()
+ (let ((queue (make-instance 'concurrent-queue)))
+ (sb-thread:make-thread #'(lambda()(test-reader queue)))
+ (sb-thread:make-thread #'(lambda()(test-writer queue)))
(sb-thread:make-thread #'(lambda()(test-reader queue)))
- (sb-thread:make-thread #'(lambda()(test-reader queue)))))
+ (sb-thread:make-thread #'(lambda()(test-writer queue)))
+ (sleep 10)
+ (format-threadsafe t "running asserts")
+ (sb-thread:with-mutex ((buffer-lock queue))
+ (assert (eql (length (buffer queue)) 0)))))
Modified: branches/home/psmith/restructure/src/utils/nio-utils-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/utils/nio-utils-package.lisp (original)
+++ branches/home/psmith/restructure/src/utils/nio-utils-package.lisp Thu Feb 22 17:50:56 2007
@@ -30,4 +30,9 @@
;;utils
format-log get-universal-high-res get-readable-time
+
+ ;;concurrent-queue
+ concurrent-queue add take
+
+
))
Modified: branches/home/psmith/restructure/src/utils/nio-utils.asd
==============================================================================
--- branches/home/psmith/restructure/src/utils/nio-utils.asd (original)
+++ branches/home/psmith/restructure/src/utils/nio-utils.asd Thu Feb 22 17:50:56 2007
@@ -6,7 +6,8 @@
:components ((:file "nio-utils-package")
(:file "utils" :depends-on ("nio-utils-package"))
+ (:file "concurrent-queue" :depends-on ("utils"))
)
- :depends-on ())
+ :depends-on (:nio-compat))
Modified: branches/home/psmith/restructure/src/utils/utils.lisp
==============================================================================
--- branches/home/psmith/restructure/src/utils/utils.lisp (original)
+++ branches/home/psmith/restructure/src/utils/utils.lisp Thu Feb 22 17:50:56 2007
@@ -59,6 +59,9 @@
)
+(defparameter *format-mutex* (nio-compat:make-mutex "format lock"))
+
;Format the message to destination but prepend a high res time to the message, useful for logging
(defmacro format-log (destination control-string &rest format-arguments)
- `(format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments))
+ `(nio-compat:with-mutex (*format-mutex*)
+ (format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments)))