Author: psmith Date: Sun Jan 14 23:00:39 2007 New Revision: 35
Added: branches/home/psmith/restructure/run-yarpc-client.lisp Modified: branches/home/psmith/restructure/src/buffer/buffer.lisp branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp branches/home/psmith/restructure/src/event/epoll.lisp branches/home/psmith/restructure/src/io/async-fd.lisp branches/home/psmith/restructure/src/io/nio-package.lisp branches/home/psmith/restructure/src/io/nio-server.lisp branches/home/psmith/restructure/src/io/packet.lisp branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp branches/home/psmith/restructure/src/statemachine/state-machine.lisp Log: yarpc - Send packet OK
Added: branches/home/psmith/restructure/run-yarpc-client.lisp ============================================================================== --- (empty file) +++ branches/home/psmith/restructure/run-yarpc-client.lisp Sun Jan 14 23:00:39 2007 @@ -0,0 +1,11 @@ +(push :nio-debug *features*) +(require :asdf) +(require :nio-yarpc) + +(sb-thread:make-thread #'(lambda()(nio:start-server 'identity 'identity 'nio-yarpc:yarpc-state-machine :host "127.0.0.1" :port 9897)) :name "nio-server") +(sleep 4) +(let ((sm (nio:add-connection "127.0.0.1" 16323 'nio-yarpc:yarpc-state-machine))) +(format t "toplevel adding conn ~A~%" sm) +(format t "Result of remote-execute ~A~%" (nio-yarpc:remote-execute sm "(test-rpc-list)"))) + +
Modified: branches/home/psmith/restructure/src/buffer/buffer.lisp ============================================================================== --- branches/home/psmith/restructure/src/buffer/buffer.lisp (original) +++ branches/home/psmith/restructure/src/buffer/buffer.lisp Sun Jan 14 23:00:39 2007 @@ -131,17 +131,24 @@ (sb-ext:octets-to-string (bytebuffer-read-vector bb num-bytes-to-read) :external-format external-format))
+;grrr... +;(defmethod bytebuffer-write-byte ((bb byte-buffer) value) +; (cffi:%mem-set value (buffer-buf bb) :unsigned-char position) +; (inc-position bb 1)) + ;; Write bytes from vector vec to bytebuffer (defmethod bytebuffer-write-vector((bb byte-buffer) vec) :documentation "Returns number of bytes written to bytebuffer" - (if (> (remaining bb) 0) - 0 + (format t "bytebuffer-write-vector - called with ~A ~A"bb vec) +; (if (> (remaining bb) 0) +; 0 (progn - (clear bb) - (let ((bytes-written (cffi:mem-write-vector vec (buffer-buf bb) :unsigned-char))) - (format t "bytebuffer-write-vector - byteswritten: ~A" bytes-written) +; (clear bb) + (let ((bytes-written (cffi:mem-write-vector vec (buffer-buf bb) :unsigned-char (length vec) (buffer-position bb)))) + (format t "bytebuffer-write-vector - byteswritten: ~A~%" bytes-written) (inc-position bb bytes-written) - bytes-written)))) + bytes-written))) +;)
;; Writes data from string str to bytebuffer using specified encoding ;TODO move string-to-octets into nio-compat
Modified: branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp (original) +++ branches/home/psmith/restructure/src/buffer/nio-buffer-package.lisp Sun Jan 14 23:00:39 2007 @@ -27,5 +27,5 @@ (defpackage :nio-buffer (:use :cl) (:export - byte-buffer free-buffer remaining inc-position get-string buffer-buf bytebuffer-write-vector bytebuffer-write-string bytebuffer-read-vector bytebuffer-read-string flip + byte-buffer free-buffer remaining inc-position get-string buffer-buf bytebuffer-write-vector bytebuffer-write-string bytebuffer-read-vector bytebuffer-read-string flip clear buffer-position ))
Modified: branches/home/psmith/restructure/src/event/epoll.lisp ============================================================================== --- branches/home/psmith/restructure/src/event/epoll.lisp (original) +++ branches/home/psmith/restructure/src/event/epoll.lisp Sun Jan 14 23:00:39 2007 @@ -76,14 +76,14 @@ #+nio-debug (format t "poll-events called with :event-queue ~A~%" event-queue) (with-foreign-object (events 'epoll-event +epoll-size+) (memzero events (* +epoll-event-size+ +epoll-size+)) - (loop for res = (%epoll-wait event-queue events +epoll-size+ -1) + (loop for res = (%epoll-wait event-queue events +epoll-size+ 1000) do (progn #+nio-debug (format t "poll-events - dealing with ~A~%" res) (case res (-1 (error 'poll-error)) - (0 nil) + (return nil) (t (let ((idents nil)) (loop for i from 0 below res do
Modified: branches/home/psmith/restructure/src/io/async-fd.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/async-fd.lisp (original) +++ branches/home/psmith/restructure/src/io/async-fd.lisp Sun Jan 14 23:00:39 2007 @@ -47,18 +47,19 @@ (read-fd :initarg :read-fd :accessor read-fd)
- (foreign-read-buffer :initform (byte-buffer 4096)) - (foreign-write-buffer :initform (byte-buffer 4096) + (foreign-read-buffer :initform (byte-buffer 1024) + :accessor foreign-read-buffer) + (foreign-write-buffer :initform (byte-buffer 1024) :accessor foreign-write-buffer)
;; (lisp-read-buffer :initform (make-uint8-seq 1024)) ;; (lisp-read-buffer-write-ptr :initform 0)
- (read-ready-p :initform nil - :accessor read-ready-p + (read-ready :initform nil + :accessor read-ready :documentation "Have we been notified as read ready and not received EAGAIN from %read?") - (write-ready-p :initform nil - :accessor write-ready-p + (write-ready :initform nil + :accessor write-ready :documentation "Have we been notified as write ready and not received EAGAIN from %write?")
(close-pending :initform nil) @@ -73,21 +74,28 @@
(defmethod print-object ((async-fd async-fd) stream) - (with-slots (read-fd write-fd) async-fd - (format stream "#<ASYNC-FD r/w fd: ~D/~D.>" - read-fd write-fd))) + (with-slots (socket read-fd write-fd) async-fd + (format stream "#<ASYNC-FD :socket ~D :read-fd ~D :write-fd ~D.>" + socket read-fd write-fd)))
+;;Implement this in concrete SM for read +(defgeneric process-read (async-fd)) + +;;Implement this in concrete SM for read +(defgeneric process-write (async-fd))
-;;SM factory + + +;Loop over state machines calling process-outgoing-packets via state-machine::process-write + +;;SM factory (defun create-state-machine(sm-type read-fd write-fd socket) (let ((sm (make-instance sm-type :read-fd read-fd :write-fd write-fd :socket socket))) (format t "create-state-machine - Created ~S~%" sm) - (nio-buffer:flip (foreign-write-buffer sm)) + (nio-buffer:clear (foreign-read-buffer sm)) + (nio-buffer:clear (foreign-write-buffer sm)) sm))
-;;Implement this in concrete SM for read -(defgeneric process-read (async-fd)) - ;;override this in concrete SM for close ;(defmethod process-close((async-fd async-fd)reason)()) (defmethod process-close((async-fd async-fd)reason)()) @@ -108,9 +116,9 @@ (with-slots (foreign-read-buffer read-fd) state-machine (format t "read-more called with ~A~%" state-machine)
-#+nio-debug (format t "read-more - calling read()~%") +#+nio-debug (format t "read-more - calling read() into ~A~%" foreign-read-buffer) (let ((new-bytes (%read read-fd (buffer-buf foreign-read-buffer) (remaining foreign-read-buffer)))) -#+nio-debug (format t "read-more : Read ~A bytes~%" new-bytes) +#+nio-debug (format t "read-more : Read ~A bytes into ~A~%" new-bytes foreign-read-buffer) (cond ((< new-bytes 0) (progn @@ -124,13 +132,8 @@ nil);;(throw 'end-of-file nil))
(t - (progn ;;Update buffer position - (inc-position foreign-read-buffer new-bytes) - -#+nio-debug (format t "read-more prior to process :buffer ~A~%" foreign-read-buffer) - (process-read state-machine))))))) - + (inc-position foreign-read-buffer new-bytes))))))
(defun close-async-fd (async-fd) "Close ASYNC-FD's fd after everything has been written from write-queue." @@ -149,33 +152,38 @@
(defun write-more (async-fd) "Write data from ASYNC-FD's write bytebuffer" -#+nio-debug (format t "write-more called with ~A~%" async-fd) +#+nio-debug (format t "async-fd:write-more - called with ~A~%" async-fd) (with-slots (write-fd foreign-write-buffer close-pending) async-fd - (setf (write-ready-p async-fd) t) -#+nio-debug (format t "foreign-write-buffer b4 flip ~A~%" foreign-write-buffer) +#+nio-debug (format t "async-fd:write-more - foreign-write-buffer b4 flip ~A~%" foreign-write-buffer) (nio-buffer:flip foreign-write-buffer) -#+nio-debug (format t "foreign-write-buffer after flip ~A~%" foreign-write-buffer) +#+nio-debug (format t "async-fd:write-more -foreign-write-buffer after flip ~A~%" foreign-write-buffer)
(let ((now-written 0)) (do ((total-written 0)) ((or (eql now-written -1) (eql (remaining foreign-write-buffer) 0)) total-written) (progn (setf now-written (%write write-fd (buffer-buf foreign-write-buffer) (remaining foreign-write-buffer))) - - (format t "after write :foreign-write-buffer ~A :now-written ~A :total-written ~A ~%" foreign-write-buffer now-written total-written) +
(when (not (eql now-written -1)) (inc-position foreign-write-buffer now-written) - (incf total-written now-written)))) + (incf total-written now-written))) +#+nio-debug (format t "async-fd:write-more - after write :foreign-write-buffer ~A :now-written ~A :total-written ~A ~%" foreign-write-buffer now-written total-written) + ) +
+ (if (eql now-written -1) ;;Deal with failure - (when (eql now-written -1) (let ((err (get-errno))) (format t "write-more - write returned -1 :errno ~A~%" err) (unless (eql err 11) ;; eagain - failed to write whole buffer need to wait for next notify (let ((err-cond (make-instance 'write-error :error err))) (close err-cond) - (error err-cond)))))) + (error err-cond)))) + ;;update buffers + (if (eql (remaining foreign-write-buffer) 0) + (clear foreign-write-buffer) + (error 'not-implemented-yet))))
#+nio-debug (format t "write buffer after write :~A~%" foreign-write-buffer) (when (eql (remaining foreign-write-buffer) 0)
Modified: branches/home/psmith/restructure/src/io/nio-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-package.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-package.lisp Sun Jan 14 23:00:39 2007 @@ -29,13 +29,13 @@ (:export
;; async-fd.lisp - async-fd process-read foreign-read-buffer foreign-write-buffer close-sm + async-fd process-read process-write foreign-read-buffer foreign-write-buffer close-sm
;; async-socket.lisp ;;nio-server - start-server + start-server add-connection ;;packet - packet + packet write-bytes ))
Modified: branches/home/psmith/restructure/src/io/nio-server.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/nio-server.lisp (original) +++ branches/home/psmith/restructure/src/io/nio-server.lisp Sun Jan 14 23:00:39 2007 @@ -33,6 +33,28 @@ ;; (format t "Accepting connection from ~S:~D [~A].~%" host port proto) t)
+;TODO thread safety +(defparameter +connected-sockets+ nil + "List of sockets that have been connected and are awaiting addition to the event-notification system") + +;loop over hashtable +(defun process-async-fds (client-hash) + (maphash #'(lambda (k async-fd) + (format t "Dealing with ~a => ~a~%" k async-fd) + + ;process reads + (when (read-ready async-fd) (read-more async-fd)) + (when (> (buffer-position (foreign-read-buffer async-fd)) 0) + (process-read async-fd)) + + ;process-writes + (process-write async-fd) + (when (write-ready async-fd) (write-more async-fd))) + client-hash)) + + + + (defun start-server (connection-handler accept-filter connection-type &key (protocol :inet) @@ -70,9 +92,8 @@ (declare (ignore cond)) (format t "Poll-error, exiting..~%") (throw 'poll-error-exit nil)))) - - (loop for unix-epoll-events = (poll-events event-queue) do - + + (loop for unix-epoll-events = (poll-events event-queue) do (loop for (fd . event) in unix-epoll-events do (cond @@ -113,10 +134,41 @@ (force-close-async-fd async-fd) (throw 'error-exit nil))))
- (when (read-event-p event) (read-more async-fd)) - (when (write-event-p event) (write-more async-fd)) - ))) - )) - ))))) + (when (read-event-p event) (setf (read-ready async-fd) t)) + (when (write-event-p event) (setf (write-ready async-fd) t)))))))) + (format t "Process client adds~%") + + ;add outgoing sockets to event queue + (format t "start-server::sockets enqueued ~A~%" +connected-sockets+) + (loop for new-fd in +connected-sockets+ do + (format t "Dealing with ~A~%" new-fd) + (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd) + (add-async-fd event-queue new-fd :read-write)) + + ;TODO thread safety + (setf +connected-sockets+ nil) + + ;loop over async-fd's processing where necessary + (process-async-fds client-hash) + )))) (ignore-errors (close-fd sock)))) + + +(defun add-connection (host port connection-type + &key + (protocol :inet) + + ) + (let ((sock nil)) + (setq sock (ecase protocol + (:inet (make-inet-socket)) + (:inet6 (make-inet6-socket)))) + + (if (connect-inet-socket sock host port) + (let ((sm (create-state-machine connection-type sock sock sock))) + (push sm +connected-sockets+) + (format t "add-connection::sockets enqueued ~A~%" +connected-sockets+) + (return-from add-connection sm)) + (format t "Connect failed!!~A ~%" (get-errno))))) + \ No newline at end of file
Modified: branches/home/psmith/restructure/src/io/packet.lisp ============================================================================== --- branches/home/psmith/restructure/src/io/packet.lisp (original) +++ branches/home/psmith/restructure/src/io/packet.lisp Sun Jan 14 23:00:39 2007 @@ -28,13 +28,13 @@
;; state-machines instantiate packets for the associated protocol ;; either based on incomming data from a packet factory or in -;; preperation for sending a packet for the current protocol. +;; preparation for sending a packet for the current protocol. +;; +;; All concete packets implement write-bytes for xfer to the io layer
-;; All concete packets implement get-bytes for xfer to the io layer
(defclass packet () ())
-(defmethod get-bytes((a-packet packet)) - ()) - +;Implement in concrete +(defgeneric write-bytes(packet nio-buffer))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc-package.lisp Sun Jan 14 23:00:39 2007 @@ -29,5 +29,5 @@ (:export
;; yarpc-state-machine - yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory + yarpc-state-machine yarpc-state-machine-factory test-rpc test-rpc-list test-rpc-string get-packet-factory remote-execute ))
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp Sun Jan 14 23:00:39 2007 @@ -35,11 +35,11 @@ (defun yarpc-packet-factory () (make-instance 'yarpc-packet-factory))
-(defconstant CALL-METHOD-PACKET-ID 0) +(defconstant CALL-METHOD-PACKET-ID #x0) (defconstant METHOD-RESPONSE-PACKET-ID 1)
(defmethod get-packet ((pf yarpc-packet-factory) buf) - (nio-buffer:flip buf) + (flip buf) ; (format t "get-packet::read string - ~A~%" (bytebuffer-read-string buf (remaining buf))) (if (>= (remaining buf) 1) ;; First byte denotes packet ID (ecase (elt (bytebuffer-read-vector buf 1) 0) @@ -49,6 +49,17 @@ (defclass call-method-packet (packet)((call-string :initarg :call :accessor get-call-string)))
+(defmethod print-object ((packet call-method-packet) stream) + (format stream "#<CALL-METHOD-PACKET ~A >" (get-call-string packet))) + +(defmethod write-bytes((packet call-method-packet) buf) + (format t "yarpc-packet-factory:write-bytes - writing ~A to ~A~%" packet buf) +; (nio-buffer:flip buf) + (nio-buffer:bytebuffer-write-vector buf #(#x0)) + (nio-buffer:bytebuffer-write-string buf (get-call-string packet)) + (format t "yarpc-packet-factory:write-bytes - written ~A~%" buf) ) + + (defclass method-response-packet (packet)())
Modified: branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp (original) +++ branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp Sun Jan 14 23:00:39 2007 @@ -40,7 +40,10 @@ ;; (test-rpc "who" 2 's) ;; response - who 2 'S ;; -(defclass yarpc-state-machine (state-machine)()) +(defclass yarpc-state-machine (state-machine) + ((outgoing-packet :initarg :outgoing-packet + :accessor outgoing-packet + :initform nil)))
(defun yarpc-state-machine () (make-instance 'yarpc-state-machine)) @@ -88,8 +91,16 @@
(define-condition authorization-error (error) ())
+ +(defmethod process-outgoing-packet((sm yarpc-state-machine)) + (format t "process-outgoing-packet called~%") + (let ((packet (outgoing-packet sm))) + (setf (outgoing-packet sm) nil) + packet)) + + ;Process a call method packet, returns -(defmethod process-packet ((sm yarpc-state-machine) (call call-method-packet)) +(defmethod process-incomming-packet ((sm yarpc-state-machine) (call call-method-packet)) ;todo change state, create method-response packet and return it ;(assert (eql state 0)) (handler-case @@ -111,3 +122,7 @@ (apply (first rpc-call-list) (rest rpc-call-list)) (error 'authorization-error))))
+ +(defmethod remote-execute ((sm yarpc-state-machine) call-string) + (setf (outgoing-packet sm) (make-instance 'call-method-packet :call call-string))) + \ No newline at end of file
Modified: branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp ============================================================================== --- branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp (original) +++ branches/home/psmith/restructure/src/statemachine/nio-sm-package.lisp Sun Jan 14 23:00:39 2007 @@ -29,5 +29,5 @@ (:export
;; state-machine - state-machine packet-factory get-packet-factory get-packet + state-machine packet-factory get-packet-factory get-packet process-outgoing-packet process-incoming-packet ))
Modified: branches/home/psmith/restructure/src/statemachine/state-machine.lisp ============================================================================== --- branches/home/psmith/restructure/src/statemachine/state-machine.lisp (original) +++ branches/home/psmith/restructure/src/statemachine/state-machine.lisp Sun Jan 14 23:00:39 2007 @@ -32,7 +32,7 @@ ;Base class for state machines ; ;Converts incomming data between bytes and packets using the supplied packet-factory. -;Converts outgoing data between packets and bytes using the get-bytes method on packet. +;Converts outgoing data between packets and bytes using the write-bytes method on packet. ; ;This way only the protocols packet heirarchy knows about binary representations and ; the SM can deal with protocol logic and state maintenance @@ -42,21 +42,32 @@ (defmethod print-object ((sm state-machine) stream) (format stream "#<STATE-MACHINE ~A >" (call-next-method sm nil)))
-(defgeneric process-packet(state-machine packet)) +(defgeneric process-incomming-packet(state-machine packet)) + + +(defgeneric process-outgoing-packet(state-machine)) +
(defgeneric get-packet-factory(state-machine))
+;The connection is read ready. +;Use the packet factory to obtain any valid packet and pass it through (defmethod process-read((sm state-machine)) - (with-slots (foreign-read-buffer foreign-write-buffer) sm + (with-slots (foreign-read-buffer) sm (let ((incomming-packet (get-packet (get-packet-factory sm) foreign-read-buffer))) (format t "state-machine::process-read - incomming packet: ~A~%" incomming-packet) (when incomming-packet - (multiple-value-bind (ret-packet close) (process-packet sm incomming-packet) - (format t "state-machine::process-read - return packet: ~A~%" ret-packet) - (when ret-packet (put-packet ret-packet foreign-write-buffer)) - (if close - (close-sm sm) - )))))) + (when (not (process-incomming-packet sm incomming-packet)) + (close-sm sm)))))) + +;The connection is write ready. +;See if theres anything ready to be written in the SM +(defmethod process-write((sm state-machine)) + (with-slots (foreign-write-buffer) sm + (let ((outgoing-packet (process-outgoing-packet sm))) + (format t "state-machine::process-write - outgoing packet: ~A~%" outgoing-packet) + (when outgoing-packet (write-bytes outgoing-packet foreign-write-buffer))))) +
@@ -65,7 +76,3 @@
; Get the packet in buf using the packet factory (defgeneric get-packet (packet-factory buf)) - -; Write the packet to the buffer -(defun put-packet (packet buf) - (nio-buffer:bytebuffer-write-vector buf (get-bytes packet)))