Skip to content

feat(iii-queue): consume the configuration worker for runtime config#1873

Merged
andersonleal merged 4 commits into
mainfrom
feat/iii-queue-configuration-worker
Jun 18, 2026
Merged

feat(iii-queue): consume the configuration worker for runtime config#1873
andersonleal merged 4 commits into
mainfrom
feat/iii-queue-configuration-worker

Conversation

@andersonleal

@andersonleal andersonleal commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

What

Migrates the iii-queue worker to consume the builtin configuration worker for its settings, mirroring the iii-http integration (#1842).

Queue config — the backing adapter/transport plus per-queue retries, concurrency, FIFO ordering, and backoff — is now runtime-editable and hot-reloaded. The worker registers a JSON-schema'd entry, seeds it from the config.yaml block only on first boot, reads the live value (with ${VAR:default} expansion) at startup, and hot-applies configuration:updated events without an engine restart. After first boot the configuration entry is the source of truth; config.yaml is seed-only and survives restarts.

Scope decisions

  • Full adapter hot-swap — a runtime change to adapter re-instantiates the transport and restarts every consumer. Messages still queued in the old transport do not migrate (accepted tradeoff, documented in-code).
  • Best-effort per-queue apply — a strict fetch+validate gate (any failure keeps the previous config); past the gate, each queue's consumer restart is independent, so one failure is logged while the others apply and that queue keeps its previous consumer.

Hardening (from review)

  • Boot path validates the fetched config and falls back to the validated seed — a value that apply_config rejects at runtime can still be stored via configuration::set (the JSON schema can't express the fifo cross-field rule), so without this a bad stored value would boot a broken consumer on restart.
  • destroyed flag (set under apply_lock in destroy) makes a late apply — from an in-flight event or the one-shot retry task — bail instead of re-spawning consumers on a torn-down worker (the queue analogue of HttpWorker's shutdown_rx guard).
  • The builtin adapter nacks still-outstanding deliveries when a consumer channel closes, so a restart returns buffered messages to the queue (at-least-once) instead of stranding them — there is no visibility-timeout reclaim otherwise.

Tests & coverage

  • 7 unit tests (configuration.rs), 1 builtin-adapter nack-on-close test, 10 e2e tests (queue_configuration_e2e.rs): seed-on-first-boot, no-clobber across restart, hot-add (live consumer), hot-change, adapter hot-swap (with Arc::ptr_eq proving the transport was rebuilt), hot-remove, invalid-value-keeps-previous, boot-fallback-to-seed, destroy gate, ${VAR} expansion, and timeout one-shot retry (paused clock).
  • Existing queue_integration (12) exercise the graceful-degradation path (no configuration worker present).
  • llvm-cov on the queue module: config.rs 100%, configuration.rs 97.7%, queue.rs 96.8%, builtin adapter 97.3%. Residual is error/tracing branches and the unrelated durable:subscriber paths.

Verification

cargo test -p iii --lib workers::queue
cargo test -p iii --test queue_configuration_e2e
cargo test -p iii --test queue_integration
cargo clippy -p iii --all-targets   # 0 errors

Summary by CodeRabbit

  • New Features
    • Added runtime queue configuration updates without restarts, including per-queue behavior changes and hot-swapping the queue adapter/transport.
    • Queue configuration schemas are now serializable and exposed for validation and documentation.
  • Bug Fixes
    • Improved consumer shutdown handling so buffered deliveries are always cleared and nacked reliably, even under prefetch/window pressure.
  • Tests
    • Added end-to-end integration coverage for configuration seeding/persistence, hot add/remove, adapter swapping, and validation/fallback.
    • Added tests for consumer-stop nacking behavior.
  • Chores
    • Expanded built-in function detection to include the iii-queue:: prefix.

@vercel

vercel Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
iii-website Ready Ready Preview, Comment Jun 18, 2026 7:39pm

Request Review

@coderabbitai

coderabbitai Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

QueueWorker is refactored to store its adapter and configuration as hot-swappable Arc<RwLock<...>> fields via a new LiveState struct. A new configuration.rs module integrates the queue worker with the built-in configuration worker, enabling live config registration, fetch, and change handling with one-shot retry on timeout. The built-in adapter gains a bounded VecDeque to track and nack all buffered deliveries on consumer stop. All service functions and console endpoints snapshot the adapter/config to ensure stable views across updates. Unit and E2E tests validate the full pipeline including timeout retries, per-queue consumer isolation, and adapter identity changes.

Changes

Queue Worker Hot-Swap Configuration Integration

Layer / File(s) Summary
Config struct schema and serialization extensions
engine/src/workers/queue/config.rs, engine/src/workers/queue/mod.rs
FunctionQueueConfig and QueueModuleConfig gain Serialize, JsonSchema, and PartialEq derives with expanded doc comments flowing into the generated JSON Schema. Configuration module is declared in mod.rs.
Built-in adapter consumer-stop buffered-delivery nack fix
engine/src/workers/queue/adapters/builtin/adapter.rs
consume_function_queue's poll task now tracks popped delivery IDs in a prefetch-bounded outstanding VecDeque; on channel closure, all tracked deliveries are removed from delivery_map and nacked with reason "consumer stopped". Includes integration tests for both buffered and prefetch-pressure scenarios.
QueueWorker hot-swap struct refactor
engine/src/workers/queue/queue.rs
QueueWorker internal state reworked to store Arc<RwLock<LiveState>> containing adapter and config, with optional seed, per-queue AbortHandle HashMap, apply_lock, and destroyed flag for lifecycle gating.
configuration.rs integration module
engine/src/workers/queue/configuration.rs
New module implementing register_config (conditional initial seeding), fetch_config (null/missing fallback, malformed error, default queue normalization), on_config_change (apply with one-shot timeout retry), register_config_trigger, and unit tests with stub_configuration mock.
Service function and console endpoint snapshot usage
engine/src/workers/queue/queue.rs
All service enqueue, DLQ, redrive, and console list/stats/dlq endpoints are updated to use adapter_snapshot() and config_snapshot() for stable views during hot-swap transitions. Trigger registration/unregistration snapshot adapter before async blocks.
apply_config hot-swap orchestration and snapshot helpers
engine/src/workers/queue/queue.rs
Implements live/config/adapter snapshot helpers, atomic state swapping (set_config, set_live), and core apply_config under apply_lock with full transport hot-swap on adapter identity change or per-queue consumer diff on same adapter. Includes spawn_consumer and adapter resolution logic.
QueueWorker lifecycle: startup, shutdown, and build
engine/src/workers/queue/queue.rs
start_background_tasks time-bounds bus calls, validates config, spawns consumers, registers trigger, and runs catch-up apply. destroy unregisters trigger, sets destroyed, drains and aborts handles. ConfigurableWorker::build initializes all RwLock/atomic fields and seed.
MockQueueAdapter consumer setup failure isolation
engine/src/workers/queue/queue.rs
Adds setup_fail_queues set to simulate per-queue consumer setup failures for config-apply isolation testing.
Updated unit test scaffolding and timeout/retry tests
engine/src/workers/queue/queue.rs
Test helpers updated to construct QueueWorker with new state fields. New tests cover per-queue spawn isolation, downcastable tokio::time::error::Elapsed on apply timeout, and one-shot timeout retry.
Telemetry built-in function classifier update
engine/src/workers/telemetry/mod.rs
is_iii_builtin_function_id recognizes iii-queue:: prefix as built-in, with unit test assertion for iii-queue::on-config-change.
End-to-end integration test suite
engine/tests/queue_configuration_e2e.rs
Comprehensive E2E tests against real FsAdapter covering seed-on-boot, persistence across restarts, hot-add/change/remove queues, adapter hot-swap, invalid config rejection with prior state preservation, boot fallback on invalid stored config, post-destroy suppression of config changes, and ${VAR:default} placeholder expansion while preserving verbatim in stored config.

Sequence Diagram(s)

sequenceDiagram
  participant Engine
  participant QueueWorker
  participant configuration_rs as configuration.rs
  participant ConfigWorker as Configuration Worker Bus
  participant QueueAdapter

  rect rgba(70, 130, 180, 0.5)
    Note over Engine,QueueAdapter: Boot / start_background_tasks
    QueueWorker->>configuration_rs: register_config(engine, seed)
    configuration_rs->>ConfigWorker: configuration::get
    ConfigWorker-->>configuration_rs: NOT_FOUND or existing value
    configuration_rs->>ConfigWorker: register (schema + conditional initial_value)
    QueueWorker->>configuration_rs: fetch_config(engine, fallback)
    ConfigWorker-->>configuration_rs: stored value or null
    configuration_rs-->>QueueWorker: validated QueueModuleConfig
    QueueWorker->>QueueAdapter: setup_function_queue per queue config
    QueueWorker->>configuration_rs: register_config_trigger(engine)
    configuration_rs->>QueueWorker: on_config_change (catch-up apply)
  end

  rect rgba(34, 139, 34, 0.5)
    Note over Engine,QueueAdapter: Runtime config change
    ConfigWorker->>configuration_rs: configuration:updated event
    configuration_rs->>QueueWorker: on_config_change
    QueueWorker->>QueueWorker: apply_config under apply_lock
    QueueWorker->>configuration_rs: fetch_config (live value)
    alt adapter identity changed
      QueueWorker->>QueueAdapter: rebuild adapter (hot-swap)
      QueueWorker->>QueueAdapter: restart all consumers
    else same adapter, queue diff
      QueueWorker->>QueueAdapter: per-queue add/remove consumers
    end
    alt apply timeout
      configuration_rs->>configuration_rs: spawn one-shot retry after delay
      configuration_rs->>QueueWorker: on_config_change (retry)
    end
  end
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • iii-hq/iii#1638: The main PR's engine/src/workers/queue/queue.rs changes around queue consumer task tracking and destroy() draining/aborting per-queue AbortHandles are directly related to this PR's QueueWorker lifecycle hot-reload work.
  • iii-hq/iii#1731: The main PR's new configuration integration and its config normalization explicitly ensure the built-in default queue always exists (and tests validate first-boot seeding of it), which directly aligns with this PR's provision built-in default queue behavior.
  • iii-hq/iii#1283: Both PRs modify the built-in function-queue adapter's consume_function_queue delivery tracking and closure nacking logic (managing popped-but-not-yet-consumed delivery IDs in engine/src/workers/queue/adapters/builtin/adapter.rs).

🐇 The queues now swap hot as a summer breeze,
Config lives on—stored values don't freeze.
A nack for each job left stranded behind,
The consumer stops, but no message's resigned.
Schema and trigger and timeout retry—
The rabbit hops on as the configs fly! 🎉

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately and concisely describes the main change: integrating the configuration worker into iii-queue for runtime config management.
Description check ✅ Passed The description comprehensively covers what changed, why it matters, scope decisions, hardening measures, and test coverage with specific metrics.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/iii-queue-configuration-worker

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
engine/src/workers/queue/config.rs (1)

129-143: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject non-runnable queue configs at validation time.

validate() currently accepts concurrency: 0 and type: "fifo" with an empty message_group_field.
At Line 803 in engine/src/workers/queue/queue.rs, concurrency: 0 creates Semaphore::new(0), which can stall processing indefinitely once messages arrive. Empty FIFO group-field values also pass validation but fail every enqueue path at runtime.

Suggested patch
 pub fn validate(&self) -> anyhow::Result<()> {
     for (name, queue_config) in &self.queue_configs {
+        if queue_config.concurrency == 0 {
+            anyhow::bail!(
+                "Queue '{}' has invalid concurrency '{}'. Must be >= 1",
+                name,
+                queue_config.concurrency
+            );
+        }
+
         if queue_config.r#type != "standard" && queue_config.r#type != "fifo" {
             anyhow::bail!(
                 "Queue '{}' has invalid type '{}'. Must be 'standard' or 'fifo'",
                 name,
                 queue_config.r#type
             );
         }
-        if queue_config.r#type == "fifo" && queue_config.message_group_field.is_none() {
+        if queue_config.r#type == "fifo"
+            && queue_config
+                .message_group_field
+                .as_deref()
+                .map(str::trim)
+                .filter(|v| !v.is_empty())
+                .is_none()
+        {
             anyhow::bail!(
                 "Queue '{}' is of type 'fifo' but 'message_group_field' is not set",
                 name
             );
         }
     }
     Ok(())
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/queue/config.rs` around lines 129 - 143, The validate()
method in the queue configuration is missing validation checks that would
prevent invalid configurations from causing runtime failures. Add a check to
reject queue configs where concurrency is set to 0, as this creates an
unresponsive Semaphore at runtime. Additionally, strengthen the existing FIFO
validation to also reject cases where message_group_field is an empty string
(not just when it is None), since empty group field values pass the current
validation but fail during enqueue operations. These checks should be added
within the loop that iterates through self.queue_configs, similar to the
existing type validation checks.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@engine/src/workers/queue/adapters/builtin/adapter.rs`:
- Around line 478-481: The eviction of messages from the outstanding queue
occurs before the send operation completes, which can strand messages if the
send blocks and the receiver is dropped. Restructure the code block by moving
the eviction logic (the check on outstanding.len() and the pop_front() call) to
execute after the push_back and send operations succeed, rather than before.
This ensures that messages are only removed from outstanding after they have
been successfully sent and will not be left stranded in the delivery_map if the
channel blocks.

In `@engine/src/workers/queue/queue.rs`:
- Around line 956-973: The hot-swap logic has a race condition where set_config
is called before set_adapter, creating a window where concurrent enqueue
operations can see the new config with the old adapter, causing message
misrouting. Fix this by making adapter and config updates atomic together rather
than in separate steps. Refactor the code so that both set_config and
set_adapter operations (or their underlying state) are protected by a single
synchronization primitive or are updated as a single atomic unit, ensuring
concurrent calls always see either the old config+adapter pair or the new pair,
never a mixed state during the swap.

---

Outside diff comments:
In `@engine/src/workers/queue/config.rs`:
- Around line 129-143: The validate() method in the queue configuration is
missing validation checks that would prevent invalid configurations from causing
runtime failures. Add a check to reject queue configs where concurrency is set
to 0, as this creates an unresponsive Semaphore at runtime. Additionally,
strengthen the existing FIFO validation to also reject cases where
message_group_field is an empty string (not just when it is None), since empty
group field values pass the current validation but fail during enqueue
operations. These checks should be added within the loop that iterates through
self.queue_configs, similar to the existing type validation checks.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 277cbb29-07a0-4e08-9dd2-64a980735d26

📥 Commits

Reviewing files that changed from the base of the PR and between 2020682 and 4d00c3a.

📒 Files selected for processing (6)
  • engine/src/workers/queue/adapters/builtin/adapter.rs
  • engine/src/workers/queue/config.rs
  • engine/src/workers/queue/configuration.rs
  • engine/src/workers/queue/mod.rs
  • engine/src/workers/queue/queue.rs
  • engine/tests/queue_configuration_e2e.rs

Comment thread engine/src/workers/queue/adapters/builtin/adapter.rs Outdated
Comment thread engine/src/workers/queue/queue.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
engine/src/workers/queue/queue.rs (2)

1119-1120: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Rebuild the adapter before adopting fetched boot config.

Line 1120 only swaps config. The adapter was already constructed from the seed config.yaml value, so if the stored configuration selects a different adapter/config, Line 1138 starts consumers on the stale adapter. The catch-up at Line 1166 will not repair this because apply_config now sees the fetched config as both old and new.

Suggested boot-path fix
             Ok(config) => match config.validate() {
-                Ok(()) => self.set_config(config),
+                Ok(()) => {
+                    let adapter_changed = Self::effective_adapter(&self.config_snapshot())
+                        != Self::effective_adapter(&config);
+                    let adapter_ready = if adapter_changed {
+                        match Self::resolve_adapter(&self.engine, &config).await {
+                            Ok(adapter) => {
+                                self.set_adapter(adapter);
+                                true
+                            }
+                            Err(err) => {
+                                tracing::warn!(
+                                    error = %err,
+                                    "iii-queue: stored configuration selects an unavailable adapter; continuing with static config"
+                                );
+                                false
+                            }
+                        }
+                    } else {
+                        true
+                    };
+                    if adapter_ready {
+                        self.set_config(config);
+                    }
+                }
                 Err(err) => tracing::warn!(

Also applies to: 1136-1138

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/queue/queue.rs` around lines 1119 - 1120, The adapter is
initialized from the seed config.yaml before the fetched boot configuration is
adopted at line 1120. When config.validate() succeeds and
self.set_config(config) is called, the adapter must also be rebuilt with the new
configuration before consumers are started around line 1138. Currently,
consumers start on the stale adapter instance that was constructed from the
original seed configuration. Rebuild the adapter after setting the new config by
invoking the appropriate adapter reconstruction logic that uses the newly
adopted configuration, ensuring the adapter reflects the fetched boot config
before any consumers begin operating on it.

633-657: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Replay durable topic subscriptions when replacing the adapter.

Line 649 stores durable:subscriber state in whichever adapter was current at registration. The full swap branch installs a fresh adapter and only restarts function-queue consumers from queue_configs, so existing topic subscriptions remain on the old adapter while enqueue publishes to the new one. Re-subscribe registered durable triggers on new_adapter, or keep subscription state outside the adapter.

Also applies to: 958-984

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/queue/queue.rs` around lines 633 - 657, When the adapter
is replaced in the full swap operation, existing topic subscriptions registered
via adapter.subscribe() remain on the old adapter while new enqueue operations
publish to the new adapter, causing a mismatch. After installing the fresh
adapter during the swap, re-subscribe all registered durable triggers on the new
adapter to ensure subscriptions follow the adapter replacement. This may require
iterating through existing trigger configurations and re-calling the
subscription logic with the updated adapter reference.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@engine/src/workers/queue/queue.rs`:
- Around line 1119-1120: The adapter is initialized from the seed config.yaml
before the fetched boot configuration is adopted at line 1120. When
config.validate() succeeds and self.set_config(config) is called, the adapter
must also be rebuilt with the new configuration before consumers are started
around line 1138. Currently, consumers start on the stale adapter instance that
was constructed from the original seed configuration. Rebuild the adapter after
setting the new config by invoking the appropriate adapter reconstruction logic
that uses the newly adopted configuration, ensuring the adapter reflects the
fetched boot config before any consumers begin operating on it.
- Around line 633-657: When the adapter is replaced in the full swap operation,
existing topic subscriptions registered via adapter.subscribe() remain on the
old adapter while new enqueue operations publish to the new adapter, causing a
mismatch. After installing the fresh adapter during the swap, re-subscribe all
registered durable triggers on the new adapter to ensure subscriptions follow
the adapter replacement. This may require iterating through existing trigger
configurations and re-calling the subscription logic with the updated adapter
reference.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: afc49591-5ca0-4489-a175-d45621c5dae3

📥 Commits

Reviewing files that changed from the base of the PR and between 4d00c3a and d1dc7df.

📒 Files selected for processing (4)
  • engine/src/workers/queue/configuration.rs
  • engine/src/workers/queue/queue.rs
  • engine/src/workers/telemetry/mod.rs
  • engine/tests/queue_configuration_e2e.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • engine/tests/queue_configuration_e2e.rs
  • engine/src/workers/queue/configuration.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
engine/src/workers/queue/queue.rs (3)

151-156: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use one LiveState snapshot for config-derived keys and adapter calls.

resolve_queue_key() snapshots config internally, then these callers snapshot the adapter separately. A hot-swap between those reads can redrive/peek/discard against an adapter generation that does not match the config used to add __fn_queue::.

Suggested direction
-    fn resolve_queue_key(&self, name: &str) -> String {
-        if self.config_snapshot().queue_configs.contains_key(name) {
+    fn resolve_queue_key_for(config: &QueueModuleConfig, name: &str) -> String {
+        if config.queue_configs.contains_key(name) {
             format!("__fn_queue::{}", name)
         } else {
             name.to_string()
         }
     }
+
+    fn resolve_queue_key(&self, name: &str) -> String {
+        let config = self.config_snapshot();
+        Self::resolve_queue_key_for(config.as_ref(), name)
+    }

Then use the same snapshot in each operation that needs both:

-        let resolved = self.resolve_queue_key(&input.queue);
-        match self.adapter_snapshot().redrive_dlq(&resolved).await {
+        let live = self.live_snapshot();
+        let resolved = Self::resolve_queue_key_for(live.config.as_ref(), &input.queue);
+        match live.adapter.redrive_dlq(&resolved).await {

Apply the same pattern to redrive_message, discard_message, console_topic_stats, and console_dlq_messages.

Also applies to: 292-293, 344-347, 406-409, 493-500, 571-574

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/queue/queue.rs` around lines 151 - 156, The
resolve_queue_key() method creates its own config snapshot internally, but
callers then snapshot the adapter separately. This creates a race condition
where the adapter generation may not match the config used to determine the
queue key. Refactor resolve_queue_key() to accept a LiveState snapshot parameter
instead of creating one internally, then update all callers (including
redrive_message, discard_message, console_topic_stats, and console_dlq_messages)
to obtain a single snapshot and pass it to resolve_queue_key() along with their
adapter operations to ensure consistency between config and adapter generations.

645-669: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Replay durable subscribers when the adapter is replaced.

register_trigger() materializes durable subscriptions inside the current adapter via subscribe(), but the adapter hot-swap path only restarts function-queue consumers. After set_live(...), existing durable subscriber triggers are not subscribed on the new adapter, so subsequent durable publishes can stop faning out to registered subscribers.

Replay registered durable:subscriber triggers onto new_adapter during the hot-swap, or move durable subscription state outside the adapter so replacing the transport cannot drop it.

Also applies to: 987-1021

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/queue/queue.rs` around lines 645 - 669, The
`register_trigger()` method currently subscribes durable triggers to the current
adapter via the `adapter.subscribe()` call, but when the adapter is hot-swapped
during `set_live()`, these subscriptions are lost because they only exist in the
old adapter instance. To fix this, you must either replay all existing durable
subscriber triggers onto the new adapter immediately after the adapter
replacement completes, or refactor to store durable subscription state outside
the adapter so subscriptions persist across adapter replacements. If replaying,
collect all registered durable subscriber triggers and call their subscribe
logic on the new adapter instance after the swap occurs.

1149-1150: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Resolve the fetched startup adapter before adopting fetched config.

On boot/reload, a stored configuration can select a different adapter than config.yaml, but this path only calls set_config(config). That leaves LiveState as new config + old adapter; the catch-up apply_config() will not fix it because it compares the already-adopted config to the fetched value and sees no adapter change.

Suggested fix
-            Ok(config) => match config.validate() {
-                Ok(()) => self.set_config(config),
+            Ok(config) => match config.validate() {
+                Ok(()) => {
+                    if Self::effective_adapter(&self.config_snapshot())
+                        != Self::effective_adapter(&config)
+                    {
+                        match Self::resolve_adapter(&self.engine, &config).await {
+                            Ok(adapter) => self.set_live(config, adapter),
+                            Err(err) => tracing::warn!(
+                                error = %err,
+                                "iii-queue: stored configuration selects an unavailable adapter; continuing with static config"
+                            ),
+                        }
+                    } else {
+                        self.set_config(config);
+                    }
+                }
                 Err(err) => tracing::warn!(
                     error = %err,
                     "iii-queue: stored configuration is invalid; continuing with static config"
                 ),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@engine/src/workers/queue/queue.rs` around lines 1149 - 1150, The issue is
that when the fetched configuration is adopted via set_config(config) in the
Ok(()) branch after config.validate(), the adapter is not resolved from the
fetched configuration first. This causes LiveState to have a mismatch of new
config combined with the old adapter. To fix this, before calling
set_config(config), you need to resolve the startup adapter from the fetched
config so that the adapter is properly updated to match the fetched
configuration before adoption.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@engine/src/workers/queue/queue.rs`:
- Around line 151-156: The resolve_queue_key() method creates its own config
snapshot internally, but callers then snapshot the adapter separately. This
creates a race condition where the adapter generation may not match the config
used to determine the queue key. Refactor resolve_queue_key() to accept a
LiveState snapshot parameter instead of creating one internally, then update all
callers (including redrive_message, discard_message, console_topic_stats, and
console_dlq_messages) to obtain a single snapshot and pass it to
resolve_queue_key() along with their adapter operations to ensure consistency
between config and adapter generations.
- Around line 645-669: The `register_trigger()` method currently subscribes
durable triggers to the current adapter via the `adapter.subscribe()` call, but
when the adapter is hot-swapped during `set_live()`, these subscriptions are
lost because they only exist in the old adapter instance. To fix this, you must
either replay all existing durable subscriber triggers onto the new adapter
immediately after the adapter replacement completes, or refactor to store
durable subscription state outside the adapter so subscriptions persist across
adapter replacements. If replaying, collect all registered durable subscriber
triggers and call their subscribe logic on the new adapter instance after the
swap occurs.
- Around line 1149-1150: The issue is that when the fetched configuration is
adopted via set_config(config) in the Ok(()) branch after config.validate(), the
adapter is not resolved from the fetched configuration first. This causes
LiveState to have a mismatch of new config combined with the old adapter. To fix
this, before calling set_config(config), you need to resolve the startup adapter
from the fetched config so that the adapter is properly updated to match the
fetched configuration before adoption.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 3df62e24-f0e9-43b3-bd58-9b90f6466dc4

📥 Commits

Reviewing files that changed from the base of the PR and between d82fbad and aa65873.

📒 Files selected for processing (2)
  • engine/src/workers/queue/adapters/builtin/adapter.rs
  • engine/src/workers/queue/queue.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • engine/src/workers/queue/adapters/builtin/adapter.rs

sergiofilhowz
sergiofilhowz previously approved these changes Jun 18, 2026
Migrate the `iii-queue` worker to consume the builtin `configuration`
worker for its settings, mirroring the `iii-http` integration (#1842).
Queue config (adapter/transport selection and per-queue retries,
concurrency, FIFO ordering, backoff) is now runtime-editable and
hot-reloaded: the worker registers a JSON-schema'd entry, seeds it from
the config.yaml block only on first boot, reads the live value (with
`${VAR:default}` expansion) at startup, and hot-applies
`configuration:updated` events without an engine restart. After first
boot the configuration entry is the source of truth; config.yaml is
seed-only and survives restarts.

Scope decisions:
- Full adapter hot-swap: a runtime change to `adapter` re-instantiates
  the transport and restarts every consumer (queued messages in the old
  transport do not migrate — accepted tradeoff, documented in-code).
- Best-effort per-queue apply: a strict fetch+validate gate (any failure
  keeps the previous config); past the gate each queue's consumer
  restart is independent, so one failure is logged while the others
  apply and that queue keeps its previous consumer.

Hardening:
- Boot path validates the fetched config and falls back to the seed,
  matching the runtime gate (a value `apply_config` rejects can be
  stored via `configuration::set` since the schema can't express the
  fifo cross-field rule).
- `destroyed` flag (set under `apply_lock` in `destroy`) makes a late
  apply — from an in-flight event or the one-shot retry — bail instead
  of re-spawning consumers on a torn-down worker.
- Builtin adapter nacks still-outstanding deliveries when a consumer
  channel closes, so a restart returns buffered messages to the queue
  (at-least-once) instead of stranding them (no visibility-timeout
  reclaim otherwise).

Tests: 7 unit (configuration), 10 e2e (seed/no-clobber/hot-add/hot-change/
adapter-swap/remove/invalid-keeps-previous/destroy-gate/env-expansion/
timeout-retry), plus a builtin-adapter nack-on-close test. Coverage:
config.rs 100%, configuration.rs 97.7%, queue.rs 96.8%.
…argo fmt

CI: `all_functions_on_bare_engine_are_iii_builtins` flagged the new
`iii-queue::on-config-change` handler as a non-builtin function.
`is_iii_builtin_function_id` allowlists `iii-http::` and `iii-state::`
(which register the same config handler) but not `iii-queue::`; add the
prefix and a matching unit assertion. Also apply `cargo fmt`.
Two valid findings from the PR review:

1. Builtin adapter could strand the oldest buffered delivery on consumer
   stop. The `outstanding` window was trimmed BEFORE the channel send; if
   the buffer was full and the send blocked, then the receiver was
   dropped, the just-evicted delivery_id was never nacked (stranded in
   delivery_map). Trim AFTER a successful send instead — a send only
   unblocks once the consumer makes room, so anything beyond the window
   has necessarily been received. Added a regression test
   (`consumer_stop_nacks_oldest_when_buffer_full`) that reproduces the
   stranding (prefetch=2, 3 jobs, drop receiver mid-blocked-send).

2. Adapter hot-swap exposed a mixed-state window: `set_config` ran before
   `set_adapter` with a consumer drain (awaits) between them, so a
   concurrent enqueue could read the new per-queue config while still
   publishing through the old, dying transport. Combine config + adapter
   into a single `LiveState` behind one `RwLock` and swap them atomically
   via `set_live`; `enqueue_to_function_queue` now reads both from one
   `live_snapshot()`, so no reader can observe a half-applied swap.
Adapter config now renders as a closed per-adapter discriminated union
(keyed on `name`), each branch carrying a typed `config`, mirroring the
iii-state integration — the console shows typed fields instead of an
opaque, read-only object. Branches: builtin/redis/bridge, plus rabbitmq
and an in-process `memory` test transport under their respective features.
`configuration::set` rejects unknown adapter names at the schema gate.

Also reject `concurrency: 0` (schema `range(min=1)` + `validate()` guard):
it reaches `mpsc::channel(0)` and panics the consumer — crashing boot from
a seed value, or the config-change handler mid-apply at runtime.

Tests: closed-union schema tests (real jsonschema validator), unbuildable-
adapter keeps-previous + unknown-name rejection e2e, strengthened
consumer-stop nack tests (assert redelivery to the DLQ, not a silent drop),
and adapter rebuild-failure coverage.
@andersonleal andersonleal force-pushed the feat/iii-queue-configuration-worker branch from eba161d to 342d709 Compare June 18, 2026 19:39
@andersonleal andersonleal merged commit 67ce7e0 into main Jun 18, 2026
36 checks passed
@andersonleal andersonleal deleted the feat/iii-queue-configuration-worker branch June 18, 2026 20:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants