
Refactored sending lock of subscriber #589

Open
wants to merge 1 commit into master

Conversation


@stephanvanzwienen commented Jul 30, 2025

Motivation / Background

Refactored the subscriber's sending lock, moving it into the SendToSubscriber loop so that it sits as close as possible to the sending part of the logic. This results in the output buffer actually being utilized and fixes the blocking behavior mentioned in issues #275 and #382.

Detail

In a project we came to a point where we wanted to rely on the "flushing" behavior noted in the docs here. We expected this to work, since Go channels can still be read from even after calling close, as per the language documentation.

But to our surprise, in a test where we set up a PubSub, published as many messages as the buffer could hold, and then called close on the PubSub, we only received a fraction of the messages in our subscriber handler. Note that we made sure to only start reading from the channel after a few seconds, which should be plenty of time to give the publisher a chance to fill up the buffer. We noticed that the buffer didn't contain the expected published messages, yet the messages were present in the persistedMessages map when using the persist config, so timing couldn't be the issue.
I tracked the culprit of this (for us) unexpected behavior down to s.sending.Lock() being in a place where it introduces a deadlock.
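Not our actual test code, but a minimal sketch along the lines of the scenario described above, assuming watermill's public gochannel API (gochannel.NewGoChannel, Config.OutputChannelBuffer, Publish, Subscribe, Close); the topic name and buffer size are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	const bufferSize = 16

	pubSub := gochannel.NewGoChannel(
		gochannel.Config{OutputChannelBuffer: bufferSize},
		watermill.NewStdLogger(false, false),
	)

	messages, err := pubSub.Subscribe(context.Background(), "example.topic")
	if err != nil {
		panic(err)
	}

	// Publish exactly as many messages as the output buffer can hold.
	for i := 0; i < bufferSize; i++ {
		if err := pubSub.Publish("example.topic", message.NewMessage(watermill.NewUUID(), nil)); err != nil {
			panic(err)
		}
	}

	// Give the publishing goroutines a few seconds to fill the buffer, then close.
	time.Sleep(3 * time.Second)
	if err := pubSub.Close(); err != nil {
		panic(err)
	}

	// We expected Close to flush the buffer, i.e. to still receive all
	// bufferSize messages here. With the lock held across the ack wait,
	// only a fraction of them arrives.
	received := 0
	for msg := range messages {
		msg.Ack()
		received++
	}
	fmt.Printf("received %d of %d messages\n", received, bufferSize)
}
```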

When calling Publish() with multiple messages, each message results in a goroutine which loops over all the subscribers of the targeted topic. For each subscriber a new goroutine is created, which finally sends the actual message through subscriber.sendMessageToSubscriber(message, logFields).

However, ALL goroutines share the same "sending" lock. Thus, if multiple messages are sent concurrently, one goroutine will keep this lock hostage ("deadlock") until it returns from the sendMessageToSubscriber func. This is due to the unlock being done on defer AND because of the final select case, where the func only returns if:
  1. the message is acked
  2. the message is nacked
  3. s.closing is received

This select can take a while to actually trigger, depending on when a dev performs one of these 3 actions. This part of the logic has (as far as I understand) no further reason to keep holding the sending lock. A toy model of the problem is sketched below.
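To make the contention concrete, here is a small self-contained toy model of the shape described above; the subscriber struct, its field names, and the buffer size are stand-ins, not the actual watermill source:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Toy model of the pattern described above; not the actual watermill source.
type subscriber struct {
	sending       sync.Mutex
	outputChannel chan string
	closing       chan struct{}
	acked         chan struct{} // stands in for msg.Acked()
}

// Old shape: the lock is held for the whole call, including the wait for an ack.
func (s *subscriber) sendMessageToSubscriber(msg string) {
	s.sending.Lock()
	defer s.sending.Unlock() // only released once one of the cases below fires

	s.outputChannel <- msg // the buffered send

	select {
	case <-s.acked:
	case <-s.closing:
	}
}

func main() {
	s := &subscriber{
		outputChannel: make(chan string, 8), // room for 8 messages
		closing:       make(chan struct{}),
		acked:         make(chan struct{}),
	}

	// Like the per-message goroutines spawned by Publish, all sharing one lock.
	for i := 0; i < 8; i++ {
		go s.sendMessageToSubscriber(fmt.Sprintf("msg-%d", i))
	}

	time.Sleep(time.Second)
	// Prints "buffered: 1": one goroutine sits in the select waiting for an
	// ack while still holding s.sending; the other seven are parked on Lock().
	fmt.Println("buffered:", len(s.outputChannel))
}
```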

Now, by moving the lock/unlock to the "sending" part of the logic, we avoid this deadlock. This circles back to our issue: the buffer is now actually utilized and flushing can be done, while also fixing #275 and #382.
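In the same toy model as above, the moved lock looks roughly like this; again a sketch of the idea, not the actual diff:

```go
// New shape: the lock only guards the buffered send itself; the wait for an
// ack (or for closing) happens outside the critical section.
func (s *subscriber) sendMessageToSubscriber(msg string) {
	s.sending.Lock()
	s.outputChannel <- msg // the buffered send, still serialized by the lock
	s.sending.Unlock()     // released before waiting on the subscriber

	select {
	case <-s.acked:
	case <-s.closing:
	}
}

// Running the same main as above now prints "buffered: 8": every goroutine
// gets to place its message in the output buffer before waiting for an ack,
// so Close can flush the full buffer to the handler.
```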

Alternative approaches considered (if applicable)

N/A

Checklist

The resources of our team are limited. There are a couple of things that you can do to help us merge your PR faster:

  • I wrote tests for the changes.
  • All tests are passing.
    • If you are testing a Pub/Sub, you can start Docker with make up.
    • You can start with make test_short for a quick check.
    • If you want to run all tests, use make test.
  • Code has no breaking changes.
  • (If applicable) documentation on watermill.io is updated.

@stephanvanzwienen changed the title from "refacotored sending lock of subscriber to the SendToSubscriber loop t…" to "Refactored sending lock of subscriber" on Jul 30, 2025
@stephanvanzwienen (Author)

@m110 @roblaszczak can one of you have a look at this? 😄
