diff --git a/connection.go b/connection.go index d5eff34..dcd4d8f 100644 --- a/connection.go +++ b/connection.go @@ -139,14 +139,17 @@ func (c *Conn) Close() { } func (c *Conn) close() { + // Allow users to do anything they need to do before we tear everything down c.stopFunc() - _ = c.conn.Close() c.responseChanMutex.Lock() defer c.responseChanMutex.Unlock() for key, responseChan := range c.responseChannels { close(responseChan) delete(c.responseChannels, key) } + + // Close the connection only after we have the response channel lock and we have deleted all response channels to ensure we don't receive on a closed channel + _ = c.conn.Close() } func (c *Conn) callEventListener(event *Event) { @@ -235,29 +238,45 @@ func (c *Conn) eventLoop() { func (c *Conn) receiveLoop() { for c.runningContext.Err() == nil { - response, err := c.readResponse() + err := c.doMessage() if err != nil { + log.Println("Error receiving message", err) break } + } +} - c.responseChanMutex.RLock() - responseChan, ok := c.responseChannels[response.GetHeader("Content-Type")] - if !ok && len(c.responseChannels) <= 0 { - // We must have shutdown! - break - } - c.responseChanMutex.RUnlock() - if ok && c.runningContext.Err() == nil { - ctx, cancel := context.WithTimeout(c.runningContext, 5*time.Second) - select { - case responseChan <- response: - case <-c.runningContext.Done(): - cancel() - return - case <-ctx.Done(): - log.Printf("No one to handle response %v\n", response) - } - cancel() +func (c *Conn) doMessage() error { + response, err := c.readResponse() + if err != nil { + return err + } + + c.responseChanMutex.RLock() + defer c.responseChanMutex.RUnlock() + responseChan, ok := c.responseChannels[response.GetHeader("Content-Type")] + if !ok && len(c.responseChannels) <= 0 { + // We must have shutdown! + return errors.New("no response channels") + } + + // We have a handler + if ok { + // Only allow 5 seconds to allow the handler to receive hte message on the channel + ctx, cancel := context.WithTimeout(c.runningContext, 5*time.Second) + defer cancel() + + select { + case responseChan <- response: + case <-c.runningContext.Done(): + // Parent connection context has stopped we most likely shutdown in the middle of waiting for a handler to handle the message + return c.runningContext.Err() + case <-ctx.Done(): + // Do not return an error since this is not fatal but log since it could be a indication of problems + log.Printf("No one to handle response\nIs the connection overloaded or stopping?\n%v\n\n", response) } + } else { + return errors.New("no response channel for Content-Type: " + response.GetHeader("Content-Type")) } + return nil } diff --git a/example/events/events.go b/example/events/events.go index 2b03623..3d43049 100644 --- a/example/events/events.go +++ b/example/events/events.go @@ -21,7 +21,7 @@ import ( func main() { // Connect to FreeSWITCH - conn, err := eslgo.Dial("127.0.0.1:8021", "ClueCon", func() { + conn, err := eslgo.Dial("127.0.0.1:8021", "ClueCon1", func() { fmt.Println("Inbound Connection Disconnected") }) if err != nil {