Author: psmith Date: Tue Jan 16 20:34:39 2007 New Revision: 38
Added: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Log: Added concurrent queue
inter-thread communication via a FIFO queue
Added: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp ============================================================================== --- (empty file) +++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Tue Jan 16 20:34:39 2007 @@ -0,0 +1,85 @@ +#| +Copyright (c) 2007 +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. The name of the author may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR +IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+|#
+
+(in-package :nio-compat)
+
+(declaim (optimize (debug 3) (speed 3) (space 0)))
+
+;; Implements a thread-safe FIFO queue: readers block on a waitqueue until
+;; an element is available.
+;; Modified from the SBCL manual example.
+
+(defclass concurrent-queue ()
+  ((buffer-queue :initform (sb-thread:make-waitqueue)
+                 :reader buffer-queue
+                 :documentation "Waitqueue that TAKE waits on while the buffer is empty.")
+   (buffer-lock :initform (sb-thread:make-mutex :name "buffer lock")
+                :reader buffer-lock
+                :documentation "Mutex held by TAKE and ADD while they touch the buffer.")
+   (buffer :initform nil
+           :accessor buffer
+           :documentation "List of queued elements, oldest first."))
+  (:documentation "FIFO queue whose TAKE and ADD operations run under a mutex."))
+
+(defmacro pop-elt (a-buffer loc)
+  "Expand into code that removes and returns the first element of the list
+held in the place A-BUFFER, or returns NIL when that list is empty.
+LOC is a label that only appears in the nio-debug trace output."
+  ;; LOC is unused in the expansion when the nio-debug feature is absent.
+  (declare (ignorable loc))
+  ;; A gensym keeps the temporary from capturing a caller variable named HEAD
+  ;; that the A-BUFFER or LOC forms might refer to.
+  (let ((head (gensym "HEAD")))
+    `(when ,a-buffer
+       (let ((,head (pop ,a-buffer)))
+         #+nio-debug (format t "reader ~A woke, read ~A as ~A~%" sb-thread:*current-thread* ,head ,loc)
+         ,head))))
+
+
+(defmethod take ((queue concurrent-queue))
+  "Remove and return the oldest element of QUEUE.
+Blocks the calling thread while QUEUE is empty."
+  (sb-thread:with-mutex ((buffer-lock queue))
+    ;; Decide on the buffer itself rather than on the popped value, so that a
+    ;; queued NIL element is returned instead of being mistaken for "empty".
+    (if (buffer queue)
+        (pop-elt (buffer queue) "1sttry")
+        (progn
+          ;; Re-check after every wake-up: another reader may have taken the
+          ;; element first, and CONDITION-WAIT may return without a notify.
+          (loop do (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue))
+                until (buffer queue))
+          (pop-elt (buffer queue) "2ndtry")))))
+
+
+(defmethod add ((queue concurrent-queue) elt)
+  "Append ELT to the tail of QUEUE and notify the waitqueue that readers wait on.
+Returns the value of SB-THREAD:CONDITION-NOTIFY."
+  (sb-thread:with-mutex ((buffer-lock queue))
+    ;; APPEND copies the existing list, so each ADD costs time proportional to
+    ;; the current queue length.
+    (setf (buffer queue) (append (buffer queue) (list elt)))
+    (sb-thread:condition-notify (buffer-queue queue))))
+
+
+
+(defun test-writer (queue)
+  "Add the integers 0 through 999 to QUEUE, sleeping 0.1 seconds before each."
+  (loop for i from 0 to 999 do
+    (sleep 0.1)
+    (add queue i)))
+
+(defun test-reader (queue)
+  "Loop forever, printing each element taken from QUEUE with the current thread."
+  (loop
+    (format t "reader on ~A got elt ~A~%"
+            sb-thread:*current-thread* (take queue))))
+
+(defun test-queue ()
+  "Start one writer thread, wait 10 seconds, then start two reader threads,
+all sharing a fresh CONCURRENT-QUEUE. Returns the second reader thread."
+  (let ((queue (make-instance 'concurrent-queue)))
+    (sb-thread:make-thread #'(lambda () (test-writer queue)))
+    (sleep 10)
+    (sb-thread:make-thread #'(lambda () (test-reader queue)))
+    (sb-thread:make-thread #'(lambda () (test-reader queue)))))