Author: psmith Date: Sat Feb 10 15:36:43 2007 New Revision: 81
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/io/nio.asd branches/home/psmith/restructure/src/io/nodes.lisp branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Log: First stab at rpc multiplexing
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-package.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-package.lisp Sat Feb 10 15:36:43 2007 @@ -42,4 +42,7 @@
;;ip-authorisation check-ip load-ips + + ;;nodes + node with-connected-nodes active-conn ))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Sat Feb 10 15:36:43 2007 @@ -35,7 +35,7 @@
;TODO thread safety (defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue) - "List of sockets that have been connected and are awaiting addition to the event-notification system") + "List of node objects that are to be connected to")
;loop over hashtable (defun process-async-fds (client-hash) @@ -150,12 +150,18 @@ (when (write-event-p event) (setf (write-ready async-fd) t)))))))))
;add outgoing sockets to event queue -#+nio-debug2 (format-log t "nio-server:start-server - Processing client add ~A~%" +connected-sockets-queue+) - - (loop for new-fd = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null new-fd) do +#+nio-debug2 (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+) + (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do +#+nio-debug (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node) + (push node *nodes-list*)) + (with-connect-ready-nodes (a-node) +#+nio-debug (format-log t "nio-server:start-server - attempting connection to node ~A~%" a-node) + (let ((new-fd (connect (host a-node) (port a-node) connection-type))) + (update-last-connect-attempt a-node) + (when new-fd #+nio-debug (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd) - (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd) - (add-async-fd event-queue new-fd :read-write)) + (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd) + (add-async-fd event-queue new-fd :read-write)))) ;loop over async-fd's processing where necessary (process-async-fds client-hash) @@ -164,10 +170,10 @@ (close-fd sock))))
-(defun add-connection (host port connection-type - &key - (protocol :inet)) - (format-log t "nio-server:add-connection - Called with: ~A:~A:~A ~%" protocol host port) +(defun connect(host port connection-type + &key + (protocol :inet)) + (format-log t "nio-server:connect - Called with: ~A:~A:~A ~%" protocol host port) (let ((sock nil)) (setq sock (ecase protocol (:inet (make-inet-socket)) @@ -175,8 +181,10 @@
(if (connect-inet-socket sock host port) (let ((sm (create-state-machine connection-type sock sock sock))) - (nio-compat:add +connected-sockets-queue+ sm) - (format-log t "nio-server:add-connection - Socket enqueued: ~A~%" +connected-sockets-queue+) - (return-from add-connection sm)) +; (nio-compat:add +connected-sockets-queue+ sm) +; (format-log t "nio-server:connect - Socket enqueued: ~A~%" +connected-sockets-queue+) + (return-from connect sm)) (format t "Connect failed!!~A ~%" (get-errno))))) - \ No newline at end of file + +(defun add-connection(node) + (nio-compat:add +connected-sockets-queue+ node)) \ No newline at end of file
Modified: branches/home/psmith/restructure/src/io/nio.asd ============================================================================== --- branches/home/psmith/restructure/src/io/nio.asd (original) +++ branches/home/psmith/restructure/src/io/nio.asd Sat Feb 10 15:36:43 2007 @@ -9,9 +9,9 @@ (:file "packet" :depends-on ("nio-package")) (:file "async-fd" :depends-on ("fd-helper")) (:file "async-socket" :depends-on ("async-fd")) - (:file "nio-server" :depends-on ("async-socket")) + (:file "nodes" :depends-on ("nio-package")) + (:file "nio-server" :depends-on ("async-socket" "nodes")) (:file "ip-authorisation" :depends-on ("nio-package")) - (:file "nodes" :depends-on ("nio-package")) )
:depends-on (:cffi :event-notification :nio-buffer :nio-compat :nio-utils))
Modified: branches/home/psmith/restructure/src/io/nodes.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nodes.lisp (original) +++ branches/home/psmith/restructure/src/io/nodes.lisp Sat Feb 10 15:36:43 2007 @@ -75,6 +75,21 @@ (get-universal-high-res) (+ (last-connect-attempt node) (retry-delay node))))
+(defun allowed-to-connect(node) + (if (null (last-connect-attempt node)) + t + (and (not (active-conn node)) (< (+ (last-connect-attempt node) (retry-delay node)) (get-universal-high-res))))) + (defun update-last-connect-attempt(node) (setf (last-connect-attempt node) (get-universal-high-res)))
+;;iterates over the nodes list looking for nodes that are ready to be connected to +;;i.e. the SM is null and the next-allowed-connect time has expired +(defmacro with-connect-ready-nodes ((node) &rest body) + `(dolist (,node *nodes-list*) + (when (allowed-to-connect ,node) ,@body))) + + +(defmacro with-connected-nodes ((node) &rest body) + `(dolist (,node *nodes-list*) + (when (active-conn ,node) ,@body)))
Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp ============================================================================== --- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp (original) +++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp Sat Feb 10 15:36:43 2007 @@ -41,16 +41,20 @@ (sleep ,delay))))))
+(defun callback(result) + (nio-utils:format-log t "Result of remote-log ~A~%" result)) + + ;;Tail the given log and write to remote logger ;;e.g. (tail-log "/var/log/httpd/access_log" "192.168.1.1") (defun tail-log(filename ip-address) (sleep 4) - (let ((sm (nio:add-connection ip-address 16323 'nio-yarpc:yarpc-client-state-machine))) - (nio-utils:format-log t "toplevel adding conn ~A to ~A~%" sm ip-address) - (with-line-from-tailed-file (text filename 1) - (let ((rpc (format nil "(nio-logger:remote-log "~A")" (cl-base64:string-to-base64-string text)))) - (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc) - (nio-utils:format-log t "Result of remote-log ~A~%" (nio-yarpc:remote-execute sm rpc)))))) + (nio:add-connection (nio:node ip-address 16323)) + (with-line-from-tailed-file (text filename 1) + (let ((rpc (format nil "(nio-logger:remote-log "~A")" (cl-base64:string-to-base64-string text)))) + (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc) + (nio:with-connected-nodes (node) + (nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback)))))
;Runs a multithreaded system with an IO thread dealing with IO only and a 'job' thread taking and executing jobs
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Sat Feb 10 15:36:43 2007 @@ -36,9 +36,23 @@ ((job-queue :initform (nio-compat:concurrent-queue) :accessor job-queue :documentation "The queue used to hand off work from an external thread to the io thread") - (result-queue :initform (nio-compat:concurrent-queue) - :accessor result-queue - :documentation "The queue used to return results from the io thread to an external thread"))) + (request-map :initform (make-hash-table) + :reader request-map + :documentation "A map from request-id (a unique id for this request) to remote-job"))) + +(defclass remote-job() + ((callback :accessor callback + :documentation "A function accepting one argument to call with the result of the remote operation") + (start-time :initform (get-universal-high-res) + :reader start-time + :documentation "The (floating point) start time") + (timeout :initarg :timeout + :initform 1.5 + :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close."))) + +(defun remote-job(callback) + (make-instance 'remote-job :callback callback)) +
(defun yarpc-client-state-machine () (make-instance 'yarpc-client-state-machine)) @@ -55,25 +69,23 @@ (defconstant STATE-INITIALISED 0) (defconstant STATE-SENT-REQUEST 1)
+(defparameter +request-id+ 0) + (defmethod process-outgoing-packet((sm yarpc-client-state-machine)) #+nio-debug2 (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") - (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil))) - (when packet - (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" packet) - (setf (state sm) STATE-SENT-REQUEST)) - packet)) + (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil))) + (when ttd + (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd) + (destructuring-bind (job call-string) ttd + (setf (gethash (1+ +request-id+) (request-map sm)) job) + (make-instance 'call-method-packet :call-string call-string :request-id +request-id+)))))
(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet)) - (assert (eql (state sm) STATE-SENT-REQUEST)) (format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response) (let* ((*package* (find-package :nio-yarpc)) (result (read-from-string (response response)))) - (setf (state sm) STATE-INITIALISED) (nio-compat:add (result-queue sm) result))) - -;Called from an external thread i.e. *not* the nio thread -;Blocks calling thread on the remote m/c's response -(defmethod remote-execute ((sm yarpc-client-state-machine) call-string) - (assert (eql (state sm) STATE-INITIALISED)) - (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string)) - (nio-compat:take (result-queue sm))) \ No newline at end of file + +;Execute the call-string on the remote node and call callback with the result +(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback) + (nio-compat:add (job-queue sm) '((remote-job callback) call-string)))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp Sat Feb 10 15:36:43 2007 @@ -40,19 +40,21 @@
(defconstant +PACKET-ID-SIZE+ 1) (defconstant +PACKET-LENGTH-SIZE+ 4) +;(defconstant +PACKET-REQUEST-ID+ 4)
(defconstant +yarpc-packet-header-size+ (+ +PACKET-ID-SIZE+ +PACKET-LENGTH-SIZE+))
(defmethod get-packet ((pf yarpc-packet-factory) buf) (flip buf) - (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size + (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size ;; 6,7,8,9 request-id (let ((packet-id (bytebuffer-read-8 buf)) (packet-length (bytebuffer-read-32 buf))) (if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer? - (let ((ret-packet (ecase packet-id - (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+))))) - (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)))))))) + (let* ((packet-request-id (bytebuffer-read-32 buf)) + (ret-packet (ecase packet-id + (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))) + (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))))) (compact buf) #+nio-debug (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf) #+nio-debug (format-log t "yarpc-packet-factory:get-packet - retuirning packet ~A~%" ret-packet) @@ -64,7 +66,11 @@
-(defclass call-method-packet (packet)((call-string :initarg :call-string +(defclass yarpc-packet(packet) + ((request-id :initarg :request-id + :reader request-id))) + +(defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string :accessor call-string))) (defun call-method-packet (call-string) (make-instance 'call-method-packet :call-string call-string)) @@ -79,6 +85,7 @@ (progn (nio-buffer:bytebuffer-write-8 buf +CALL-METHOD-PACKET-ID+) (nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later + (nio-buffer:bytebuffer-write-32 buf (request-id packet)) (nio-buffer:bytebuffer-write-string buf (call-string packet)) (nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1) #+nio-debug (format-log t "yarpc-packet-factory:write-bytes(call-method-packet) - written ~%~A ~%" buf) @@ -92,7 +99,7 @@ (+ +yarpc-packet-header-size+ (length (sb-ext:string-to-octets (write-to-string (call-string packet))))))
-(defclass method-response-packet (packet) +(defclass method-response-packet (yarpc-packet) ((response :initarg :response :accessor response)))
@@ -109,6 +116,7 @@ (progn (nio-buffer:bytebuffer-write-8 buf +METHOD-RESPONSE-PACKET-ID+) (nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later + (nio-buffer:bytebuffer-write-32 buf (request-id packet)) (nio-buffer:bytebuffer-write-string buf (write-to-string (response packet))) (nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1) #+nio-debug (format-log t "yarpc-packet-factory:write-bytes - written ~A~%" buf)
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Sat Feb 10 15:36:43 2007 @@ -58,26 +58,25 @@
-(defun run-job(&key (wait-on-job-pdw t)) +(defun run-job(&key (blocking t)) (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%") - (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call wait-on-job-pdw) + (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking) (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) - (nio-compat:add result-queue (nio-yarpc:execute-call job)))) + (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))
(defmethod process-outgoing-packet((sm yarpc-state-machine)) (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" ) - (let ((result (nio-compat:take (result-queue sm) :blocking-call nil))) - (format-log t "yarpc-state-machine:process-outgoing-packet - got result ~A ~%" result) + (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil) + (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result) (when result - (method-response-packet result)))) + (method-response-packet result :request-id request-id))))
;Process a call method packet by placing it in the job-queue (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) - (assert (eql (state sm) STATE-INITIALISED)) (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call) - (nio-compat:add job-queue (list (call-string call) (result-queue sm))) - (when +process-jobs-inline+ (run-job :wait-on-job-pdw nil))) + (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm))) + (when +process-jobs-inline+ (run-job :blocking nil)))