Skip to content

Commit

Permalink
Update reconnection logic. Add listeners to channel/connection close …
Browse files Browse the repository at this point in the history
…events. (#168)

If amqp channel is closed we close connection to trigger reconnection logic.
Some additional info can be found here https://github.com/rabbitmq/amqp091-go/blob/86bd7954c003db8771ba0989dc8484aba2088d11/doc.go#L109
  • Loading branch information
vminkobin authored Oct 6, 2022
1 parent b780095 commit c4b2ca4
Showing 1 changed file with 57 additions and 32 deletions.
89 changes: 57 additions & 32 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,10 @@ func Connect(url string, options ...Option) (*Client, error) {
}

func (c *Client) Close() error {
if c.amqpChan != nil {
err := c.amqpChan.Close()
if err != nil {
log.Errorf("Close amqp channel: %v", err)
}
}

if c.conn != nil && !c.conn.IsClosed() {
err := c.conn.Close()
if err != nil {
return err
return fmt.Errorf("close connection: %v", err)
}
}

Expand Down Expand Up @@ -133,57 +126,89 @@ func (c *Client) ListenConnectionAsync(ctx context.Context, wg *sync.WaitGroup)
}()
}

func (c *Client) initNotifyCloseListeners() (<-chan *amqp.Error, <-chan *amqp.Error) {
return c.conn.NotifyClose(make(chan *amqp.Error)),
c.amqpChan.NotifyClose(make(chan *amqp.Error))
}

func (c *Client) ListenConnection(ctx context.Context) error {
log.Info("start listen connection")

connErrCh, chanErrCh := c.initNotifyCloseListeners()

for {
select {

case <-ctx.Done():
err := c.Close()
if err != nil {
return fmt.Errorf("close mq: %v", err)
}
return nil
default:
err := c.checkConnection(ctx)

case err, ok := <-chanErrCh:
if !ok {
// stop receiving from this channel to avoid multiple reads from closed channel before reconnected
chanErrCh = nil
}

log.Info("received amqp channel close notification")
if err != nil {
log.Errorf("amqp channel closed with error: %v", err)
}

if c.conn.IsClosed() {
break
}

// close connection to trigger reconnect logic
// it will send notification to connErrCh
if err := c.conn.Close(); err != nil {
return fmt.Errorf("close connection: %v", err)
}

case err := <-connErrCh:
log.Info("received connection close notification")
if err != nil {
log.Errorf("connection closed with error: %v", err)
}

if err := c.reconnectWithRetry(ctx); err != nil {
return fmt.Errorf("check mq connection: %v", err)
}

time.Sleep(time.Second * 10)
// reassign listeners to new connection and channel
connErrCh, chanErrCh = c.initNotifyCloseListeners()
}
}
}

func (c *Client) checkConnection(ctx context.Context) error {
if c.conn.IsClosed() {
log.Warn("MQ connection lost")
func (c *Client) reconnectWithRetry(ctx context.Context) error {
for i := 0; i < reconnectionAttemptsNum; i++ {
time.Sleep(reconnectionTimeout)

for i := 0; i < reconnectionAttemptsNum; i++ {
time.Sleep(reconnectionTimeout)
log.Info("Connecting to MQ... Attempt ", i+1)

log.Info("Connecting to MQ... Attempt ", i+1)
err := c.reconnect()
if err != nil {
log.Errorf("Reconnect: %v", err)
continue
}

err := c.reconnect()
for _, connClient := range c.connClients {
err = connClient.Reconnect(ctx)
if err != nil {
log.Errorf("Reconnect: %v", err)
log.Errorf("Reconnect for %+v: %v", connClient, err)
continue
}

for _, connClient := range c.connClients {
err = connClient.Reconnect(ctx)
if err != nil {
log.Errorf("Reconnect for %+v: %v", connClient, err)
continue
}
}

log.Info("MQ connection established")
return nil
}

return fmt.Errorf("failed to establish MQ connection")
log.Info("MQ connection established")
return nil
}

return nil
return fmt.Errorf("failed to establish MQ connection")

}

func (c *Client) reconnect() error {
Expand Down

0 comments on commit c4b2ca4

Please sign in to comment.