Skip to content

Commit

Permalink
Checkpoint - optional refactor of subtaskrunner into client methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
Crystal Lemire committed Oct 11, 2023
1 parent 3f534dc commit 108949f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
11 changes: 4 additions & 7 deletions protocol/daemons/pricefeed/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func (c *Client) start(ctx context.Context,
grpcClient daemontypes.GrpcClient,
exchangeIdToStartupConfig map[types.ExchangeId]*types.ExchangeStartupConfig,
exchangeIdToExchangeDetails map[types.ExchangeId]types.ExchangeQueryDetails,
subTaskRunner SubTaskRunner,
) (err error) {
// 1. Establish connections to gRPC servers.
queryConn, err := grpcClient.NewTcpConnection(ctx, appFlags.GrpcAddress)
Expand Down Expand Up @@ -222,7 +221,7 @@ func (c *Client) start(ctx context.Context,
c.runningSubtasksWaitGroup.Add(1)
go func() {
defer c.runningSubtasksWaitGroup.Done()
subTaskRunner.StartPriceEncoder(
c.StartPriceEncoder(
exchangeId,
priceFeedMutableMarketConfigs,
exchangeToMarketPrices,
Expand All @@ -235,7 +234,7 @@ func (c *Client) start(ctx context.Context,
c.runningSubtasksWaitGroup.Add(1)
go func() {
defer c.runningSubtasksWaitGroup.Done()
subTaskRunner.StartPriceFetcher(
c.StartPriceFetcher(
ticker,
stop,
priceFeedMutableMarketConfigs,
Expand All @@ -253,7 +252,7 @@ func (c *Client) start(ctx context.Context,
c.runningSubtasksWaitGroup.Add(1)
go func() {
defer c.runningSubtasksWaitGroup.Done()
subTaskRunner.StartMarketParamUpdater(
c.StartMarketParamUpdater(
ctx,
marketParamUpdaterTicker,
marketParamUpdaterStop,
Expand All @@ -277,7 +276,7 @@ func (c *Client) start(ctx context.Context,
c.completeStartup()

pricefeedClient := api.NewPriceFeedServiceClient(daemonConn)
subTaskRunner.StartPriceUpdater(
c.StartPriceUpdater(
ctx,
priceUpdaterTicker,
priceUpdaterStop,
Expand All @@ -301,7 +300,6 @@ func StartNewClient(
grpcClient daemontypes.GrpcClient,
exchangeIdToStartupConfig map[types.ExchangeId]*types.ExchangeStartupConfig,
exchangeIdToExchangeDetails map[types.ExchangeId]types.ExchangeQueryDetails,
subTaskRunner SubTaskRunner,
) (client *Client) {
client = newClient()
client.runningSubtasksWaitGroup.Add(1)
Expand All @@ -315,7 +313,6 @@ func StartNewClient(
grpcClient,
exchangeIdToStartupConfig,
exchangeIdToExchangeDetails,
subTaskRunner,
)
if err != nil {
logger.Error("Error initializing pricefeed daemon: %w", err.Error())
Expand Down
3 changes: 1 addition & 2 deletions protocol/daemons/pricefeed/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

var (
subTaskRunnerImpl = SubTaskRunnerImpl{}
subTaskRunnerImpl = newClient()
)

// FakeSubTaskRunner acts as a dummy struct replacing `SubTaskRunner` that simply advances the
Expand Down Expand Up @@ -337,7 +337,6 @@ func TestStop(t *testing.T) {
&daemontypes.GrpcClientImpl{},
constants.TestExchangeStartupConfigs,
constants.TestExchangeIdToExchangeQueryDetails,
&SubTaskRunnerImpl{},
)

// Stop the daemon.
Expand Down
16 changes: 7 additions & 9 deletions protocol/daemons/pricefeed/client/sub_task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@ var (
}
)

// SubTaskRunnerImpl is the struct that implements the `SubTaskRunner` interface.
type SubTaskRunnerImpl struct{}

// Ensure the `SubTaskRunnerImpl` struct is implemented at compile time.
var _ SubTaskRunner = (*SubTaskRunnerImpl)(nil)
var _ SubTaskRunner = (*Client)(nil)

// SubTaskRunner is the interface for running pricefeed client task functions.
type SubTaskRunner interface {
Expand Down Expand Up @@ -75,7 +72,7 @@ type SubTaskRunner interface {
// 2) Transform `MarketPriceTimestamps` and exchange ids into an `UpdateMarketPricesRequest` struct.
// StartPriceUpdater runs in the daemon's main goroutine and does not need access to the daemon's wait group
// to signal task completion.
func (s *SubTaskRunnerImpl) StartPriceUpdater(
func (_ *Client) StartPriceUpdater(
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand All @@ -87,7 +84,7 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater(
select {
case <-ticker.C:
err := RunPriceUpdaterTaskLoop(ctx, exchangeToMarketPrices, priceFeedServiceClient, logger)
if err != nil {
if err != nil && err != types.ErrEmptyMarketPriceUpdate {
panic(err)
}

Expand All @@ -103,7 +100,7 @@ func (s *SubTaskRunnerImpl) StartPriceUpdater(
// StartPriceEncoder reads price fetcher responses from a shared channel, and does not need a ticker or stop
// signal from the daemon to exit. It marks itself as done in the daemon's wait group when the price fetcher
// closes the shared channel.
func (s *SubTaskRunnerImpl) StartPriceEncoder(
func (_ *Client) StartPriceEncoder(
exchangeId types.ExchangeId,
configs types.PricefeedMutableMarketConfigs,
exchangeToMarketPrices types.ExchangeToMarketPrices,
Expand Down Expand Up @@ -149,7 +146,7 @@ func (s *SubTaskRunnerImpl) StartPriceEncoder(
// NOTE: the subtask response shared channel has a buffer size and goroutines will block if the buffer is full.
// NOTE: the price fetcher kicks off 1 to n go routines every time the subtask loop runs, but the subtask
// loop blocks until all go routines are done. This means that these go routines are not tracked by the wait group.
func (s *SubTaskRunnerImpl) StartPriceFetcher(
func (_ *Client) StartPriceFetcher(
ticker *time.Ticker,
stop <-chan bool,
configs types.PricefeedMutableMarketConfigs,
Expand Down Expand Up @@ -212,7 +209,7 @@ func (s *SubTaskRunnerImpl) StartPriceFetcher(

// StartMarketParamUpdater periodically starts a goroutine to update the market parameters that control which
// markets the daemon queries and how they are queried and computed from each exchange.
func (s *SubTaskRunnerImpl) StartMarketParamUpdater(
func (_ *Client) StartMarketParamUpdater(
ctx context.Context,
ticker *time.Ticker,
stop <-chan bool,
Expand Down Expand Up @@ -291,6 +288,7 @@ func RunPriceUpdaterTaskLoop(
metrics.PriceUpdaterZeroPrices,
metrics.Count,
)
return types.ErrEmptyMarketPriceUpdate
}

return nil
Expand Down
10 changes: 10 additions & 0 deletions protocol/daemons/pricefeed/client/types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package types

import (
"errors"
)

var (
ErrEmptyMarketPriceUpdate = errors.New("Market price update has length of 0")
ErrUnableToUpdatePrices = errors.New("Unable to update prices")
)

0 comments on commit 108949f

Please sign in to comment.