Author: psmith Date: Sat Feb 10 18:52:32 2007 New Revision: 82
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Log: getting there...
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Sat Feb 10 18:52:32 2007 @@ -68,7 +68,7 @@ (defun start-server (connection-type &key (protocol :inet) - (port (+ (random 60000) 1024)) + (port 0) ;//if set then listen (host "127.0.0.1") (accept-connection #'trivial-accept)) (let (sock @@ -76,7 +76,7 @@ (client-hash (make-hash-table :test 'eql)) )
- (when (not (null connection-type)) + (when (not (eql port 0)) (format t "Binding to ~A:~A~%" host port) (setq sock (ecase protocol (:inet (make-inet-socket)) @@ -161,6 +161,7 @@ (when new-fd #+nio-debug (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd) (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd) + (setf (active-conn a-node) new-fd) (add-async-fd event-queue new-fd :read-write)))) ;loop over async-fd's processing where necessary
Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp ============================================================================== --- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp (original) +++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp Sat Feb 10 18:52:32 2007 @@ -54,6 +54,7 @@ (let ((rpc (format nil "(nio-logger:remote-log "~A")" (cl-base64:string-to-base64-string text)))) (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc) (nio:with-connected-nodes (node) + (nio-utils:format-log t "Toplevel sending ~A to ~A~%" rpc node) (nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback)))))
;Runs a multithreaded system with an IO thread dealing with IO only and a 'job' thread taking and executing jobs @@ -64,7 +65,7 @@ (setf nio-yarpc:+process-jobs-inline+ nil) (setf +log-file-name+ out-file) (nio:load-ips allowed-ips) - (sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :accept-connection 'nio:check-ip)) :name "nio-server") + (sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :port 16323 :accept-connection 'nio:check-ip)) :name "nio-server") (loop ;;block waiting for jobs (nio-yarpc:run-job)))
Modified: branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp ============================================================================== --- branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp (original) +++ branches/home/psmith/restructure/src/nio-logger/run-logging-client.lisp Sat Feb 10 18:52:32 2007 @@ -36,4 +36,4 @@ (sb-thread:make-thread #'(lambda()(nio-logger:tail-log log-file ip)) :name "nio-server")
;;shouldn't be listenting on the client hence nil for accept SM to start-server - (nio:start-server nil)) + (nio:start-server 'nio-yarpc:yarpc-client-state-machine))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Sat Feb 10 18:52:32 2007 @@ -41,7 +41,8 @@ :documentation "A map from request-id (a unique id for this request) to remote-job")))
(defclass remote-job() - ((callback :accessor callback + ((callback :initarg :callback + :accessor callback :documentation "A function accepting one argument to call with the result of the remote operation") (start-time :initform (get-universal-high-res) :reader start-time @@ -72,7 +73,7 @@ (defparameter +request-id+ 0)
(defmethod process-outgoing-packet((sm yarpc-client-state-machine)) -#+nio-debug2 (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") +#+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil))) (when ttd (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd) @@ -88,4 +89,5 @@
;Execute the call-string on the remote node and call callback with the result (defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback) - (nio-compat:add (job-queue sm) '((remote-job callback) call-string))) + (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback) + (nio-compat:add (job-queue sm) (list (remote-job callback) call-string)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp Sat Feb 10 18:52:32 2007 @@ -40,7 +40,7 @@
(defconstant +PACKET-ID-SIZE+ 1) (defconstant +PACKET-LENGTH-SIZE+ 4) -;(defconstant +PACKET-REQUEST-ID+ 4) +(defconstant +PACKET-REQUEST-ID-SIZE+ 4)
(defconstant +yarpc-packet-header-size+ (+ +PACKET-ID-SIZE+ +PACKET-LENGTH-SIZE+)) @@ -53,8 +53,8 @@ (if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer? (let* ((packet-request-id (bytebuffer-read-32 buf)) (ret-packet (ecase packet-id - (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))) - (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))))) + (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+ +PACKET-REQUEST-ID-SIZE+)) :request-id packet-request-id))) + (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+ +PACKET-REQUEST-ID-SIZE+)) :request-id packet-request-id)))))) (compact buf) #+nio-debug (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf) #+nio-debug (format-log t "yarpc-packet-factory:get-packet - retuirning packet ~A~%" ret-packet) @@ -70,10 +70,12 @@ ((request-id :initarg :request-id :reader request-id)))
-(defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string - :accessor call-string))) -(defun call-method-packet (call-string) - (make-instance 'call-method-packet :call-string call-string)) +(defclass call-method-packet (yarpc-packet) + ((call-string :initarg :call-string + :accessor call-string))) + +(defun call-method-packet (call-string &key request-id) + (make-instance 'call-method-packet :call-string call-string :request-id request-id))
(defmethod print-object ((packet call-method-packet) stream) (format stream "#<CALL-METHOD-PACKET ~A >" (call-string packet))) @@ -103,8 +105,8 @@ ((response :initarg :response :accessor response)))
-(defun method-response-packet (response) - (make-instance 'method-response-packet :response response)) +(defun method-response-packet (response &key request-id) + (make-instance 'method-response-packet :response response :request-id request-id))
(defmethod print-object ((packet method-response-packet) stream) (format stream "#<METHID-RESPONSE-PACKET ~A >" (response packet)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Sat Feb 10 18:52:32 2007 @@ -60,17 +60,20 @@
(defun run-job(&key (blocking t)) (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%") - (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking) - (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) - (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job))))) + (let ((server-job (nio-compat:take nio-yarpc:job-queue :blocking-call blocking))) + (when server-job + (destructuring-bind (job request-id result-queue) server-job + (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) + (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
(defmethod process-outgoing-packet((sm yarpc-state-machine)) (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" ) - (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil) - (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result) - (when result - (method-response-packet result :request-id request-id)))) + (let ((server-job (nio-compat:take (result-queue sm) :blocking-call nil))) + (when server-job + (destructuring-bind (request-id result) server-job + (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result) + (method-response-packet result :request-id request-id)))))
;Process a call method packet by placing it in the job-queue (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))