fix(swarm): block downstream tasks when upstream fails#145
Merged
warren618 merged 1 commit intoMay 28, 2026
Merged
Conversation
Bug --- A failed task in layer N did NOT prevent layer N+1 tasks that declare it as a dependency from being dispatched. The orchestration loop in _execute_run set all_succeeded=False on failure but never gated the next layer. The downstream worker's upstream-summary loop in _execute_layer silently skipped the missing key (the `if source_task_id in task_summaries` check), so the worker ran with no upstream context. This is most visible in the investment_committee preset, where portfolio_manager depends_on=["task-risk"]: a failed risk_officer let PM produce a "decision" with no risk input. Reproducer attached as tests/test_swarm_dag_gating.py shows the same pattern in 3 tasks. The layer-boundary _sync_run_tasks_snapshot added in HKUDS#132 surfaces the failure to polling clients sooner, but it does not gate dispatch — PM still runs the moment the layer with the failed risk task finishes. Fix --- _execute_layer: before dispatch, walk task.depends_on and load each upstream from TaskStore. If any upstream has status != completed (failed / blocked / cancelled / missing), mark this task TaskStatus.blocked, record blocked_by + reason, emit task_blocked, and skip executor.submit. Same-layer peers with no shared upstream are unaffected (the loop continues). _execute_run: tasks blocked in _execute_layer are absent from layer_results. After the result loop, any layer_task_id missing from layer_results was blocked -> set all_succeeded = False so the run is marked RunStatus.failed at finalization. Tests ----- 3 new regression tests in tests/test_swarm_dag_gating.py (TDD red-green verified before-and-after on this PR's diff): - test_failed_upstream_blocks_downstream - test_blocked_downstream_emits_task_blocked_event - test_run_marked_failed_when_downstream_blocked Full swarm test suite: 115/115 pass (112 existing + 3 new).
warren618
added a commit
that referenced
this pull request
May 28, 2026
Follow-up to #145. The new task_blocked event was emitted without agent_id, so the CLI live panel (cli/_legacy.py) could not attach it to the per-agent row — blocked agents stayed visually "pending" until the run finalized. Three small fixes: - runtime.py: pass agent_id=task.agent_id on the task_blocked emit - cli/_legacy.py: handle task_blocked in the live event loop - models.py: extend SwarmEvent docstring to mention task_blocked - test_swarm_dag_gating.py: assert agent_id is present on the event
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
When a task fails in the DAG, downstream tasks that declare it as a
dependency are dispatched anyway, with an empty upstream context. This is
most visible in the
investment_committeepreset, where a failedrisk_officerletsportfolio_managerproduce a "decision" with no riskinput — clearly unsafe for any production use.
The layer-boundary
_sync_run_tasks_snapshotadded in #132 surfaces taskfailure to polling clients sooner, but it does not gate dispatch — PM
still runs the moment the layer containing the failed risk task finishes.
Root cause
_execute_runflipsall_succeeded = Falseon task failure but neverbreaks out of the layer loop or gates the next layer.
_execute_layerreadsfrom
task_summariesviaif source_task_id in task_summaries— afailed upstream never populates that dict, so the lookup silently
yields no upstream context. The worker then runs with whatever its
prompt template makes of an empty
{upstream_context}.Fix
_execute_layer: before submitting each task, walktask.depends_onand load each upstream from
TaskStore. If any upstream hasstatus != completed(failed / blocked / cancelled / missing), mark thistask
TaskStatus.blocked, recordblocked_by+ reason, emittask_blocked, and skipexecutor.submit. Same-layer peers with noshared upstream are unaffected — the loop continues.
_execute_run: blocked tasks are absent fromlayer_results. After theresult-processing loop, any
layer_task_idmissing fromlayer_resultswas blocked → set
all_succeeded = Falseso the run is finalized asRunStatus.failed.Blocked state cascades naturally through deeper layers: layer-3 task Z
depending on a blocked layer-2 task Y will see
Y.status="blocked" != completedand block in turn.Test plan
agent/tests/test_swarm_dag_gating.pywith 3 regressiontests, all TDD red-green verified against this diff:
test_failed_upstream_blocks_downstream— PM isblocked, notfailedorcompleted, when risk failstest_blocked_downstream_emits_task_blocked_event— events.jsonlcontains
task_blockedfor PM, NOTtask_startedtest_run_marked_failed_when_downstream_blocked— run finalizes asRunStatus.failed-k swarm, 0 regressions)events.jsonlpost-fix showstask_blockednottask_startedfor PM,and PM's
started_atremainsNone.Diff is +181 LOC additive (no behavior change for previously-passing DAGs).