Author: psmith Date: Thu Feb 22 17:50:56 2007 New Revision: 94
Added: branches/home/psmith/restructure/src/utils/concurrent-queue.lisp - copied, changed from r93, branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Removed: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Modified: branches/home/psmith/restructure/src/compat/nio-compat-package.lisp branches/home/psmith/restructure/src/compat/nio-compat.asd branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp branches/home/psmith/restructure/src/utils/nio-utils-package.lisp branches/home/psmith/restructure/src/utils/nio-utils.asd branches/home/psmith/restructure/src/utils/utils.lisp Log: Moved the thread-safe concurrent queue from src/compat to src/utils (callers now reference nio-utils instead of nio-compat) and added more queue tests.
Modified: branches/home/psmith/restructure/src/compat/nio-compat-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/compat/nio-compat-package.lisp (original) +++ branches/home/psmith/restructure/src/compat/nio-compat-package.lisp Thu Feb 22 17:50:56 2007 @@ -30,7 +30,6 @@
;; errno.lisp get-errno +ERRNO_EAGAIN+ perror - - ;;concurrent-queue - concurrent-queue add take + ;;threading + with-mutex make-mutex ))
Modified: branches/home/psmith/restructure/src/compat/nio-compat.asd ============================================================================== --- branches/home/psmith/restructure/src/compat/nio-compat.asd (original) +++ branches/home/psmith/restructure/src/compat/nio-compat.asd Thu Feb 22 17:50:56 2007 @@ -6,7 +6,7 @@
:components ((:file "nio-compat-package") (:file "errno" :depends-on ("nio-compat-package")) - (:file "concurrent-queue" :depends-on ("nio-compat-package")) + (:file "threading" :depends-on ("nio-compat-package")) )
:depends-on (:cffi))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Thu Feb 22 17:50:56 2007 @@ -34,7 +34,7 @@ t)
;TODO thread safety -(defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue) +(defparameter +connected-sockets-queue+ (nio-utils:concurrent-queue) "List of node objects that are to be connected to")
;loop over hashtable @@ -155,7 +155,7 @@
;add outgoing sockets to event queue #+nio-debug2 (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+) - (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do + (loop for node = (nio-utils:take +connected-sockets-queue+ :blocking-call nil) until (null node) do #+nio-debug (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node) (push node *nodes-list*))
@@ -192,4 +192,4 @@ (format t "Connect failed!!~A ~%" (get-errno)))))
(defun add-connection(node) - (nio-compat:add +connected-sockets-queue+ node)) \ No newline at end of file + (nio-utils:add +connected-sockets-queue+ node)) \ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd Thu Feb 22 17:50:56 2007 @@ -10,4 +10,4 @@ (:file "yarpc-client-state-machine" :depends-on ("yarpc-packet-factory")) )
- :depends-on (:nio :nio-sm :nio-compat)) \ No newline at end of file + :depends-on (:nio :nio-sm :nio-utils)) \ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Thu Feb 22 17:50:56 2007 @@ -33,7 +33,7 @@ ;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution ;; (defclass yarpc-client-state-machine (state-machine) - ((job-queue :initform (nio-compat:concurrent-queue) + ((job-queue :initform (nio-utils:concurrent-queue) :accessor job-queue :documentation "The queue used to hand off work from an external thread to the io thread") (request-map :initform (make-hash-table) @@ -74,7 +74,7 @@
(defmethod process-outgoing-packet((sm yarpc-client-state-machine)) #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") - (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil))) + (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil))) (when ttd #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd) (destructuring-bind (job call-string) ttd @@ -94,4 +94,4 @@ ;Execute the call-string on the remote node and call callback with the result (defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback) #+nio-debug (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback) - (nio-compat:add (job-queue sm) (list (remote-job callback) call-string))) + (nio-utils:add (job-queue sm) (list (remote-job callback) call-string)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Thu Feb 22 17:50:56 2007 @@ -34,11 +34,11 @@ ;; (defclass yarpc-state-machine (state-machine) ( - (result-queue :initform (nio-compat:concurrent-queue) + (result-queue :initform (nio-utils:concurrent-queue) :accessor result-queue :documentation "The queue used to return results from an external thread to the nio thread")))
-(defparameter job-queue (nio-compat:concurrent-queue) +(defparameter job-queue (nio-utils:concurrent-queue) "The queue used to hand off work from the NIO thread to an external thread for execution")
(defparameter yarpc-pf (yarpc-packet-factory)) @@ -60,16 +60,16 @@
(defun run-job(&key (blocking t)) #+nio-debug (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%") - (let ((server-job (nio-compat:take nio-yarpc:job-queue :blocking-call blocking))) + (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking))) (when server-job (destructuring-bind (job request-id result-queue) server-job #+nio-debug (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) - (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job))))))) + (nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
(defmethod process-outgoing-packet((sm yarpc-state-machine)) #+nio-debug2 (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" ) - (let ((server-job (nio-compat:take (result-queue sm) :blocking-call nil))) + (let ((server-job (nio-utils:take (result-queue sm) :blocking-call nil))) (when server-job (destructuring-bind (request-id result) server-job #+nio-debug (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result) @@ -78,7 +78,7 @@ ;Process a call method packet by placing it in the job-queue (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) #+nio-debug (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call) - (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm))) + (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm))) (when +process-jobs-inline+ (run-job :blocking nil)))
Copied: branches/home/psmith/restructure/src/utils/concurrent-queue.lisp (from r93, branches/home/psmith/restructure/src/compat/concurrent-queue.lisp) ============================================================================== --- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp (original) +++ branches/home/psmith/restructure/src/utils/concurrent-queue.lisp Thu Feb 22 17:50:56 2007 @@ -25,16 +25,19 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |#
-(in-package :nio-compat) +(in-package :nio-utils)
(declaim (optimize (debug 3) (speed 3) (space 0)))
;Implements a threadsafe queue where readers wait for elements of a FIFO queue to appear using a waitqueue ;Modified from sbcl manual example
+ (defclass concurrent-queue() ((buffer-queue :initform (sb-thread:make-waitqueue) :reader buffer-queue) +; (buffer-queue-mutex :initform (sb-thread:make-mutex :name "buffer queue mutex") +; :reader buffer-queue-mutex) (buffer-lock :initform (sb-thread:make-mutex :name "buffer lock") :reader buffer-lock) (buffer :initform nil @@ -43,47 +46,80 @@ (defun concurrent-queue() (make-instance 'concurrent-queue))
+ + (defmacro pop-elt(a-buffer loc) `(if ,a-buffer (let ((head (car ,a-buffer))) (setf ,a-buffer (cdr ,a-buffer)) -#+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc) +#+nio-debug (format-threadsafe t "concurent-queue:take - (~A,~A) read ~A at ~A~%" sb-thread:*current-thread* (length (buffer queue)) head ,loc) head) nil))
+ ;Do an (optionally blocking) remove of the element at the head of this queue (defmethod take ((queue concurrent-queue) &key (blocking-call t)) +#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) (sb-thread:with-mutex ((buffer-lock queue)) +#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) ;if its there, pop it (let ((ret (pop-elt (buffer queue) "1sttry"))) (if (or ret (not blocking-call)) ret (progn +#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*) (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue)) +#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*) (pop-elt (buffer queue) "2ndtry"))))))
;Append the element to the tail of this queue (defmethod add ((queue concurrent-queue) elt) -#+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt) +#+nio-debug (format-threadsafe t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt) (sb-thread:with-mutex ((buffer-lock queue)) (setf (buffer queue) (append (buffer queue) (list elt)) ) - (sb-thread:condition-notify (buffer-queue queue)))) - + (sb-thread:condition-broadcast (buffer-queue queue))))
(defun test-writer(queue) - (loop for i from 0 to 999 do - (sleep 0.1) + (loop for i from 0 to 100 do +; (sleep (random 0.1)) + (format-threadsafe t "Adding ~A~%" i) (add queue i)))
-(defun test-reader(queue) +(defun test-reader(queue results) + (format-threadsafe t "Started reader ~A~%" sb-thread:*current-thread*) (loop - (format t "reader on ~A got elt ~A~%" - sb-thread:*current-thread* (take queue)))) + (let ((elt (take queue))) + (push elt results) + (format-threadsafe t "reader on ~A got elt ~A~%" + sb-thread:*current-thread* + results)))) + +(defparameter *results1* (list 999999)) +(defparameter *results2* (list 888888))
(defun test-queue() (let ((queue (make-instance 'concurrent-queue))) (sb-thread:make-thread #'(lambda()(test-writer queue))) - (sleep 10) +; (sleep 10) + (let ((t1 (sb-thread:make-thread #'(lambda()(test-reader queue *results1*))))) + ;(t2 (sb-thread:make-thread #'(lambda()(test-reader queue *results2*))))) + (sleep 5) ;;wait for it to probably complete + (format-threadsafe t "t1 got: ~A~%" *results1*) + (format-threadsafe t "t2 got: ~A~%" *results2*) + (sb-thread:destroy-thread t1) +; (sb-thread:destroy-thread t2) +) + (sb-thread:with-mutex ((buffer-lock queue)) + (assert (eql (length (buffer queue)) 0))))) + +(defun test-queue2() + (let ((queue (make-instance 'concurrent-queue))) + (sb-thread:make-thread #'(lambda()(test-reader queue))) + (sb-thread:make-thread #'(lambda()(test-writer queue))) (sb-thread:make-thread #'(lambda()(test-reader queue))) - (sb-thread:make-thread #'(lambda()(test-reader queue))))) + (sb-thread:make-thread #'(lambda()(test-writer queue))) + (sleep 10) + (format-threadsafe t "running asserts") + (sb-thread:with-mutex ((buffer-lock queue)) + (assert (eql (length (buffer queue)) 0)))))
Modified: branches/home/psmith/restructure/src/utils/nio-utils-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/utils/nio-utils-package.lisp (original) +++ branches/home/psmith/restructure/src/utils/nio-utils-package.lisp Thu Feb 22 17:50:56 2007 @@ -30,4 +30,9 @@
;;utils format-log get-universal-high-res get-readable-time + + ;;concurrent-queue + concurrent-queue add take + + ))
Modified: branches/home/psmith/restructure/src/utils/nio-utils.asd ============================================================================== --- branches/home/psmith/restructure/src/utils/nio-utils.asd (original) +++ branches/home/psmith/restructure/src/utils/nio-utils.asd Thu Feb 22 17:50:56 2007 @@ -6,7 +6,8 @@
:components ((:file "nio-utils-package") (:file "utils" :depends-on ("nio-utils-package")) + (:file "concurrent-queue" :depends-on ("utils")) )
- :depends-on ()) + :depends-on (:nio-compat))
Modified: branches/home/psmith/restructure/src/utils/utils.lisp ============================================================================== --- branches/home/psmith/restructure/src/utils/utils.lisp (original) +++ branches/home/psmith/restructure/src/utils/utils.lisp Thu Feb 22 17:50:56 2007 @@ -59,6 +59,9 @@
)
+(defparameter *format-mutex* (nio-compat:make-mutex "format lock")) + ;Format the message to destination but prepend a high res time to the message, useful for logging (defmacro format-log (destination control-string &rest format-arguments) - `(format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments)) + `(nio-compat:with-mutex (*format-mutex*) + (format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments)))