Skip to content

Commit

Permalink
pub/sub rabbitmq: fix subscribe error (#1377)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@gmail.com>
  • Loading branch information
dmitsh authored Dec 14, 2021
1 parent 9dbdaee commit 3fcf50c
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions pubsub/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/streadway/amqp"

"github.com/dapr/components-contrib/pubsub"
Expand Down Expand Up @@ -228,25 +227,18 @@ func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler
queueName := fmt.Sprintf("%s-%s", r.metadata.consumerID, req.Topic)
r.logger.Infof("%s subscribe to topic/queue '%s/%s'", logMessagePrefix, req.Topic, queueName)

// By the time Subscribe exits, the subscription should be active.
err := retry.NotifyRecover(func() error {
if _, _, _, err := r.ensureSubscription(req, queueName); err != nil {
r.logger.Warnf("failed attempt to subscribe to %s: %v", queueName, err)
return err
}
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(r.metadata.reconnectWait), 4), func(err error, d time.Duration) {
r.logger.Infof("failed to subscribe to %s. Retrying...", queueName)
}, func() {
r.logger.Infof("successfully subscribed to %s after initial error(s)", queueName)
})
if err != nil {
return err
}
ackCh := make(chan struct{}, 1)
ctx, cancel := context.WithTimeout(r.ctx, time.Minute)
defer cancel()

go r.subscribeForever(req, queueName, handler)
go r.subscribeForever(req, queueName, handler, ackCh)

return nil
select {
case <-ctx.Done():
return fmt.Errorf("failed to subscribe to %s", queueName)
case <-ackCh:
return nil
}
}

// this function call should be wrapped by channelMutex.
Expand Down Expand Up @@ -331,7 +323,10 @@ func (r *rabbitMQ) ensureSubscription(req pubsub.SubscribeRequest, queueName str
return r.channel, r.connectionCount, q, err
}

func (r *rabbitMQ) subscribeForever(req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler) {
func (r *rabbitMQ) subscribeForever(req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler, ackCh chan struct{}) {
// one-time notification on successful subscribe
var subscribed bool

for {
var (
err error
Expand Down Expand Up @@ -362,6 +357,12 @@ func (r *rabbitMQ) subscribeForever(req pubsub.SubscribeRequest, queueName strin
break
}

if !subscribed {
subscribed = true
ackCh <- struct{}{}
ackCh = nil
}

err = r.listenMessages(channel, msgs, req.Topic, handler)
if err != nil {
errFuncName = "listenMessages"
Expand Down

0 comments on commit 3fcf50c

Please sign in to comment.