Author: psmith
Date: Tue Apr 17 00:26:29 2007
New Revision: 109
Modified:
branches/home/psmith/restructure/src/io/async-fd.lisp
branches/home/psmith/restructure/src/io/nio-package.lisp
branches/home/psmith/restructure/src/io/nio-server.lisp
branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
Log:
Remove +process-jobs-inline+ as it can't work like this. Added timeout mechanism.
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/async-fd.lisp (original)
+++ branches/home/psmith/restructure/src/io/async-fd.lisp Tue Apr 17 00:26:29 2007
@@ -62,6 +62,9 @@
;;Implement this in concrete SM for write
(defgeneric process-write (async-fd))
+;;Implement this in concrete SM for timeout processing
+;;nio-server calls this on an async-fd before it reads from that fd; its
+;;handler pushes the fd onto the removals list if a TIMEOUT condition is signalled.
+(defgeneric process-timeout (async-fd))
+
;;SM factory
(defun create-state-machine(sm-type read-fd write-fd socket)
(let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket)))
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-package.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-package.lisp Tue Apr 17 00:26:29 2007
@@ -30,7 +30,7 @@
;; async-fd.lisp
async-fd process-read process-write foreign-read-buffer foreign-write-buffer close-sm
- recommend-buffer-size close-pending
+ recommend-buffer-size close-pending process-timeout
;; async-socket.lisp
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp
==============================================================================
--- branches/home/psmith/restructure/src/io/nio-server.lisp (original)
+++ branches/home/psmith/restructure/src/io/nio-server.lisp Tue Apr 17 00:26:29 2007
@@ -46,6 +46,7 @@
;process reads
(handler-case
(progn
+ (process-timeout async-fd)
(when (read-ready async-fd) (read-more async-fd))
(when (> (buffer-position (foreign-read-buffer async-fd)) 0)
(process-read async-fd))
@@ -63,7 +64,8 @@
(write-more async-fd)
(push async-fd removals)))
(read-error (re) (push async-fd removals))
- (write-error (we) (push async-fd removals))))
+ (write-error (we) (push async-fd removals))
+ (timeout (to) (push async-fd removals))))
client-hash)
(dolist (async-fd removals)
(format-log t "nio-server:process-async-fds processing remove for ~a~%" async-fd)
Modified: branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp
==============================================================================
--- branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp (original)
+++ branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp Tue Apr 17 00:26:29 2007
@@ -63,7 +63,6 @@
(defparameter +log-file-name+ "./out")
(defun run-logging-server(listen-ip out-file &optional (allowed-ips "ips.txt"))
- (setf nio-yarpc:+process-jobs-inline+ nil)
(setf +log-file-name+ out-file)
(nio:load-ips allowed-ips)
(sb-thread:make-thread #'(lambda()(nio:start-server 'nio-yarpc:yarpc-state-machine :host listen-ip :port 16323 :accept-connection 'nio:check-ip)) :name "nio-server")
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Tue Apr 17 00:26:29 2007
@@ -32,9 +32,9 @@
yarpc-state-machine-factory get-packet-factory
;; yarpc-state-machine
- yarpc-state-machine job-queue run-job +process-jobs-inline+ +serialise-packet-fn+
+ yarpc-state-machine job-queue run-job +process-jobs-inline+ +serialise-packet-fn+ process-timeout
;to be moved
- test-rpc test-rpc-list test-rpc-string execute-call defremote
+ test-rpc test-rpc-list test-rpc-string execute-call defremote process-timeout
;;yarpc-client-state-machine
yarpc-client-state-machine remote-execute simulate-connection
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp Tue Apr 17 00:26:29 2007
@@ -48,11 +48,12 @@
:reader start-time
:documentation "The (floating point) start time")
(timeout :initarg :timeout
- :initform 1.5
+ :initform 15
+ :reader timeout
:documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close.")))
-(defun remote-job(callback)
- (make-instance 'remote-job :callback callback))
+(defun remote-job (callback &key (timeout 15))
+  "Build a REMOTE-JOB instance holding CALLBACK and a TIMEOUT in seconds (default 15)."
+  (make-instance 'remote-job
+                 :timeout timeout
+                 :callback callback))
(defun yarpc-client-state-machine ()
@@ -67,11 +68,37 @@
(defmethod print-object ((sm yarpc-client-state-machine) stream)
(format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil)))
+(defmethod print-object ((job remote-job) stream)
+  ;; Render the job as #<REMOTE-JOB ...> showing its start time and timeout.
+  (let ((started (start-time job))
+        (limit (timeout job)))
+    (format stream "#<REMOTE-JOB :start-time ~A :timeout ~A>" started limit)))
+
+
(defconstant STATE-INITIALISED 0)
(defconstant STATE-SENT-REQUEST 1)
(defparameter +request-id+ 0)
+
+(defun check-timeouts (id job)
+  "Return T, after logging ID and JOB, when the current GET-UNIVERSAL-HIGH-RES reading is later than JOB's start time plus its timeout; otherwise return NIL."
+  ;; Read the clock first, then compute the deadline.
+  (let* ((now (get-universal-high-res))
+         (deadline (+ (start-time job) (timeout job))))
+    (if (> now deadline)
+        (progn
+          (format-log t "Timeout detected ~A ~A~%" id job)
+          t)
+        nil)))
+
+(defun finish-job (request-id sm result)
+  "Look up REQUEST-ID in SM's request map; if a job is registered there, remove it and call its callback with RESULT. Does nothing when no job is found."
+  (let* ((pending (request-map sm))
+         (job (gethash request-id pending)))
+    (unless (null job)
+      ;; Drop the entry before running the callback.
+      (remhash request-id pending)
+      (funcall (callback job) result))))
+
+(defmethod process-timeout ((sm yarpc-client-state-machine))
+  "Finish, with a NIL result, every request in SM's request map for which CHECK-TIMEOUTS reports that the timeout has elapsed."
+  (let ((requests (request-map sm))
+        (expired '()))
+    #+nio-debug (format-log t "yarpc-client-state-machine:process-timeout called, searching for timeouts in ~A ~%" requests)
+    ;; Only collect ids while traversing: FINISH-JOB runs user callbacks, and the
+    ;; consequences are undefined if a callback adds an entry to the table while
+    ;; MAPHASH is still walking it.
+    (maphash #'(lambda (id job)
+                 (when (check-timeouts id job)
+                   (push id expired)))
+             requests)
+    ;; NREVERSE restores traversal order; FINISH-JOB ignores ids that an earlier
+    ;; callback has already removed.
+    (dolist (id (nreverse expired))
+      (finish-job id sm nil))))
+
+
(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
#+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
(let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil)))
@@ -88,8 +115,7 @@
(request-id (request-id response)))
#+nio-debug (format-log t "yarpc-client-state-machine:process-incoming-packet :result ~A :request-id ~A~%" result request-id)
; (maphash #'(lambda (k v) (format t "~a -> ~a~%" k v)) (request-map sm))
- (let ((remote-job (gethash request-id (request-map sm))))
- (funcall (callback remote-job) result))))
+ (finish-job request-id sm result)))
(defparameter *simulate-calls* nil)
@@ -107,3 +133,14 @@
(setf (nio:active-conn node) (nio::create-state-machine 'yarpc-client-state-machine 1 1 6))
(push node nio::*nodes-list*)))
+
+
+(defun test-timeout ()
+  "Manual check of the timeout logic: create a job with a 30 second timeout, poll CHECK-TIMEOUTS once a second, and fire the job's callback with NIL once the timeout is detected."
+  (let* ((done nil)
+         (job (remote-job #'(lambda (x) (format-log t "~A finished~%" x) (setf done t)) :timeout 30)))
+    (format-log t "Job: ~A~%" job)
+    ;; CHECK-TIMEOUTS only reports expiry, so the callback has to be invoked here;
+    ;; otherwise DONE is never set and the loop cannot terminate.
+    (loop until done do
+      (if (check-timeouts 99 job)
+          (funcall (callback job) nil)
+          (progn
+            (format-log t ".~%")
+            (sleep 1))))
+    (format-log t "done test~%")))
\ No newline at end of file
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp
==============================================================================
--- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original)
+++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Tue Apr 17 00:26:29 2007
@@ -52,12 +52,6 @@
(defconstant STATE-INITIALISED 0)
(defconstant STATE-SEND-RESPONSE 1)
-
-(defparameter +process-jobs-inline+ t
- "Set this to make the NIO thread process the RPC calls - warning the procedure should not block!")
-
-
-
(defun run-job(&key (blocking t))
#+nio-debug (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
(let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking)))
@@ -66,6 +60,7 @@
#+nio-debug (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
(nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
+(defmethod process-timeout ((sm yarpc-state-machine))
+  "No-op: this state machine performs no timeout processing and returns NIL."
+  nil)
(defmethod process-outgoing-packet((sm yarpc-state-machine))
#+nio-debug2 (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
@@ -78,8 +73,7 @@
;Process a call method packet by placing it in the job-queue
(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
#+nio-debug (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
- (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
- (when +process-jobs-inline+ (run-job :blocking nil)))
+ (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm))))