We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent de4bc95 commit 282c62aCopy full SHA for 282c62a
internal/consumer/rabbitmq/consumer.go
@@ -239,7 +239,15 @@ func (r *rabbitMQ) Consume(ctx context.Context,
239
"consumer_meta": meta,
240
}).Info("stopping the worker")
241
return
242
- case receivedMsg := <-r.msgReceiver:
+ case receivedMsg, ok := <-r.msgReceiver:
243
+ if !ok {
244
+ // deliveries channel closed (e.g., due to Stop/Cancel or connection closure)
245
+ logrus.WithFields(logrus.Fields{
246
+ "queue_name": r.option.QueueName,
247
+ "consumer_meta": meta,
248
+ }).Info("message receiver closed, stopping the worker")
249
+ return
250
+ }
251
msg, err := buildMessage(meta, receivedMsg)
252
if err != nil {
253
if err == errors.ErrInvalidMessageFormat {
0 commit comments