-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
3879604
commit db44f65
Showing
11 changed files
with
703 additions
and
279 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package db | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
) | ||
|
||
type PollableDBQuery[ValueType any] func(ctx context.Context, lastSeenID int64, numRows int32) (results []ValueType, lastID int64, err error) | ||
|
||
// Poll whenever notified, or at an interval if not notified | ||
type PollingOptions struct { | ||
Interval time.Duration | ||
Notifier <-chan bool | ||
NumRows int32 | ||
} | ||
|
||
type DBSubscription[ValueType any] struct { | ||
ctx context.Context | ||
log *zap.Logger | ||
lastSeenID int64 | ||
options PollingOptions | ||
query PollableDBQuery[ValueType] | ||
updates chan<- []ValueType | ||
} | ||
|
||
func NewDBSubscription[ValueType any]( | ||
ctx context.Context, | ||
log *zap.Logger, | ||
query PollableDBQuery[ValueType], | ||
lastSeenID int64, | ||
options PollingOptions, | ||
) *DBSubscription[ValueType] { | ||
return &DBSubscription[ValueType]{ | ||
ctx: ctx, | ||
log: log, | ||
lastSeenID: lastSeenID, | ||
options: options, | ||
query: query, | ||
updates: nil, | ||
} | ||
} | ||
|
||
func (s *DBSubscription[ValueType]) Start() (<-chan []ValueType, error) { | ||
if s.updates != nil { | ||
return nil, fmt.Errorf("Already started") | ||
} | ||
if s.options.NumRows <= 0 || s.log == nil { | ||
return nil, fmt.Errorf("Required params not provided") | ||
} | ||
updates := make(chan []ValueType) | ||
s.updates = updates | ||
|
||
go func() { | ||
s.poll() | ||
|
||
timer := time.NewTimer(s.options.Interval) | ||
for { | ||
timer.Reset(s.options.Interval) | ||
select { | ||
case <-s.ctx.Done(): | ||
s.log.Info("Context done; stopping subscription") | ||
close(s.updates) | ||
return | ||
case <-s.options.Notifier: | ||
s.poll() | ||
case <-timer.C: | ||
s.poll() | ||
} | ||
} | ||
}() | ||
|
||
return updates, nil | ||
} | ||
|
||
func (s *DBSubscription[ValueType]) poll() { | ||
// Repeatedly query page by page until no more results | ||
for { | ||
results, lastID, err := s.query(s.ctx, s.lastSeenID, s.options.NumRows) | ||
if err != nil { | ||
s.log.Error( | ||
"Error querying for DB subscription", | ||
zap.Error(err), | ||
zap.Int64("lastSeenID", s.lastSeenID), | ||
zap.Int32("numRows", s.options.NumRows), | ||
) | ||
// Did not update lastSeenID; will retry on next poll | ||
break | ||
} | ||
if len(results) == 0 { | ||
break | ||
} | ||
s.lastSeenID = lastID | ||
s.updates <- results | ||
if int32(len(results)) < s.options.NumRows { | ||
break | ||
} | ||
} | ||
} |
Oops, something went wrong.