Author: psmith Date: Wed Jan 17 23:01:11 2007 New Revision: 41
Modified: branches/home/psmith/restructure/run-yarpc.lisp branches/home/psmith/restructure/src/buffer/buffer.lisp branches/home/psmith/restructure/src/compat/concurrent-queue.lisp branches/home/psmith/restructure/src/io/async-fd.lisp branches/home/psmith/restructure/src/io/async-socket.lisp branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp branches/home/psmith/restructure/src/statemachine/state-machine.lisp Log: Yarpc working end-to-end
Modified: branches/home/psmith/restructure/run-yarpc.lisp ============================================================================== --- branches/home/psmith/restructure/run-yarpc.lisp (original) +++ branches/home/psmith/restructure/run-yarpc.lisp Wed Jan 17 23:01:11 2007 @@ -2,11 +2,10 @@ (require :asdf) (require :nio-yarpc)
-(let ((jobq (nio-compat:concurrent-queue))) - (sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity #'(lambda()(nio-yarpc:yarpc-state-machine jobq)) :host "127.0.0.1")) :name "nio-server") - (format t "server toplevel waiting for job~%" ) - (loop +(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1")) :name "nio-server") +(loop ;;block waiting for jobs - (multiple-value-bind (job result-queue) (nio-compat:take jobq) + (format t "Server toplevel waiting for job~%" ) + (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue) (format t "Server received job ~A~%" job) - (nio-compat:add result-queue (nio-yarpc:execute-call job))))) + (nio-compat:add result-queue (nio-yarpc:execute-call job))))
Modified: branches/home/psmith/restructure/src/buffer/buffer.lisp ============================================================================== --- branches/home/psmith/restructure/src/buffer/buffer.lisp (original) +++ branches/home/psmith/restructure/src/buffer/buffer.lisp Wed Jan 17 23:01:11 2007 @@ -55,25 +55,45 @@
;;Utils by slyrus (http://paste.lisp.org/display/11149) (defun hex-dump-byte (address) - (format nil "~2,'0X" - (sb-alien:deref - (sb-alien:sap-alien - (sb-alien::int-sap address) - (* (sb-alien:unsigned 8)))))) + (format nil "~2,'0X" (byte-value address))) + +(defun byte-value (address) + (sb-alien:deref + (sb-alien:sap-alien + (sb-alien::int-sap address) + (* (sb-alien:unsigned 8)))))
(defun hex-dump-memory (start-address length) (loop for i from start-address below (+ start-address length) collect (format nil (hex-dump-byte i))))
+;;-- end utils + + +(defun pretty-hex-dump (start-address length) +; (format t "start: ~A length ~A~%" start-address length) + (with-output-to-string (str) + (let ((rows (floor (/ length 16)))) +; (format t "rows: ~A remainder ~A~%" rows remainder) + (dotimes (row-index (+ 1 rows)) + (format str "~A~%" + (with-output-to-string (readable) + (dotimes (column-index 16) + (let ((address (+ start-address (* row-index 16) column-index))) + ; (format t "Current address : ~A~%" address) + (if (>= address (+ start-address length)) + (progn + (format str "--") + (format readable "--")) + (progn + (format str (if (eql column-index 7) "~A " "~A ") (hex-dump-byte address)) + (format readable "~A" (code-char (byte-value address)))))))))))))
(defun make-uint8-seq (size) "Make uint8 sequence." (make-sequence '(vector (unsigned-byte 8)) size :initial-element 0))
-;;-- end utils - - ;;A buffer that deals with bytes (defclass byte-buffer (buffer)())
@@ -83,7 +103,7 @@
(defmethod print-object ((byte-buffer byte-buffer) stream) (with-slots (capacity position limit buf) byte-buffer - (format stream "<byte-buffer :capacity ~A :position ~A :limit ~A :buf ~%~A>~%" capacity position limit (if buf (hex-dump-memory (cffi:pointer-address buf) limit) nil)))) + (format stream "<byte-buffer :capacity ~A :position ~A :limit ~A :buf ~%~A>~%" capacity position limit (if buf (pretty-hex-dump (cffi:pointer-address buf) limit) nil))))
(defmethod free-buffer((byte-buffer byte-buffer)) (with-slots (capacity position limit buf) byte-buffer
Modified: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp ============================================================================== --- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp (original) +++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Wed Jan 17 23:01:11 2007 @@ -47,7 +47,7 @@ `(if ,a-buffer (let ((head (car ,a-buffer))) (setf ,a-buffer (cdr ,a-buffer)) -#+nio-debug (format t "reader ~A woke, read ~A as ~A~%" sb-thread:*current-thread* head ,loc) +#+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc) head) nil))
@@ -64,6 +64,7 @@
;Append the element to the tail of this queue (defmethod add ((queue concurrent-queue) elt) +#+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt) (sb-thread:with-mutex ((buffer-lock queue)) (setf (buffer queue) (append (buffer queue) (list elt)) ) (sb-thread:condition-notify (buffer-queue queue))))
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/async-fd.lisp (original) +++ branches/home/psmith/restructure/src/io/async-fd.lisp Wed Jan 17 23:01:11 2007 @@ -125,11 +125,12 @@ (error 'read-error)))
((= new-bytes 0) - nil);;(throw 'end-of-file nil)) + nil);;(throw 'end-of-file nil)
(t ;;Update buffer position - (inc-position foreign-read-buffer new-bytes)))))) + (inc-position foreign-read-buffer new-bytes) + (setf (read-ready state-machine) nil))))))
(defun close-async-fd (async-fd) "Close ASYNC-FD's fd after everything has been written from write-queue."
Modified: branches/home/psmith/restructure/src/io/async-socket.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/async-socket.lisp (original) +++ branches/home/psmith/restructure/src/io/async-socket.lisp Wed Jan 17 23:01:11 2007 @@ -178,7 +178,7 @@
-(defun socket-accept (socket-fd connection-factory) +(defun socket-accept (socket-fd connection-type) "Accept connection from SOCKET-FD. Allocates and returns socket structure denoting the connection."
(flet ((parse-inet6-addr (addr) @@ -202,7 +202,7 @@ ;; accept connection (let* ((res (%accept socket-fd addr len)) ;; (async-socket-fd (make-instance 'async-socket-fd :read-fd res :write-fd res))) - (async-socket-fd (create-state-machine connection-factory res res (make-instance 'async-socket-fd)))) + (async-socket-fd (create-state-machine connection-type res res (make-instance 'async-socket-fd))))
(unless (< res 0) (let ((len-value (mem-ref len :unsigned-int)))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Wed Jan 17 23:01:11 2007 @@ -55,7 +55,7 @@
-(defun start-server (connection-handler accept-filter connection-factory +(defun start-server (connection-handler accept-filter connection-type &key (protocol :inet) (port (+ (random 60000) 1024)) @@ -99,7 +99,7 @@ (cond ;; new connection ((= fd sock) - (let ((async-fd (socket-accept fd connection-factory))) + (let ((async-fd (socket-accept fd connection-type))) #+nio-debug (format t "start-server - New conn: ~A~%" async-fd) (cond ((null async-fd)
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Wed Jan 17 23:01:11 2007 @@ -32,7 +32,7 @@ yarpc-state-machine-factory get-packet-factory
;; yarpc-state-machine - yarpc-state-machine + yarpc-state-machine job-queue ;to be moved test-rpc test-rpc-list test-rpc-string execute-call
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Wed Jan 17 23:01:11 2007 @@ -38,7 +38,7 @@ :documentation "The queue used to hand off work from an external thread to the io thread") (result-queue :initform (nio-compat:concurrent-queue) :accessor result-queue - :documentation "The queue used to hand off work from an external thread to the io thread"))) + :documentation "The queue used to return results from the io thread to an external thread")))
(defun yarpc-client-state-machine () (make-instance 'yarpc-client-state-machine)) @@ -55,26 +55,26 @@ (defconstant STATE-INITIALISED 0) (defconstant STATE-SENT-REQUEST 1)
-(defparameter state STATE-INITIALISED) - (defmethod process-outgoing-packet((sm yarpc-client-state-machine)) (format t "process-outgoing-packet called, polling the job-queue ~%") (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil))) (when packet (format t "process-outgoing-packet got job ~A ~%" packet) - (setf state STATE-SENT-REQUEST)) + (setf (state sm) STATE-SENT-REQUEST)) packet))
(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet)) - (assert (eql state STATE-SENT-REQUEST)) + (assert (eql (state sm) STATE-SENT-REQUEST)) (format t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response) - (nio-compat:add (result-queue sm) response) - (setf state STATE-INITIALISED)) + (let* ((*package* (find-package :nio-yarpc)) + (result (read-from-string (response response)))) + (nio-compat:add (result-queue sm) result) + (setf (state sm) STATE-INITIALISED)))
;Called from an external thread i.e. *not* the nio thread ;Blocks calling thread on the remote m/c's response (defmethod remote-execute ((sm yarpc-client-state-machine) call-string) ; (queue-outgoing-packet - (assert (eql state STATE-INITIALISED)) + (assert (eql (state sm) STATE-INITIALISED)) (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string)) (nio-compat:take (result-queue sm))) \ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Wed Jan 17 23:01:11 2007 @@ -33,20 +33,13 @@ ;; A server that processes remote procedure calls and returns results ;; (defclass yarpc-state-machine (state-machine) - ((job-queue :initarg :job-queue - :initform (error "Must supply a job queue to write work to.") - :accessor job-queue - :documentation "The queue used to hand off work from the NIO thread to an external thread for execution") + ( (result-queue :initform (nio-compat:concurrent-queue) :accessor result-queue :documentation "The queue used to return results from an external thread to the nio thread")))
-(defun yarpc-state-machine (read-fd write-fd socket job-queue) - (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue))) - (nio-buffer:clear (foreign-read-buffer sm)) - (nio-buffer:clear (foreign-write-buffer sm)) - (format t "yarpc-state-machine - Created ~S~%" sm) - sm)) +(defparameter job-queue (nio-compat:concurrent-queue) + "The queue used to hand off work from the NIO thread to an external thread for execution")
(defparameter yarpc-pf (yarpc-packet-factory))
@@ -59,27 +52,18 @@ (defconstant STATE-INITIALISED 0) (defconstant STATE-SEND-RESPONSE 1)
-(defparameter state STATE-INITIALISED) - - (defmethod process-outgoing-packet((sm yarpc-state-machine)) (format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%") - (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil))) - (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet) - packet)) - + (let ((result (nio-compat:take (result-queue sm) :blocking-call nil))) + (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" result) + (when result + (method-response-packet result))))
-;Process a call method packet, returns +;Process a call method packet by placing it in the job-queue (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) - (assert (eql state STATE-INITIALISED)) + (assert (eql (state sm) STATE-INITIALISED)) (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call) - (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm)))) - - -;Called from an external thread i.e. *not* the nio thread -;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue -;(defmethod get-job ((sm yarpc-state-machine)) -; (values (nio-compat:take (job-queue sm)) (result-queue sm))) + (nio-compat:add job-queue (list (call-string call) (result-queue sm))))
Modified: branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp (original) +++ branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp Wed Jan 17 23:01:11 2007 @@ -29,5 +29,5 @@ (:export
;; state-machine - state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet + state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet state ))
Modified: branches/home/psmith/restructure/src/statemachine/state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/statemachine/state-machine.lisp (original) +++ branches/home/psmith/restructure/src/statemachine/state-machine.lisp Wed Jan 17 23:01:11 2007 @@ -37,7 +37,9 @@ ;This way only the protocols packet heirarchy knows about binary representations and ; the SM can deal with protocol logic and state maintenance ; -(defclass state-machine (async-fd)()) +(defclass state-machine (async-fd) + ((state :initform 0 + :accessor state)))
(defmethod print-object ((sm state-machine) stream) (format stream "#<STATE-MACHINE ~A >" (call-next-method sm nil)))