Author: psmith Date: Wed Jan 17 00:06:13 2007 New Revision: 40
Added: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp - copied, changed from r37, branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Modified: branches/home/psmith/restructure/run-yarpc-client.lisp branches/home/psmith/restructure/run-yarpc.lisp branches/home/psmith/restructure/src/compat/concurrent-queue.lisp branches/home/psmith/restructure/src/io/async-fd.lisp branches/home/psmith/restructure/src/io/async-socket.lisp branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Log: yarpc work, saving...
Modified: branches/home/psmith/restructure/run-yarpc-client.lisp ============================================================================== --- branches/home/psmith/restructure/run-yarpc-client.lisp (original) +++ branches/home/psmith/restructure/run-yarpc-client.lisp Wed Jan 17 00:06:13 2007 @@ -2,10 +2,9 @@ (require :asdf) (require :nio-yarpc)
-(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1" :port 9897)) :name "nio-server") +;;shouldn't be listening on the client hence nil for accept SM to start-server +(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity nil :host "127.0.0.1" :port 9897)) :name "nio-server") (sleep 4) -(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-state-machine))) +(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-client-state-machine))) (format t "toplevel adding conn ~A~%" sm) (format t "Result of remote-execute ~A~%" (nio-yarpc:remote-execute sm "(nio-yarpc:test-rpc-list)"))) - -
Modified: branches/home/psmith/restructure/run-yarpc.lisp ============================================================================== --- branches/home/psmith/restructure/run-yarpc.lisp (original) +++ branches/home/psmith/restructure/run-yarpc.lisp Wed Jan 17 00:06:13 2007 @@ -2,4 +2,11 @@ (require :asdf) (require :nio-yarpc)
-(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1") +(let ((jobq (nio-compat:concurrent-queue))) + (sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity #'(lambda()(nio-yarpc:yarpc-state-machine jobq)) :host "127.0.0.1")) :name "nio-server") + (format t "server toplevel waiting for job~%" ) + (loop + ;;block waiting for jobs + (multiple-value-bind (job result-queue) (nio-compat:take jobq) + (format t "Server received job ~A~%" job) + (nio-compat:add result-queue (nio-yarpc:execute-call job)))))
Modified: branches/home/psmith/restructure/src/compat/concurrent-queue.lisp ============================================================================== --- branches/home/psmith/restructure/src/compat/concurrent-queue.lisp (original) +++ branches/home/psmith/restructure/src/compat/concurrent-queue.lisp Wed Jan 17 00:06:13 2007 @@ -51,18 +51,18 @@ head) nil))
- -(defmethod take ((queue concurrent-queue)) +;Do an (optionally blocking) remove of the element at the head of this queue +(defmethod take ((queue concurrent-queue) &key (blocking-call t)) (sb-thread:with-mutex ((buffer-lock queue)) ;if its there, pop it (let ((ret (pop-elt (buffer queue) "1sttry"))) - (if ret + (if (or ret (not blocking-call)) ret (progn (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue)) (pop-elt (buffer queue) "2ndtry"))))))
- +;Append the element to the tail of this queue (defmethod add ((queue concurrent-queue) elt) (sb-thread:with-mutex ((buffer-lock queue)) (setf (buffer queue) (append (buffer queue) (list elt)) )
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/async-fd.lisp (original) +++ branches/home/psmith/restructure/src/io/async-fd.lisp Wed Jan 17 00:06:13 2007 @@ -84,10 +84,6 @@ ;;Implement this in concrete SM for read (defgeneric process-write (async-fd))
- - -;Loop over state machines calling process-outgoing-packets via state-machine::process-write - ;;SM factory (defun create-state-machine(sm-type read-fd write-fd socket) (let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))
Modified: branches/home/psmith/restructure/src/io/async-socket.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/async-socket.lisp (original) +++ branches/home/psmith/restructure/src/io/async-socket.lisp Wed Jan 17 00:06:13 2007 @@ -178,7 +178,7 @@
-(defun socket-accept (socket-fd connection-type) +(defun socket-accept (socket-fd connection-factory) "Accept connection from SOCKET-FD. Allocates and returns socket structure denoting the connection."
(flet ((parse-inet6-addr (addr) @@ -202,7 +202,7 @@ ;; accept connection (let* ((res (%accept socket-fd addr len)) ;; (async-socket-fd (make-instance 'async-socket-fd :read-fd res :write-fd res))) - (async-socket-fd (create-state-machine connection-type res res (make-instance 'async-socket-fd)))) + (async-socket-fd (create-state-machine connection-factory res res (make-instance 'async-socket-fd))))
(unless (< res 0) (let ((len-value (mem-ref len :unsigned-int)))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Wed Jan 17 00:06:13 2007 @@ -55,7 +55,7 @@
-(defun start-server (connection-handler accept-filter connection-type +(defun start-server (connection-handler accept-filter connection-factory &key (protocol :inet) (port (+ (random 60000) 1024)) @@ -99,7 +99,7 @@ (cond ;; new connection ((= fd sock) - (let ((async-fd (socket-accept fd connection-type))) + (let ((async-fd (socket-accept fd connection-factory))) #+nio-debug (format t "start-server - New conn: ~A~%" async-fd) (cond ((null async-fd)
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Wed Jan 17 00:06:13 2007 @@ -28,6 +28,14 @@ (:export
+ ;;base + yarpc-state-machine-factory get-packet-factory + ;; yarpc-state-machine - yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory remote-execute + yarpc-state-machine + ;to be moved + test-rpc test-rpc-list test-rpc-string execute-call + + ;;yarpc-client-state-machine + yarpc-client-state-machine remote-execute ))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd Wed Jan 17 00:06:13 2007 @@ -7,6 +7,7 @@ :components ((:file "nio-yarpc-package") (:file "yarpc-packet-factory" :depends-on ("nio-yarpc-package")) (:file "yarpc-state-machine" :depends-on ("yarpc-packet-factory")) + (:file "yarpc-client-state-machine" :depends-on ("yarpc-packet-factory")) )
- :depends-on (:nio :nio-sm)) \ No newline at end of file + :depends-on (:nio :nio-sm :nio-compat)) \ No newline at end of file
Copied: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (from r37, branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp) ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Wed Jan 17 00:06:13 2007 @@ -28,108 +28,53 @@
(declaim (optimize (debug 3) (speed 3) (space 0)))
-;; YetAnotherRPC state machine +;; YetAnotherRPC Client state machine ;; -;; A server that processes remote procedure calls and returns results +;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution ;; -;; Test with: -;; > telnet 127.0.0.1 16323 -;; Trying 127.0.0.1... -;; Connected to 127.0.0.1. -;; Escape character is '^]'. -;; (test-rpc "who" 2 's) -;; response - who 2 'S -;; -(defclass yarpc-state-machine (state-machine) - ((outgoing-packet :initarg :outgoing-packet - :accessor outgoing-packet - :initform nil))) +(defclass yarpc-client-state-machine (state-machine) + ((job-queue :initform (nio-compat:concurrent-queue) + :accessor job-queue + :documentation "The queue used to hand off work from an external thread to the io thread") + (result-queue :initform (nio-compat:concurrent-queue) + :accessor result-queue + :documentation "The queue used to hand off work from an external thread to the io thread")))
-(defun yarpc-state-machine () - (make-instance 'yarpc-state-machine)) +(defun yarpc-client-state-machine () + (make-instance 'yarpc-client-state-machine))
(defparameter yarpc-pf (yarpc-packet-factory))
-(defmethod get-packet-factory((sm yarpc-state-machine)) +(defmethod get-packet-factory((sm yarpc-client-state-machine)) yarpc-pf)
-;;TODO move somewhere suitable - -(defparameter *remote-fns* nil) - -(defun register-remote-fn(name) - (push name *remote-fns*)) - -(defmacro defremote (name args &rest body) - `(progn - (defun ,name (,@args) ,@body) - (register-remote-fn #',name))) - -(defremote test-rpc-list() - (list 3 "as" 's (code-char #x2211))) - -(defremote test-rpc-string(a b c) - (format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211))) - -;;end move TODO - - -;;;Utils - -(defun print-hashtable (table &optional (stream t)) - (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table)) -;;; -
-(defmethod print-object ((sm yarpc-state-machine) stream) - (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil))) +(defmethod print-object ((sm yarpc-client-state-machine) stream) + (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil)))
(defconstant STATE-INITIALISED 0) -(defconstant STATE-SEND-RESPONSE 1) +(defconstant STATE-SENT-REQUEST 1)
(defparameter state STATE-INITIALISED)
-(define-condition authorization-error (error) ()) - - -(defmethod process-outgoing-packet((sm yarpc-state-machine)) - (format t "process-outgoing-packet called~%") - (let ((packet (outgoing-packet sm))) - (setf (outgoing-packet sm) nil) - packet)) - -;TODO queue and thread stuf -(defmethod queue-outgoing-packet((sm yarpc-state-machine) packet) - (setf (outgoing-packet sm) packet)) - -;Process a call method packet, returns -(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) - (assert (eql state STATE-INITIALISED)) - (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call) - (handler-case - (let ((result (execute-call (call-string call)))) - (when result - (let ((response-packet (progn - (setf state STATE-SEND-RESPONSE) - (queue-outgoing-packet sm (method-response-packet result))))) - t))) - (reader-error (re) (format t "No such function ~A~%" (call-string call))) - (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call))))) - -(defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet)) - (assert (eql state STATE-INITIALISED)) - (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)) +(defmethod process-outgoing-packet((sm yarpc-client-state-machine)) + (format t "process-outgoing-packet called, polling the job-queue ~%") + (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil))) + (when packet + (format t "process-outgoing-packet got job ~A ~%" packet) + (setf state STATE-SENT-REQUEST)) + packet)) + +(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet)) + (assert (eql state STATE-SENT-REQUEST)) + (format t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response) + (nio-compat:add (result-queue sm) response) + (setf state STATE-INITIALISED))
- -(defun execute-call (call-string) - (let* ((rpc-call-list (read-from-string call-string )) - (fn (member (symbol-function (first rpc-call-list)) *remote-fns* ))) - (format t "fn - ~A authorised? : ~A~%" (symbol-function (first rpc-call-list)) fn) - (if fn - (apply (first rpc-call-list) (rest rpc-call-list)) - (error 'authorization-error)))) - - -(defmethod remote-execute ((sm yarpc-state-machine) call-string) - (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string))) - \ No newline at end of file +;Called from an external thread i.e. *not* the nio thread +;Blocks calling thread on the remote m/c's response +(defmethod remote-execute ((sm yarpc-client-state-machine) call-string) +; (queue-outgoing-packet + (assert (eql state STATE-INITIALISED)) + (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string)) + (nio-compat:take (result-queue sm))) \ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Wed Jan 17 00:06:13 2007 @@ -32,27 +32,57 @@ ;; ;; A server that processes remote procedure calls and returns results ;; -;; Test with: -;; > telnet 127.0.0.1 16323 -;; Trying 127.0.0.1... -;; Connected to 127.0.0.1. -;; Escape character is '^]'. -;; (test-rpc "who" 2 's) -;; response - who 2 'S -;; (defclass yarpc-state-machine (state-machine) - ((outgoing-packet :initarg :outgoing-packet - :accessor outgoing-packet - :initform nil))) - -(defun yarpc-state-machine () - (make-instance 'yarpc-state-machine)) + ((job-queue :initarg :job-queue + :initform (error "Must supply a job queue to write work to.") + :accessor job-queue + :documentation "The queue used to hand off work from the NIO thread to an external thread for execution") + (result-queue :initform (nio-compat:concurrent-queue) + :accessor result-queue + :documentation "The queue used to return results from an external thread to the nio thread"))) + +(defun yarpc-state-machine (read-fd write-fd socket job-queue) + (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue))) + (nio-buffer:clear (foreign-read-buffer sm)) + (nio-buffer:clear (foreign-write-buffer sm)) + (format t "yarpc-state-machine - Created ~S~%" sm) + sm))
(defparameter yarpc-pf (yarpc-packet-factory))
(defmethod get-packet-factory((sm yarpc-state-machine)) yarpc-pf)
+(defmethod print-object ((sm yarpc-state-machine) stream) + (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil))) + +(defconstant STATE-INITIALISED 0) +(defconstant STATE-SEND-RESPONSE 1) + +(defparameter state STATE-INITIALISED) + + +(defmethod process-outgoing-packet((sm yarpc-state-machine)) + (format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%") + (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil))) + (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet) + packet)) + + +;Process a call method packet, returns +(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) + (assert (eql state STATE-INITIALISED)) + (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call) + (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm)))) + + +;Called from an external thread i.e. *not* the nio thread +;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue +;(defmethod get-job ((sm yarpc-state-machine)) +; (values (nio-compat:take (job-queue sm)) (result-queue sm))) + + + ;;TODO move somewhere suitable
(defparameter *remote-fns* nil) @@ -71,56 +101,8 @@ (defremote test-rpc-string(a b c) (format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211)))
-;;end move TODO - - -;;;Utils - -(defun print-hashtable (table &optional (stream t)) - (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table)) -;;; - - -(defmethod print-object ((sm yarpc-state-machine) stream) - (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil))) - -(defconstant STATE-INITIALISED 0) -(defconstant STATE-SEND-RESPONSE 1) - -(defparameter state STATE-INITIALISED) - (define-condition authorization-error (error) ())
- -(defmethod process-outgoing-packet((sm yarpc-state-machine)) - (format t "process-outgoing-packet called~%") - (let ((packet (outgoing-packet sm))) - (setf (outgoing-packet sm) nil) - packet)) - -;TODO queue and thread stuf -(defmethod queue-outgoing-packet((sm yarpc-state-machine) packet) - (setf (outgoing-packet sm) packet)) - -;Process a call method packet, returns -(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) - (assert (eql state STATE-INITIALISED)) - (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call) - (handler-case - (let ((result (execute-call (call-string call)))) - (when result - (let ((response-packet (progn - (setf state STATE-SEND-RESPONSE) - (queue-outgoing-packet sm (method-response-packet result))))) - t))) - (reader-error (re) (format t "No such function ~A~%" (call-string call))) - (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call))))) - -(defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet)) - (assert (eql state STATE-INITIALISED)) - (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)) - - (defun execute-call (call-string) (let* ((rpc-call-list (read-from-string call-string )) (fn (member (symbol-function (first rpc-call-list)) *remote-fns* ))) @@ -129,7 +111,18 @@ (apply (first rpc-call-list) (rest rpc-call-list)) (error 'authorization-error))))
+;;end move TODO + + + +
-(defmethod remote-execute ((sm yarpc-state-machine) call-string) - (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string))) - \ No newline at end of file +; (handler-case +; (let ((result (execute-call (call-string call)))) +; (when result +; (let ((response-packet (progn +; (setf state STATE-SEND-RESPONSE) +; (queue-outgoing-packet sm (method-response-packet result))))) +; t))) +; (reader-error (re) (format t "No such function ~A~%" (call-string call))) +; (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call)))) \ No newline at end of file