Author: psmith Date: Thu Feb 22 19:18:57 2007 New Revision: 95
Modified: branches/home/psmith/restructure/src/utils/concurrent-queue.lisp Log: Fixed a problem in take: when NIL was read from the queue as an element, it was mistaken for an empty queue, so the caller waited when it should have returned.
Modified: branches/home/psmith/restructure/src/utils/concurrent-queue.lisp ============================================================================== --- branches/home/psmith/restructure/src/utils/concurrent-queue.lisp (original) +++ branches/home/psmith/restructure/src/utils/concurrent-queue.lisp Thu Feb 22 19:18:57 2007 @@ -47,51 +47,42 @@ (make-instance 'concurrent-queue))
- -(defmacro pop-elt(a-buffer loc) - `(if ,a-buffer - (let ((head (car ,a-buffer))) - (setf ,a-buffer (cdr ,a-buffer)) -#+nio-debug (format-threadsafe t "concurent-queue:take - (~A,~A) read ~A at ~A~%" sb-thread:*current-thread* (length (buffer queue)) head ,loc) - head) - nil)) - - ;Do an (optionally blocking) remove of the element at the head of this queue (defmethod take ((queue concurrent-queue) &key (blocking-call t)) -#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) +#+nio-debug (format-log t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) (sb-thread:with-mutex ((buffer-lock queue)) -#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) +#+nio-debug (format-log t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) ;if its there, pop it - (let ((ret (pop-elt (buffer queue) "1sttry"))) - (if (or ret (not blocking-call)) - ret - (progn -#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*) - (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue)) -#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*) - (pop-elt (buffer queue) "2ndtry")))))) + (if (> (length (buffer queue)) 0) + (pop (buffer queue)) + (when blocking-call + (loop +#+nio-debug (format-log t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*) + (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue)) +#+nio-debug (format-log t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*) + (if (> (length (buffer queue)) 0) + (return-from take (pop (buffer queue)))))))))
;Append the element to the tail of this queue (defmethod add ((queue concurrent-queue) elt) -#+nio-debug (format-threadsafe t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt) +#+nio-debug (format-log t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt) (sb-thread:with-mutex ((buffer-lock queue)) (setf (buffer queue) (append (buffer queue) (list elt)) ) - (sb-thread:condition-broadcast (buffer-queue queue)))) + (sb-thread:condition-notify (buffer-queue queue))))
(defun test-writer(queue) (loop for i from 0 to 100 do ; (sleep (random 0.1)) - (format-threadsafe t "Adding ~A~%" i) + (format-log t "Adding ~A~%" i) (add queue i)))
(defun test-reader(queue results) - (format-threadsafe t "Started reader ~A~%" sb-thread:*current-thread*) + (format-log t "Started reader ~A~%" sb-thread:*current-thread*) (loop (let ((elt (take queue))) (push elt results) - (format-threadsafe t "reader on ~A got elt ~A~%" + (format-log t "reader on ~A got elt ~A~%" sb-thread:*current-thread* results))))
@@ -105,8 +96,8 @@ (let ((t1 (sb-thread:make-thread #'(lambda()(test-reader queue *results1*))))) ;(t2 (sb-thread:make-thread #'(lambda()(test-reader queue *results2*))))) (sleep 5) ;;wait for it to probably complete - (format-threadsafe t "t1 got: ~A~%" *results1*) - (format-threadsafe t "t2 got: ~A~%" *results2*) + (format-log t "t1 got: ~A~%" *results1*) + (format-log t "t2 got: ~A~%" *results2*) (sb-thread:destroy-thread t1) ; (sb-thread:destroy-thread t2) ) @@ -120,6 +111,6 @@ (sb-thread:make-thread #'(lambda()(test-reader queue))) (sb-thread:make-thread #'(lambda()(test-writer queue))) (sleep 10) - (format-threadsafe t "running asserts") + (format-log t "running asserts") (sb-thread:with-mutex ((buffer-lock queue)) (assert (eql (length (buffer queue)) 0)))))