Skip to content

sleepy processes #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 24, 2025
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ Having established the class which carries the graph definition, we wrap a solve
:debug? t
:match-address match-address)))
;; Set up the core simulation components: the network host and the dryad.
(simulation-add-event simulation
(make-event :callback *local-courier* :time 0))
(with-active-simulation simulation
(simulation-add-event simulation
(make-event :callback *local-courier* :time 0)))
(simulation-add-event simulation (make-event :callback dryad :time 0))
;; Prime the dryad with messages to spawn workers for the eight vertices.
(loop :for j :from 1 :to 8
Expand Down
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"1.0.1"
"1.1.0"
2 changes: 1 addition & 1 deletion anatevka.asd
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
:version (:read-file-form "VERSION.txt")
:pathname "src/"
:depends-on (#:alexandria
(:version #:aether "1.1.0")
(:version #:aether "1.2.0")
)
:in-order-to ((asdf:test-op (asdf:test-op #:anatevka-tests)))
:around-compile (lambda (compile)
Expand Down
27 changes: 14 additions & 13 deletions src/dryad.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
;;;

(define-message-handler handler-message-sow
((dryad dryad) (message message-sow) now)
((dryad dryad) (message message-sow))
"Adjoin a new node to the problem graph.

NOTE: In the basic implementation, these messages must be waiting for the DRYAD on launch."
Expand All @@ -72,12 +72,12 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(log-entry :entry-type 'handling-sow
:address node-address
:id node-id)
(schedule node-process now)
(schedule node-process (now))
(setf (gethash node-address (dryad-ids dryad)) node-id
(gethash node-address (dryad-sprouted? dryad)) nil)))

(define-message-handler handler-message-discover
((dryad dryad) (message message-discover) now)
((dryad dryad) (message message-discover))
"Handles a DISCOVER message, sent by a BLOSSOM-NODE which expects a list of other BLOSSOM-NODE addresses to which it should send PINGs."
(let ((channels
(loop :for address :being :the :hash-keys :of (dryad-ids dryad)
Expand All @@ -91,7 +91,7 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
:id (message-discover-id message)))))

(define-message-handler handler-message-sprout
((dryad dryad) (message message-sprout) now)
((dryad dryad) (message message-sprout))
"Handles a SPROUT message, indicating that a BLOSSOM-NODE has been matched (for the first time)."
(with-slots (address) message
(a:when-let ((id (gethash address (dryad-ids dryad))))
Expand All @@ -101,7 +101,7 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(setf (gethash address (dryad-sprouted? dryad)) t))))

(define-rpc-handler handler-message-wilting
((dryad dryad) (message message-wilting) now)
((dryad dryad) (message message-wilting))
"Handles a wilting message, indicating that a BLOSSOM-NODE is dying."
(with-slots (address) message
(let ((id (gethash address (dryad-ids dryad))))
Expand All @@ -123,11 +123,11 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
;;; DRYAD command definitions
;;;

(define-process-upkeep ((dryad dryad) now) (START)
(define-process-upkeep ((dryad dryad)) (START)
"Start listening for ripe sprouted pairs."
(process-continuation dryad `(SPROUTS-LOOP)))

(define-process-upkeep ((dryad dryad) now) (SPROUTS-LOOP)
(define-process-upkeep ((dryad dryad)) (SPROUTS-LOOP)
"Loop over sprouted nodes, looking for ripe pairs."
;; if not everyone is sprouted, hold off
;; NB: the loop returns T if the hash table is empty, so we additionally
Expand All @@ -136,7 +136,8 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(loop :for sprouted? :in (a:hash-table-values (dryad-sprouted? dryad))
:always sprouted?))
(process-continuation dryad `(SPROUTS-LOOP))
(finish-with-scheduling))
(wake-on-network)
(finish-handler))
(let ((addresses (a:hash-table-keys (dryad-sprouted? dryad))))
(flet ((payload-constructor ()
(make-message-values :reply-channel (register)
Expand All @@ -157,14 +158,14 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(process-continuation dryad
`(SEND-EXPAND ,address)
`(SPROUTS-LOOP))
(finish-with-scheduling))
(finish-handler))
;; if we're in the middle of an augment, we should pause for a bit
;; NB: it is deliberate that we defer this to after the loop, so that we
;; might prefer to SEND-EXPAND vs. just waiting bc of an augment
(when mid-augment?
(process-continuation dryad
`(SPROUTS-LOOP))
(finish-with-scheduling)))
(finish-handler)))
;; all clear!
(let ((emitted-addresses nil)
(pairs nil))
Expand All @@ -187,7 +188,7 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
`(PROCESS-PAIRS ,pairs)
`(WIND-DOWN)))))))

(define-process-upkeep ((dryad dryad) now) (PROCESS-PAIRS pairs)
(define-process-upkeep ((dryad dryad)) (PROCESS-PAIRS pairs)
"Iterates through `PAIRS' of addresses and sends corresponding WILT and REAP messages."
(dolist (address-pair pairs)
(log-entry :entry-type 'processing-pair
Expand All @@ -198,7 +199,7 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(send-message (dryad-match-address dryad)
(make-message-reap :ids id-pair)))))

(define-process-upkeep ((dryad dryad) now) (SEND-EXPAND sprout)
(define-process-upkeep ((dryad dryad)) (SEND-EXPAND sprout)
"Directs SPROUT to perform blossom expansion."
(unless (process-lockable-aborting? dryad)
;; if we directly send the sprout a blossom-expand message, it will
Expand All @@ -224,6 +225,6 @@ NOTE: In the basic implementation, these messages must be waiting for the DRYAD
(expand-reply topmost)
nil)))))))

(define-process-upkeep ((dryad dryad) now) (WIND-DOWN &optional (counter 50))
(define-process-upkeep ((dryad dryad)) (WIND-DOWN &optional (counter 50))
(unless (zerop counter)
(process-continuation dryad `(WIND-DOWN ,(1- counter)))))
12 changes: 6 additions & 6 deletions src/lock.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@
(mapcar #'blossom-edge-target-node (union (blossom-node-petals node)
(blossom-node-children node))))

(define-process-upkeep ((node blossom-node) now)
(define-process-upkeep ((node blossom-node))
(aether::%FINISH-UNLOCK)
(setf (blossom-node-pingable node) ':ALL)
(setf (blossom-node-held-by-roots node) nil)
(when (process-lockable-done-signal node) ; signal := destroy? && ! aborting?
(setf (blossom-node-parent node) nil
(blossom-node-children node) nil
(blossom-node-positive? node) t))
(schedule* (call-next-method)))
(call-next-method))

;;;
;;; blossom-node handlers
;;;

(define-message-handler handle-message-lock
((node blossom-node) (message message-lock) now)
((node blossom-node) (message message-lock))
"Prepares a BLOSSOM-NODE to be locked."
(when (blossom-node-wilting node)
(send-message (message-reply-channel message)
(make-message-rpc-done :result nil))
(finish-with-scheduling))
(finish-handler))
(unless (process-lockable-locked? node)
(setf (blossom-node-pingable node) ':NONE))
(schedule* (call-next-method)))
(call-next-method))

;;;
;;; supervisor command definitions
;;;

(define-process-upkeep ((supervisor supervisor) now)
(define-process-upkeep ((supervisor supervisor))
(BROADCAST-UNLOCK &key destroy? &allow-other-keys)
"Cleans up after BROADCAST-LOCK."
(with-slots (aborting? done-signal downward-rx-latches downward-tx-latches upward-tx-latch) supervisor
Expand Down
61 changes: 33 additions & 28 deletions src/node.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ evalutes to
(if (string< x y) x y)))

(define-message-subordinate handle-message-id-query
((node blossom-node) (message message-id-query) now)
((node blossom-node) (message message-id-query))
"Replies with the minimum ID at this macrovertex."
(cond
((null (blossom-node-petals node))
Expand All @@ -306,7 +306,7 @@ evalutes to
;; tree to respond to a safe subset (or to all) of PING requests.

(define-broadcast-handler handle-message-broadcast-pingability
((node blossom-node) (message message-broadcast-pingability) now)
((node blossom-node) (message message-broadcast-pingability))
"Changes the pingability of `NODE' (and children / petals) to `PING-TYPE'."
(with-slots (ping-type) message
(log-entry :entry-type 'changing-pingability
Expand All @@ -325,25 +325,25 @@ evalutes to
;; better to implement the micromessages after all.

(define-rpc-handler handle-message-set
((node blossom-node) (message message-set) now)
((node blossom-node) (message message-set))
"Handles a remote SETF request."
(with-slots (slots values reply-channel) message
(with-slots (slots values) message
(loop :for slot :in slots
:for value :in values
:do (setf (slot-value node slot) value))
(values)))

(define-rpc-handler handle-message-push
((node blossom-node) (message message-push) now)
((node blossom-node) (message message-push))
"Handles a remote PUSH request."
(with-slots (slot value reply-channel) message
(with-slots (slot value) message
(push value (slot-value node slot))
(values)))

(define-rpc-handler handle-message-values
((node blossom-node) (message message-values) now)
((node blossom-node) (message message-values))
"Handles a remote request for data."
(with-slots (values reply-channel) message
(with-slots (values) message
(loop :for value :in values
:collect (slot-value node value))))

Expand All @@ -355,7 +355,7 @@ evalutes to
;; and should halt its process.

(define-message-handler handle-message-sprout-on-blossom
((node blossom-node) (message message-sprout) now)
((node blossom-node) (message message-sprout))
"Handles a request that a root node (perhaps not a vertex) alert the DRYAD that it has sprouted."
(cond
((blossom-node-petals node)
Expand All @@ -367,7 +367,7 @@ evalutes to
(make-message-sprout :address (process-public-address node))))))

(define-message-handler handle-message-wilt
((node blossom-node) (message message-wilt) now)
((node blossom-node) (message message-wilt))
;; sanity check: are we actually allowed to wilt?
(when (or (blossom-node-parent node)
(blossom-node-pistil node)
Expand Down Expand Up @@ -443,28 +443,33 @@ evalutes to
;;; basic command definitions for BLOSSOM-NODE
;;;

(define-process-upkeep ((node blossom-node) now) (START)
(define-process-upkeep ((node blossom-node)) (START)
"Blossom nodes represent (contracted subgraphs of) vertex(es). The START command drops the blossom node into an infinite loop, SCAN-LOOP, which enacts the basic behavior."
(process-continuation node `(SCAN-LOOP)))

(define-process-upkeep ((node blossom-node) now) (SCAN-LOOP &optional repeat?)
(define-process-upkeep ((node blossom-node)) (SCAN-LOOP &optional repeat?)
"If we're out of things to do & unmatched, consider starting a SCAN. If REPEAT? is set, then this is _not_ our first time trying to SCAN to find something to do, and the previous attempt(s) resulted in no action."
(unless (blossom-node-wilting node)
(process-continuation node `(SCAN-LOOP))
(unless (or (process-lockable-locked? node)
(blossom-node-parent node)
(blossom-node-pistil node)
(blossom-node-match-edge node)
(blossom-node-paused? node))
;; doing this manual command injection rather than sending a message is a
;; stopgap against sending multiple SCAN messages, which looks gross / wrong.
(let ((scan-message (make-message-scan
:local-root (process-public-address node)
:weight 0
:repeat? repeat?)))

(process-continuation node `(START-SCAN ,scan-message))))))

(define-process-upkeep ((node blossom-node) now) (IDLE)
(when (or (process-lockable-locked? node)
(blossom-node-parent node)
(blossom-node-pistil node)
(blossom-node-match-edge node)
(blossom-node-paused? node))
;; in this situation, we're not currently able to make progress (because we're
;; not able to act on our own). fall asleep until some other actor picks us up
;; as a passive participant in whatever action it is performing.
(wake-on-network)
(finish-handler))
;; doing this manual command injection rather than sending a message is a
;; stopgap against sending multiple SCAN messages, which looks gross / wrong.
(let ((scan-message (make-message-scan
:local-root (process-public-address node)
:weight 0
:repeat? repeat?)))
(process-continuation node `(START-SCAN ,scan-message)))))

(define-process-upkeep ((node blossom-node)) (IDLE)
(unless (blossom-node-wilting node)
(process-continuation node `(IDLE))))
(process-continuation node `(IDLE))
(wake-on-network)))
6 changes: 3 additions & 3 deletions src/operations/augment.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
;;; supervisor command definitions
;;;

(define-process-upkeep ((supervisor supervisor) now) (START-AUGMENT pong)
(define-process-upkeep ((supervisor supervisor)) (START-AUGMENT pong)
"Sets up the augmentation procedure."
(with-slots (edges source-root target-root) pong
(let* ((edge (first edges))
Expand All @@ -59,7 +59,7 @@
`(BROADCAST-UNLOCK :destroy? ,T)
`(HALT)))))

(define-process-upkeep ((supervisor supervisor) now) (AUGMENT edge)
(define-process-upkeep ((supervisor supervisor)) (AUGMENT edge)
"Perform an augmentation along a given edge."
(unless (process-lockable-aborting? supervisor)
(log-entry :entry-type 'augment
Expand All @@ -82,7 +82,7 @@
;;;

(define-message-handler handle-message-percolate
((node blossom-node) (message message-percolate) now)
((node blossom-node) (message message-percolate))
"Performs a step in the path augmentation process."
(with-slots (traversal-edge reply-channel) message
;; does the previous node expect me to link to it?
Expand Down
Loading