Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cron to clear stucked ivr call channel connection #65

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
retryIVRLock = "retry_ivr_calls"
expireIVRLock = "expire_ivr_calls"
clearIVRLock = "clear_ivr_connections"
)

func init() {
Expand All @@ -43,6 +44,14 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error
},
)

cron.StartCron(quit, rt.RP, clearIVRLock, time.Hour,
func(lockName string, lockValue string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
return clearStuckedChannelConnections(ctx, rt, clearIVRLock, lockValue)
},
)

return nil
}

Expand Down Expand Up @@ -168,6 +177,41 @@ func expireCalls(ctx context.Context, rt *runtime.Runtime, lockName string, lock
return nil
}

func clearStuckedChannelConnections(ctx context.Context, rt *runtime.Runtime, lockName string, lockValue string) error {
log := logrus.WithField("comp", "ivr_cron_cleaner").WithField("lock", lockValue)
start := time.Now()

ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()

result, err := rt.DB.ExecContext(ctx, clearStuckedChanelConnectionsSQL)
if err != nil {
return errors.Wrapf(err, "error cleaning stucked connections")
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return errors.Wrapf(err, "error getting rows affected on cleaning stucked connections")
}
if rowsAffected > 0 {
log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("stucked channel connections")
}
return nil
}

const clearStuckedChanelConnectionsSQL = `
UPDATE channels_channelconnection
SET status = 'F'
WHERE id in (
SELECT id
FROM channels_channelconnection
WHERE
(status = 'W' OR status = 'R' OR status = 'I') AND
modified_on < NOW() - INTERVAL '1 DAY'
LIMIT 100
)
`

const selectExpiredRunsSQL = `
SELECT
fr.id as run_id,
Expand Down
49 changes: 49 additions & 0 deletions core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,52 @@ func TestRetries(t *testing.T) {
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
}

func TestClearConnections(t *testing.T) {
ctx, rt, db, rp := testsuite.Get()
rc := rp.Get()
defer rc.Close()

defer testsuite.Reset(testsuite.ResetAll)

ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)

db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
err := starts.CreateFlowBatches(ctx, rt, start)
assert.NoError(t, err)

task, err := queue.PopNextTask(rc, queue.HandlerQueue)
assert.NoError(t, err)
batch := &models.FlowStartBatch{}
err = json.Unmarshal(task.Task, batch)
assert.NoError(t, err)

client.callError = nil
client.callID = ivr.CallID("call1")
err = HandleFlowStartBatch(ctx, rt, batch)
assert.NoError(t, err)
testsuite.AssertQuery(t, db,
`SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1",
).Returns(1)

// update channel connection to be modified_on 2 days ago
db.MustExec(`UPDATE channels_channelconnection SET modified_on = NOW() - INTERVAL '2 DAY' WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1",
)

// cleaning
err = clearStuckedChannelConnections(ctx, rt, "cleaner_test", "cleaner_test")
assert.NoError(t, err)

// status should be Failed
testsuite.AssertQuery(t, db,
`SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusFailed, "call1",
).Returns(1)
}