Okay - I changed thread-alive-p in the attached file (though I did not write the original).
The hash-table modification has now been shifted out of process-wait.
The caller's lock is now re-taken in an unwind-protect around the entire function's body (in case anything else fails).
The tlist-affecting functions have been factored out into other functions, and indenting was fixed.
Condition-notify's primary failure clause has been cleaned up and made more correct (it now checks for consumption of the notification independently of receiving it).
Martin - thanks for correcting this with respect to Lispworks; please let me know if anything else looks amiss.
Everybody else - look, I'm a Windows guy and am not too up on darcs or Unix diff files - but my change is a big addition to a file that doesn't see a lot of use. Please diff and add from the attached file.
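For anyone skimming, the locking change boils down to roughly the following. This is only a minimal portable sketch: toy-lock, toy-unlock, toy-relock and call-with-lock-dropped are hypothetical stand-ins invented for illustration (the attached file uses mp:process-unlock and mp:process-lock). The point is just that the re-lock sits in the unwind-protect cleanup form, so it also runs on a throw.

```lisp
;; Hypothetical stand-in for a real lock: it only records whether it is held.
(defstruct toy-lock (held-p nil))

(defun toy-unlock (lock) (setf (toy-lock-held-p lock) nil))
(defun toy-relock (lock) (setf (toy-lock-held-p lock) t))

(defun call-with-lock-dropped (lock thunk)
  "Release LOCK, run THUNK, and re-take LOCK even on a non-local exit."
  (toy-unlock lock)
  (unwind-protect
       (funcall thunk)
    ;; Cleanup form: runs on normal return and while unwinding from a throw.
    (toy-relock lock)))

;; Usage: the thunk throws, yet the lock is held again afterwards.
(defparameter *demo-lock* (make-toy-lock :held-p t))
(catch 'bail
  (call-with-lock-dropped *demo-lock*
                          (lambda () (throw 'bail :interrupted))))
```

If the re-lock were placed after the unwind-protect instead of inside its cleanup, the throw above would leave the caller without the lock.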
Martin Simmons wrote:
On Tue, 11 Aug 2009 00:26:39 +0200, Stelian Ionescu said:
On Sat, 2009-08-01 at 18:46 -0500, Matt Lamari wrote:
I'm still around if anyone needs support on the functions I've added.
I've cleaned up the code a little (attached as patch against the darcs repository), however, before merging I need you to split condition-wait into at least 3-4 separate functions because it's way too big (two screenfuls here). When you're done, please send a unified diff done against the darcs repository.
I have some comments on the patch:
It would be better to implement thread-alive-p by calling mp:process-alive-p.
The relocking of the caller's lock in condition-wait should be inside the unwind-protect cleanup, otherwise you can throw without the lock held.
Claiming a lock and doing hash table manipulation inside a mp:process-wait predicate ("Waiting for notification") is not good practice because the predicate is supposed to be a pure function. It should be safe to check the value of wakeup-allowed-to-proceed without the lock and manipulate the hash table after mp:process-wait has returned.
Is the prog1 in condition-wait redundant?
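The comment about keeping the wait predicate pure could be sketched roughly as follows. This is a portable illustration under stated assumptions, not the LispWorks code: poll-until is a hypothetical stand-in for mp:process-wait, and wait-sketch and the :between-polls hook are made-up names that simulate another thread running between polls. The predicate only reads a flag; the hash table is touched before and after the wait, never inside it.

```lisp
(defun poll-until (predicate &key (max-polls 1000) between-polls)
  "Hypothetical stand-in for mp:process-wait: poll PREDICATE until it is true.
BETWEEN-POLLS, if given, simulates other threads running between polls."
  (loop repeat max-polls
        do (when (funcall predicate) (return t))
           (when between-polls (funcall between-polls))
        finally (return nil)))

(defun wait-sketch (table id &key between-polls)
  (let ((allowed nil))
    ;; Register a wakeup closure; a notifier calls it to set the flag.
    (setf (gethash id table) (lambda () (setf allowed t)))
    ;; The predicate is pure: it only reads ALLOWED.
    (prog1 (poll-until (lambda () allowed) :between-polls between-polls)
      ;; Table manipulation happens here, after the wait has returned.
      (remhash id table))))

;; Usage: the simulated notifier looks up the wakeup closure and fires it.
(defparameter *waiters* (make-hash-table :test 'eq))
(defparameter *woken*
  (wait-sketch *waiters* :waiter-1
               :between-polls
               (lambda ()
                 (let ((wakeup (gethash :waiter-1 *waiters*)))
                   (when wakeup (funcall wakeup))))))
```

In a real multi-threaded setting the table accesses would still need the condition variable's lock; the sketch leaves that out to keep the shape of the idea visible.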
#| Copyright 2006, 2007 Greg Pfeil
Distributed under the MIT license (see LICENSE file) |#
(in-package #:bordeaux-threads)
;;; documentation on the LispWorks Multiprocessing interface can be found at
;;; http://www.lispworks.com/documentation/lw445/LWUG/html/lwuser-156.htm
; (mp:initialize-multiprocessing)
;;; Thread Creation
(defun make-thread (function &key name) (mp:process-run-function name nil function))
(defun current-thread () mp:*current-process*)
(defun threadp (object) (typep object 'mp:process))
(defun thread-name (thread) (mp:process-name thread))
;;; Resource contention: locks and recursive locks
(defun make-lock (&optional name) (mp:make-lock :name name))
(defun make-recursive-lock (&optional name) (mp:make-lock :name name))
(defun acquire-lock (lock &optional (wait-p t))
  (mp:process-lock lock nil
                   (if wait-p
                       (if (typep wait-p 'number) wait-p nil)
                       0)))
(defun release-lock (lock) (mp:process-unlock lock))
;;; Apparently this EVAL-WHEN is needed so that the macro is available
;;; when compiling condition-variables.lisp
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defmacro with-lock-held ((place) &body body)
    `(mp:with-lock (,place) ,@body)))
(defmacro with-recursive-lock-held ((place &key timeout) &body body)
  `(mp:with-lock (,place :timeout ,timeout :whostate "Waiting For Lock")
     ,@body))
;;; Resource contention: condition variables
(defun thread-yield () (mp:process-allow-scheduling))
;;; Introspection/debugging
(defun all-threads () (mp:list-all-processes))
(defun interrupt-thread (thread function) (mp:process-interrupt thread function))
(defun destroy-thread (thread) (signal-error-if-current-thread thread) (mp:process-kill thread))
(defun thread-alive-p (thread) (mp:process-alive-p thread))
; Lispworks condition support is simulated, albeit via a lightweight wrapper over
; its own polling-based wait primitive. Waiters register with the condition variable,
; and use MP:process-wait which queries for permission to proceed at its own (unspecified) interval.
; http://www.lispworks.com/documentation/lw51/LWRM/html/lwref-445.htm
; A wakeup callback (on notify) is provided to lighten this query to not have to do a hash lookup
; on every poll (or have to serialize on the condition variable) and a mechanism is put
; in place to unregister any waiter that exits wait for other reasons,
; and to resend any (single) notification that may have been consumed before this (corner
; case). Much of the complexity present is to support single notification (as recommended in
; the spec); but a distinct condition-notify-all is provided for reference.
; Single-notification follows a first-in first-out ordering
;
; Performance: With 1000 threads waiting on one condition-variable, the steady-state hit (at least
; as tested on a 3GHz Win32 box) is noise - hovering at 0% on Task manager.
; While not true zero like a true native solution, the use of the Lispworks native checks appears
; fast enough to be an equivalent substitute (thread count will cause issue before the
; waiting overhead becomes significant)
(defstruct condition-variable
  (lock (mp:make-lock :name "For condition-variable") :type mp:lock :read-only t)
  (wait-tlist (cons nil nil) :type cons :read-only t)
  (wait-hash (make-hash-table :test 'eq) :type hash-table :read-only t)
  ; unconsumed-notifications is to track :remove-from-consideration
  ; for entries that may have exited prematurely - notification is sent through
  ; to someone else, and offender is removed from hash and list
  (unconsumed-notifications (make-hash-table :test 'eq) :type hash-table :read-only t))
; make-condition-variable is the default constructor for condition-variable
(defmacro with-cv-access (condition-variable &body body)
  (let ((cv-sym (gensym))
        (slots '(lock wait-tlist wait-hash unconsumed-notifications)))
    `(let ((,cv-sym ,condition-variable))
       (with-slots ,slots
           ,cv-sym
         (macrolet ((locked (&body body) `(mp:with-lock (lock) ,@body)))
           (labels ((,(gensym) () ,@slots))) ; Trigger expansion of the symbol-macrolets to ignore
           ,@body)))))
(defmacro defcvfun (function-name (condition-variable &rest args) &body body)
  `(defun ,function-name (,condition-variable ,@args)
     (with-cv-access ,condition-variable
       ,@body)))
#+lispworks (editor:setup-indent "defcvfun" 2 2 7) ; indent defcvfun
; utility function that assumes process is locked on condition-variable's lock.
(defcvfun do-notify-single (condition-variable) ; assumes already locked
  (let ((id (caar wait-tlist)))
    (when id
      (pop (car wait-tlist))
      (unless (car wait-tlist) ; check for empty
        (setf (cdr wait-tlist) nil))
      (funcall (gethash id wait-hash)) ; call waiter-wakeup
      (remhash id wait-hash) ; absence of entry = permission to proceed
      (setf (gethash id unconsumed-notifications) t))))
; Added for completeness/to show how it's done in this paradigm; but
; The symbol for this call is not exposed in the api
(defcvfun condition-notify-all (condition-variable)
  (locked
    (loop for waiter-wakeup being the hash-values in wait-hash
          do (funcall waiter-wakeup))
    (clrhash wait-hash)
    (clrhash unconsumed-notifications) ; don't care as everyone just got notified
    (setf (car wait-tlist) nil)
    (setf (cdr wait-tlist) nil)))
; Currently implemented so as to notify only one waiting thread
(defcvfun condition-notify (condition-variable)
  (locked (do-notify-single condition-variable)))
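(defun delete-from-tlist (tlist element)
  ; deleter always removes the cell currently under inspection; it starts out
  ; as "remove the head" and is retargeted as the loop advances
  (let ((deleter
          (lambda ()
            (setf (car tlist) (cdar tlist))
            (unless (car tlist)
              (setf (cdr tlist) nil)))))
    ; iterate over the cons cells (ON), so that (car cons) is the stored element
    (loop for cons on (car tlist)
          do (if (eq element (car cons))
                 (progn
                   (funcall deleter)
                   (return nil))
                 (let ((cons cons))
                   (setq deleter
                         (lambda ()
                           (setf (cdr cons) (cddr cons))
                           (unless (cdr cons)
                             (setf (cdr tlist) cons)))))))))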
(defun add-to-tlist-tail (tlist element)
  (let ((new-link (cons element nil)))
    (cond
      ((car tlist)
       (setf (cddr tlist) new-link)
       (setf (cdr tlist) new-link))
      (t
       (setf (car tlist) new-link)
       (setf (cdr tlist) new-link)))))
(defcvfun condition-wait (condition-variable lock-)
  (mp:process-unlock lock-)
  (unwind-protect ; for the re-taking of the lock. Guarding all of the code
      (let ((wakeup-allowed-to-proceed nil)
            (wakeup-lock (mp:make-lock :name "wakeup lock for condition-wait")))
        ; wakeup-allowed-to-proceed is an optimisation to avoid having to serialize all waiters and
        ; search the hashtable. That it is locked is for safety/completeness, although
        ; as wakeup-allowed-to-proceed only transitions nil -> t, and that missing it once or twice is
        ; moot in this situation, it would be redundant even if a Lispworks implementation ever became
        ; non-atomic in its assignments
        (let ((id (cons nil nil))
              (clean-exit nil))
          (locked
            (add-to-tlist-tail wait-tlist id)
            (setf (gethash id wait-hash)
                  (lambda ()
                    (mp:with-lock (wakeup-lock)
                      (setq wakeup-allowed-to-proceed t)))))
          (unwind-protect
              (progn
                (mp:process-wait
                 "Waiting for notification"
                 (lambda ()
                   (when (mp:with-lock (wakeup-lock) wakeup-allowed-to-proceed)
                     (locked (not (gethash id wait-hash))))))
                (locked (remhash id unconsumed-notifications))
                (setq clean-exit t)) ; Notification was consumed
            ; Have to call remove-from-consideration just in case process was interrupted
            ; rather than having condition met
            (unless clean-exit ; clean-exit is just an optimization
              (locked
                (when (gethash id wait-hash) ; not notified - must have been interrupted
                  ; Have to unsubscribe
                  (remhash id wait-hash)
                  (delete-from-tlist wait-tlist id))
                ; note - it's possible to be removed from wait-hash/wait-tlist
                ; (in notify-single); but still have an unconsumed notification!
                (when (gethash id unconsumed-notifications)
                  ; Must have exited for reasons unrelated to notification
                  (remhash id unconsumed-notifications)
                  ; Have to pass on the notification to an eligible waiter
                  (do-notify-single condition-variable)))))))
    (mp:process-lock lock-)))
; Generally safe sanity check for the locks and single-notify
(defun unit-test-lw-conditions ()
  (let ((condition-variable (make-condition-variable))
        (test-lock (make-lock))
        (completed nil))
    (loop for id from 0 to 5
          do (let ((id id))
               (make-thread
                (lambda ()
                  (with-lock-held (test-lock)
                    (condition-wait condition-variable test-lock)
                    (push id completed)
                    (condition-notify condition-variable))))))
    (sleep 2)
    (if completed
        (print "Failed: Premature passage through condition-wait")
        (print "Successfully waited on condition"))
    (condition-notify condition-variable)
    (sleep 2)
    (if (and completed
             (eql (length completed) 6)
             (equal (sort completed #'<)
                    (loop for id from 0 to 5 collect id)))
        (print "Success: All elements notified")
        (print (format nil "Failed: Of 6 expected elements, only ~A proceeded" completed)))
    (with-cv-access condition-variable
      (if (and (not (or (car wait-tlist) (cdr wait-tlist)))
               (zerop (hash-table-count wait-hash))
               (zerop (hash-table-count unconsumed-notifications)))
          (print "Success: condition variable restored to initial state")
          (print "Error: condition variable retains residue from completed waiters")))
    (setq completed nil)
    (loop for id from 0 to 5
          do (let ((id id))
               (make-thread
                (lambda ()
                  (with-lock-held (test-lock)
                    (condition-wait condition-variable test-lock)
                    (push id completed))))))
    (sleep 2)
    (condition-notify condition-variable)
    (sleep 2)
    (if (= (length completed) 1)
        (print "Success: Notify-single only notified a single waiter to restart")
        (print (format nil "Failure: Notify-single restarted ~A items" (length completed))))
    (condition-notify condition-variable)
    (sleep 2)
    (if (= (length completed) 2)
        (print "Success: second Notify-single only notified a single waiter to restart")
        (print (format nil "Failure: Two Notify-singles restarted ~A items" (length completed))))
    (loop for i from 0 to 5
          do (condition-notify condition-variable))
    (print "Note: In the case of any failures, assume there are outstanding waiting threads")
    (values)))
(defun thread-alive-p (thread) (mp:process-alive-p thread))
(mark-supported)
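As a quick illustration of the wait-tlist bookkeeping in the file above: a tlist is a cons whose car is the list of entries and whose cdr is the last cons of that list, so appending is constant-time and entries leave from the head in first-in first-out order. The following is a minimal portable sketch; make-tlist, tlist-push-tail and tlist-pop-head are hypothetical names used only for this example (they mirror add-to-tlist-tail and the pop in do-notify-single rather than reproduce them).

```lisp
(defun make-tlist ()
  "An empty tlist: (head-list . last-cons), both NIL."
  (cons nil nil))

(defun tlist-push-tail (tlist element)
  (let ((new-link (list element)))
    (if (car tlist)
        (setf (cddr tlist) new-link)   ; splice after the current last cons
        (setf (car tlist) new-link))   ; first entry: it is also the head
    (setf (cdr tlist) new-link)        ; either way it becomes the new tail
    tlist))

(defun tlist-pop-head (tlist)
  (prog1 (pop (car tlist))
    ;; Once the list is empty, the tail pointer must be cleared too.
    (unless (car tlist)
      (setf (cdr tlist) nil))))

;; Usage: three waiters queue up; the first one in is the first one out.
(defparameter *queue* (make-tlist))
(dolist (id '(a b c))
  (tlist-push-tail *queue* id))
(defparameter *first-out* (tlist-pop-head *queue*))
```

Keeping the tail pointer in the cdr is what lets condition-wait register a waiter without walking the whole list.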