;; ipc.lisp -- IPC with SYS V message queues. ;; ;; Written by Helmut Eller 2004 ;; ;; This package implements some Erlang-style concurrency primitives ;; with Unix processes and SYS V message queues. The following ;; primitives are available: ;; ;; spawn: forks a new process and calls a function in the new process. ;; ;; snd: sends a message to another process. The message must be ;; PRINTable and READable. ;; ;; rcv: receives a message. A test predicate can be used to ;; emulate `selective receive'. This operation blocks if there's no ;; message in the mailbox, but a timeout value can be specified. ;; ;; Some simple usage examples are at the end of this file. ;; ;; Caution: Erlangish link/unlink is not supported, so it is quite ;; difficult to handle errors in other processes. Currently a child ;; process prints a backtrace and exits instead of entering a ;; debugger. There's also no way to discover or kill other processes. ;; You'll have to use ps etc. for this. The SYS V message queues are ;; not reliably removed. Again you'll have to use ipcs and ipcrm to ;; do it manually. ;; ;; This code is written for CMUCL.
(defpackage :ipc (:use :cl :ext :unix :alien :c-call) (:export :spawn :snd :rcv :self))
(in-package :ipc)
(defvar *ipc-debug* t)
(defun dbg (fstring &rest args) (when *ipc-debug* (apply #'format *debug-io* fstring args) (finish-output *debug-io*)))
(defmacro ccall ((name rettype &rest argtypes) &rest args) "Call the C function NAME with ARGS." `(alien:alien-funcall (alien:extern-alien ,name (function ,rettype ,@argtypes)) ,@args))
(defmacro let-errno ((var errno value) &body body) "Execute VALUE; bind the result to VAR, bind ERRNO, and execute BODY." `(let ((,var ,value)) (let ((,errno ;; unix-errno is suddenly a function in recent cmucls. ,(cond ((fboundp 'unix:unix-errno) '(unix:unix-errno)) (t `(progn (unix::unix-get-errno) unix:unix-errno))))) ,@body)))
(def-alien-type sap system-area-pointer) (def-alien-type uint unsigned)
(defun unix-error (errno) (error "~A" (get-unix-error-msg errno)))
(defconstant +ipc_nowait+ #x800) (defconstant +ipc_creat+ #x200) (defconstant +ipc_excl+ #x400) (defconstant +ipc_set+ #x1) (defconstant +ipc_rmid+ #x0)
(defun %msgget (key flags) (ccall (msgget int uint uint) key flags))
(defun make-mq () "Create a new SYS V message queue. Return the (per process) queue handle and the public key." (let ((key (random #xffff))) (let-errno (mq errno (%msgget key (logior #o600 +ipc_creat+ +ipc_excl+))) (cond ((/= mq -1) (values mq key)) ((= errno unix:eexist) (make-mq)) ; retry with new key (t (unix-error errno))))))
(defun get-mq (key) "Get the (existing) message queue with id KEY." (let-errno (mq errno (%msgget key #o600)) (cond ((= mq -1) (unix-error errno)) (t mq))))
(defun delete-mq (mq) (dbg "; Closing: ~A (pid ~A)~%" mq (unix-getpid)) (let-errno (code errno (ccall (msgctl int int int sap) mq +ipc_rmid+ (sys:int-sap 0))) (cond ((zerop code)) (t (unix-error errno)))))
(defun %msgsnd (mq string) (ccall (msgsnd int int sap uint uint) mq (sys:vector-sap string) (length string) +ipc_nowait+))
(defun msgsnd (mq string) "Write STRING to message queue MQ." (let-errno (code errno (%msgsnd mq string)) (cond ((/= code -1) nil) (t (unix-error errno)))))
(defun %msgrcv (mq buffer) (ccall (msgrcv int int sap uint uint uint) mq (sys:vector-sap buffer) (length buffer) 0 0))
(defun msgrcv (mq) "Return the first message in MQ." (let ((buffer (make-string 8192))) (let-errno (code errno (%msgrcv mq buffer)) (cond ((/= code -1) (subseq buffer 0 code)) ((= errno unix:eintr) (msgrcv mq)) (t (unix-error errno))))))
(defun decode (string) (with-standard-io-syntax (read-from-string string)))
(defun encode (message) (with-standard-io-syntax (prin1-to-string message)))
;;; Process structure (pendant to Erlangs PID)
(defstruct (process (:conc-name process.) (:predicate process?)) ;; Unix PID (pid (required-argument) :type fixnum :read-only t) ;; The key of the SYSV message queue (qid (required-argument) :type fixnum :read-only t))
(defvar *queues* (make-hash-table) "Maps pids to SYSV IPC message queues.")
(defun process-queue (process) (or (gethash (process.pid process) *queues*) (setf (process-queue process) (get-mq (process.qid process)))))
(defun (setf process-queue) (queue process) (setf (gethash (process.pid process) *queues*) queue))
(defvar *self* nil) (defvar *parent* nil) (defvar *linked-processes* '()) (defvar *trap-exit* nil)
(defun init-self () "Allocate a new message queue and initialize *SELF*." (multiple-value-bind (mq qid) (make-mq) (let ((p (make-process :pid (unix-getpid) :qid qid))) (setf (process-queue p) mq) (setf *self* p))))
(defun self () (or *self* (init-self))) (defun parent () *parent*)
(defun self? (process) "Is PROCESS this unix process?" (equalp process (self)))
(defun alive? (process) (unix-kill (process.pid process) :check))
(defvar *mailbox* (list))
(defun snd (message process) "Send MESSAGE to PROCESS." (cond ((self? process) (setf *mailbox* (nconc *mailbox* (list message)))) (t (msgsnd (process-queue process) (encode message)))))
;;; SYS V message queues have no support for receive with timeouts. ;;; We use the itimer to implement timeouts.
(defun assert-timer-unused () "Raise an error if the reatime is already in use." (multiple-value-bind (ok isec iusec vsec vusec) (unix-getitimer :real) (declare (ignore ok isec iusec)) (assert (and (zerop vsec) (zerop vusec)) () "itimer already used")))
(defun decode-timeout (timeout) "Return the timevalue TIMEOUT as to two values integers SECS USECS. TIMEOUT the number of seconds." (multiple-value-bind (secs rem) (truncate timeout) (let ((usecs (round (* rem 1000000)))) (assert (or (plusp secs) (and (zerop secs) (plusp usecs)))) (values secs usecs))))
(defun with-timeout (timeout fn timeout-fn) "Call FN. If executing FN completes withing TIMEOUT seconds return the result. Otherwise abort, unwind the stack and call TIMEOUT-FN instead. TIMEOUT is in seconds (can be a float). Return either the result of executing FN or the result of calling TIMEOUT-FN." (if (minusp timeout) (funcall timeout-fn) (%with-timeout timeout fn timeout-fn)))
(defun %with-timeout (timeout fn timeout-fn) (block abort (return-from %with-timeout (labels ((handler (signal code scp) (declare (ignore signal code scp)) (return-from abort nil))) (multiple-value-bind (secs usecs) (decode-timeout timeout) (assert-timer-unused) (sys:with-enabled-interrupts ((unix:sigalrm #'handler)) (unix-setitimer :real 0 0 secs usecs) (unwind-protect (funcall fn) (unix-setitimer :real 0 0 0 0))))))) (funcall timeout-fn))
(defun endtime (timeout) (+ (get-internal-real-time) (/ timeout (coerce internal-time-units-per-second 'double-float))))
(defun seconds-until (endtime) (* (- endtime (get-internal-real-time)) internal-time-units-per-second))
(defun rcv-from-mq (mq test endtime) "Return the first message from MQ satisfying the predicate TEST.
The second return value is non-nil when timeout ENDTIME expires." (let ((msg (if endtime (with-timeout (seconds-until endtime) (lambda () (decode (msgrcv mq))) (lambda () (return-from rcv-from-mq (values nil t)))) (decode (msgrcv mq))))) (cond ((funcall test msg) (values msg nil)) (t (setf *mailbox* (nconc *mailbox* (list msg))) (rcv-from-mq mq test endtime)))))
(defun rcv (&key (test (constantly t)) timeout) "Receive the next message which satisfies TEST.
Return the message and a flag indicating whether the timeout expired." (let ((tail (member-if test *mailbox*))) (cond (tail (setf *mailbox* (nconc (ldiff *mailbox* tail) (cdr tail))) (values (car tail) nil)) (t (rcv-from-mq (process-queue (self)) test (if timeout (endtime timeout)))))))
(in-package :vm)
(eval-when (:compile-toplevel :load-toplevel :execute)
(defknown %reset-csp (function) nil)
;; Reset the control stack and call FN with the empty stack. ;; Unwind-protect blocks are reset and NOT executed. (define-vop (reset-csp) (:policy :fast-safe) (:args (fn :scs (descriptor-reg control-stack))) (:save-p t) (:translate %reset-csp) (:generator 0 (move eax-tn fn) (load-foreign-data-symbol esp-tn "control_stack_end") (inst mov esp-tn (make-ea :dword :base esp-tn)) (store-symbol-value 0 lisp::*current-catch-block*) (store-symbol-value 0 lisp::*current-unwind-protect-block*) (inst push 0) (inst mov ebp-tn esp-tn) (inst mov esi-tn esp-tn) (inst jmp (make-fixup 'tail-call-variable :assembly-routine)))))
(in-package :ipc)
(defun call-with-empty-stacks (fn) (mp::unbind-binding-stack) (setf x86::*alien-stack* (kernel:make-lisp-obj mp::*alien-stack-top*)) (setf kernel:*eval-stack-top* 0) (setf (kernel:binding-stack-pointer-sap) (alien:extern-alien "binding_stack" system-area-pointer)) (vm::%reset-csp fn))
(defun child-debugger-hook (condition hook) "Print a backtrace and exit." (declare (ignore hook)) (ignore-errors (format *debug-io* "~2&~A~% [Condition of type ~S]~2&" (debug::safe-condition-message condition) (type-of condition))) (ignore-errors (debug:backtrace)) (throw '%exit-child nil))
(defun start-child (stream mq fn args) (let ((*standard-output* stream) (*debug-io* stream) (*error-output* stream) (*query-io* stream) (*standard-input* stream)) (catch '%exit-child (unwind-protect (let ((*debugger-hook* (lambda (c h) (child-debugger-hook c h)))) (apply fn args)) (ignore-errors (delete-mq mq) (dbg "; Child exited: ~S~%" (self)) (lisp::finish-standard-output-streams)) (unix:unix-exit)))))
(defun initialize-child (parent mq qid) (setf *parent* parent) (setf *self* (make-process :pid (unix-getpid) :qid qid)) (setf (process-queue *self*) mq) (setf *mailbox* (list)))
(defun %spawn (fn args) (multiple-value-bind (mq qid) (make-mq) (let ((parent (self))) (mapc #'force-output (list *standard-output* *debug-io* *query-io*)) ;; CMUCL's SIGCHLD handler (sys:enable-interrupt unix:sigchld #'ext::sigchld-handler) (multiple-value-bind (pid errno) (unix-fork) (cond ((and pid (zerop pid)) (initialize-child parent mq qid) (let ((io *debug-io*)) (call-with-empty-stacks (lambda () (start-child io mq fn args))))) (pid (dbg "; Forked: ~D ~D (parent ~D)~%" pid qid (unix-getpid)) (let ((child (make-process :pid pid :qid qid))) (setf (process-queue child) mq) child)) (t (error "fork: ~A" (get-unix-error-msg errno))))))))
(defun spawn (fn &rest args) "Create a new process and apply FN to ARGS in the new process." (%spawn fn args))
#|
;; The counter process maintains the current value. Operations on the ;; counter are invoked via messges. The messages are as follows: ;; :up incerments the counter ;; :quit termintates the counter process ;; <a process> sends the current value to the process. (defun counter (value) (dbg "value: ~S~%" value) (let ((msg (rcv))) (dbg "msg: ~S~%" msg) (cond ((eq msg :up) (counter (1+ value))) ((eq msg :down) (counter (1- value))) ((eq msg :quit) ;; return ) ((process? msg) (snd value msg) (counter value)))))
(defun test-counter () (let ((p (spawn (lambda () (counter 0))))) (snd :up p) (snd (self) p) (format t "~&value: ~A~&" (rcv)) (snd :up p) (snd (self) p) (format t "~&value: ~A~&" (rcv)) (snd :quit p)))
;; This expamle counts the nodes in a cons tree by spawing a new ;; process for each subtree. (defun count-nodes-aux (tree parent) "Count the number of nodes in TREE an send it to PARENT." (cond ((atom tree) (snd 1 parent)) (t (spawn #'count-nodes-aux (car tree) (self)) (spawn #'count-nodes-aux (cdr tree) (self)) (snd (+ 1 (rcv) (rcv)) parent))))
(defun count-nodes (tree) (count-nodes-aux tree (self)) (rcv))
(defun test-count-nodes () (time (count-nodes '(a ((b c)) d e ))))
|#
;; Local Variables: ;; eval: (put 'let-errno 'common-lisp-indent-function 1) ;; End: