Author: psmith Date: Tue Apr 17 00:26:29 2007 New Revision: 109
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp branches/home/psmith/restructure/src/io/nio-package.lisp branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Log: Removed +process-jobs-inline+, as it can't work like this. Added a timeout mechanism.
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/async-fd.lisp (original) +++ branches/home/psmith/restructure/src/io/async-fd.lisp Tue Apr 17 00:26:29 2007 @@ -62,6 +62,9 @@ ;;Implement this in concrete SM for read (defgeneric process-write (async-fd))
+;;Implement this in concrete SM for timeout processing +(defgeneric process-timeout (async-fd)) + ;;SM factory (defun create-state-machine(sm-type read-fd write-fd socket) (let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-package.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-package.lisp Tue Apr 17 00:26:29 2007 @@ -30,7 +30,7 @@
;; async-fd.lisp async-fd process-read process-write foreign-read-buffer foreign-write-buffer close-sm - recommend-buffer-size close-pending + recommend-buffer-size close-pending process-timeout
;; async-socket.lisp
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Tue Apr 17 00:26:29 2007 @@ -46,6 +46,7 @@ ;process reads (handler-case (progn + (process-timeout async-fd) (when (read-ready async-fd) (read-more async-fd)) (when (> (buffer-position (foreign-read-buffer async-fd)) 0) (process-read async-fd)) @@ -63,7 +64,8 @@ (write-more async-fd) (push async-fd removals))) (read-error (re) (push async-fd removals)) - (write-error (we) (push async-fd removals)))) + (write-error (we) (push async-fd removals)) + (timeout (to) (push async-fd removals)))) client-hash) (dolist (async-fd removals) (format-log t "nio-server:process-async-fds processing remove for ~a~%" async-fd)
Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp ============================================================================== --- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp (original) +++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp Tue Apr 17 00:26:29 2007 @@ -63,7 +63,6 @@ (defparameter +log-file-name+ "./out")
(defun run-logging-server(listen-ip out-file &optional (allowed-ips "ips.txt")) - (setf nio-yarpc:+process-jobs-inline+ nil) (setf +log-file-name+ out-file) (nio:load-ips allowed-ips) (sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :port 16323 :accept-connection 'nio:check-ip)) :name "nio-server")
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Tue Apr 17 00:26:29 2007 @@ -32,9 +32,9 @@ yarpc-state-machine-factory get-packet-factory
;; yarpc-state-machine - yarpc-state-machine job-queue run-job +process-jobs-inline+ +serialise-packet-fn+ + yarpc-state-machine job-queue run-job +process-jobs-inline+ +serialise-packet-fn+ process-timeout ;to be moved - test-rpc test-rpc-list test-rpc-string execute-call defremote + test-rpc test-rpc-list test-rpc-string execute-call defremote process-timeout ;;yarpc-client-state-machine yarpc-client-state-machine remote-execute simulate-connection
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Tue Apr 17 00:26:29 2007 @@ -48,11 +48,12 @@ :reader start-time :documentation "The (floating point) start time") (timeout :initarg :timeout - :initform 1.5 + :initform 15 + :reader timeout :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close.")))
-(defun remote-job(callback) - (make-instance 'remote-job :callback callback)) +(defun remote-job(callback &key (timeout 15)) + (make-instance 'remote-job :callback callback :timeout timeout))
(defun yarpc-client-state-machine () @@ -67,11 +68,37 @@ (defmethod print-object ((sm yarpc-client-state-machine) stream) (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil)))
+(defmethod print-object ((job remote-job) stream) + (format stream "#<REMOTE-JOB :start-time ~A :timeout ~A>" (start-time job) (timeout job))) + + (defconstant STATE-INITIALISED 0) (defconstant STATE-SENT-REQUEST 1)
(defparameter +request-id+ 0)
+ +(defun check-timeouts(id job) +; (format-log t "Checking timeout on ~A~%" job) + (when (> (get-universal-high-res) (+ (start-time job) (timeout job))) + (format-log t "Timeout detected ~A ~A~%" id job) + t)) + +(defun finish-job (request-id sm result) + "Remove the job from the request map and call the callback with the result" + (let ((remote-job (gethash request-id (request-map sm)))) + (when remote-job + (remhash request-id (request-map sm)) + (funcall (callback remote-job) result)))) + +(defmethod process-timeout((sm yarpc-client-state-machine)) + (let ((requests (request-map sm))) +#+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, searching for timeouts in ~A ~%" requests) + (maphash #'(lambda (id job) + (when (check-timeouts id job) (finish-job id sm nil))) + requests))) + + (defmethod process-outgoing-packet((sm yarpc-client-state-machine)) #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil))) @@ -88,8 +115,7 @@ (request-id (request-id response))) #+nio-debug (format-log t "yarpc-client-state-machine:process-incoming-packet :result ~A :request-id ~A~%" result request-id) ; (maphash #'(lambda (k v) (format t "~a -> ~a~%" k v)) (request-map sm)) - (let ((remote-job (gethash request-id (request-map sm)))) - (funcall (callback remote-job) result)))) + (finish-job request-id sm result)))
(defparameter *simulate-calls* nil)
@@ -107,3 +133,14 @@ (setf (nio:active-conn node) (nio::create-state-machine 'yarpc-client-state-machine 1 1 6)) (push node nio::*nodes-list*)))
+ + +(defun test-timeout() + (let* ((done nil) + (job (remote-job #'(lambda(x) (format-log t "~A finished~%" x) (setf done t)) :timeout 30))) + (format-log t "Job: ~A~%" job) + (loop while (not done) do + (check-timeouts 99 job) + (format-log t ".~%") + (sleep 1)) + (format-log t "done test~%"))) \ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Tue Apr 17 00:26:29 2007 @@ -52,12 +52,6 @@ (defconstant STATE-INITIALISED 0) (defconstant STATE-SEND-RESPONSE 1)
- -(defparameter +process-jobs-inline+ t - "Set this to make the NIO thread process the RPC calls - warning the procedure should not block!") - - - (defun run-job(&key (blocking t)) #+nio-debug (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%") (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking))) @@ -66,6 +60,7 @@ #+nio-debug (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) (nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
+(defmethod process-timeout((sm yarpc-state-machine)))
(defmethod process-outgoing-packet((sm yarpc-state-machine)) #+nio-debug2 (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" ) @@ -78,8 +73,7 @@ ;Process a call method packet by placing it in the job-queue (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) #+nio-debug (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call) - (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm))) - (when +process-jobs-inline+ (run-job :blocking nil))) + (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm))))