Skip to content

Commit

Permalink
Fix the locking in the receive loop to ensure we do not receive messa…
Browse files Browse the repository at this point in the history
…ges on closed channels in the middle of shutting down.
  • Loading branch information
winsock committed Feb 16, 2021
1 parent f4c7402 commit eb9b0ed
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
59 changes: 39 additions & 20 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,17 @@ func (c *Conn) Close() {
}

func (c *Conn) close() {
// Allow users to do anything they need to do before we tear everything down
c.stopFunc()
_ = c.conn.Close()
c.responseChanMutex.Lock()
defer c.responseChanMutex.Unlock()
for key, responseChan := range c.responseChannels {
close(responseChan)
delete(c.responseChannels, key)
}

// Close the connection only after we have the response channel lock and we have deleted all response channels to ensure we don't receive on a closed channel
_ = c.conn.Close()
}

func (c *Conn) callEventListener(event *Event) {
Expand Down Expand Up @@ -235,29 +238,45 @@ func (c *Conn) eventLoop() {

func (c *Conn) receiveLoop() {
for c.runningContext.Err() == nil {
response, err := c.readResponse()
err := c.doMessage()
if err != nil {
log.Println("Error receiving message", err)
break
}
}
}

c.responseChanMutex.RLock()
responseChan, ok := c.responseChannels[response.GetHeader("Content-Type")]
if !ok && len(c.responseChannels) <= 0 {
// We must have shutdown!
break
}
c.responseChanMutex.RUnlock()
if ok && c.runningContext.Err() == nil {
ctx, cancel := context.WithTimeout(c.runningContext, 5*time.Second)
select {
case responseChan <- response:
case <-c.runningContext.Done():
cancel()
return
case <-ctx.Done():
log.Printf("No one to handle response %v\n", response)
}
cancel()
func (c *Conn) doMessage() error {
response, err := c.readResponse()
if err != nil {
return err
}

c.responseChanMutex.RLock()
defer c.responseChanMutex.RUnlock()
responseChan, ok := c.responseChannels[response.GetHeader("Content-Type")]
if !ok && len(c.responseChannels) <= 0 {
// We must have shutdown!
return errors.New("no response channels")
}

// We have a handler
if ok {
// Only allow 5 seconds to allow the handler to receive hte message on the channel
ctx, cancel := context.WithTimeout(c.runningContext, 5*time.Second)
defer cancel()

select {
case responseChan <- response:
case <-c.runningContext.Done():
// Parent connection context has stopped we most likely shutdown in the middle of waiting for a handler to handle the message
return c.runningContext.Err()
case <-ctx.Done():
// Do not return an error since this is not fatal but log since it could be a indication of problems
log.Printf("No one to handle response\nIs the connection overloaded or stopping?\n%v\n\n", response)
}
} else {
return errors.New("no response channel for Content-Type: " + response.GetHeader("Content-Type"))
}
return nil
}
2 changes: 1 addition & 1 deletion example/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

func main() {
// Connect to FreeSWITCH
conn, err := eslgo.Dial("127.0.0.1:8021", "ClueCon", func() {
conn, err := eslgo.Dial("127.0.0.1:8021", "ClueCon1", func() {
fmt.Println("Inbound Connection Disconnected")
})
if err != nil {
Expand Down

0 comments on commit eb9b0ed

Please sign in to comment.