forked from skip-mev/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlifecycle.go
128 lines (106 loc) · 3.12 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package oracle
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"github.com/skip-mev/connect/v2/oracle/types"
)
// Start starts the (blocking) oracle. It runs Init, derives the oracle's main
// cancellable context from ctx, builds each price provider's tickers from the
// current market map and hands them to UpdateProviderState, launches the
// market map provider and its update listener when a market map provider is
// configured, and then runs the price fetch loop on every UpdateInterval tick.
//
// The oracle's running flag is set for the duration of the call and cleared
// when Start returns.
//
// Start returns the first setup error it encounters. If setup fails after the
// main context has been created, that context is cancelled before returning so
// that nothing bound to it outlives the failed start. Otherwise Start blocks
// until the main context is done, calls Stop, and returns ctx.Err().
func (o *OracleImpl) Start(ctx context.Context) error {
	o.logger.Info("starting oracle")
	o.running.Store(true)
	defer o.running.Store(false)

	if err := o.Init(ctx); err != nil {
		o.logger.Error("failed to initialize oracle", zap.Error(err))
		return err
	}

	// Set the main context for the oracle. setMainCtx also stores the cancel
	// func on the oracle (used by Stop); we keep a local handle so the failure
	// paths below can release the context themselves.
	ctx, cancel := o.setMainCtx(ctx)

	// Start all price providers which have tickers.
	for name, state := range o.priceProviders {
		providerTickers, err := types.ProviderTickersFromMarketMap(name, o.marketMap)
		if err != nil {
			o.logger.Error("failed to create provider market map", zap.String("provider", name), zap.Error(err))
			// Cancelling is idempotent, so a later Stop call remains safe.
			cancel()
			return err
		}

		// Update the provider's state.
		if _, err = o.UpdateProviderState(providerTickers, state); err != nil {
			o.logger.Error("failed to update provider state", zap.String("provider", name), zap.Error(err))
			// Providers handled in earlier iterations may already be bound to
			// the main context; cancel it so they are signalled to stop.
			cancel()
			return err
		}
	}

	// Start the market map provider.
	if o.mmProvider != nil {
		o.logger.Info("starting marketmap provider")

		// Both goroutines are tracked by o.wg so that Stop can wait for them.
		o.wg.Add(1)
		go func() {
			defer o.wg.Done()
			o.execProviderFn(ctx, o.mmProvider)
		}()

		o.wg.Add(1)
		go func() {
			defer o.wg.Done()
			o.listenForMarketMapUpdates(ctx)
		}()
	}

	// Start price fetch loop.
	ticker := time.NewTicker(o.cfg.UpdateInterval)
	defer ticker.Stop()

	o.metrics.SetConnectBuildInfo()

	for {
		select {
		case <-ctx.Done():
			// Stop cancels the stored main context (a no-op if it is already
			// cancelled) and waits on o.wg before we report the exit.
			o.Stop()
			o.logger.Info("oracle stopped via context")
			return ctx.Err()

		case <-ticker.C:
			o.fetchAllPrices()
		}
	}
}
// Stop shuts the oracle down synchronously: it cancels the main context, if
// one has been set, and then blocks until every goroutine tracked by the
// oracle's wait group has returned.
func (o *OracleImpl) Stop() {
	o.logger.Info("stopping oracle")

	// Only the cancel func is needed here; the context itself is ignored.
	_, cancelMain := o.getMainCtx()
	if cancelMain != nil {
		o.logger.Info("cancelling context")
		cancelMain()
	}

	o.logger.Info("waiting for routines to stop")
	o.wg.Wait()

	o.logger.Info("oracle exited successfully")
}
// IsRunning reports the current value of the oracle's running flag, which
// Start sets on entry and clears on return.
func (o *OracleImpl) IsRunning() bool {
	return o.running.Load()
}
// execProviderFn runs p.Start with ctx and logs the result once it returns.
// A panic raised while doing so is recovered and logged instead of
// propagating to the caller. If ctx is nil the provider is not started.
func (o *OracleImpl) execProviderFn(
	ctx context.Context,
	p generalProvider,
) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		o.logger.Error("recovered from panic", zap.Error(fmt.Errorf("%v", r)))
	}()

	if ctx == nil {
		o.logger.Error("main context is nil; cannot start provider", zap.String("provider", p.Name()))
		return
	}

	// The exit is logged unconditionally, whatever error value Start returns.
	exitErr := p.Start(ctx)
	o.logger.Error("provider exited", zap.String("provider", p.Name()), zap.Error(exitErr))
}
// getMainCtx returns the oracle's stored main context together with its
// cancel func, reading both while holding the oracle's mutex.
func (o *OracleImpl) getMainCtx() (context.Context, context.CancelFunc) {
	o.mut.Lock()
	mainCtx, cancelMain := o.mainCtx, o.mainCancel
	o.mut.Unlock()

	return mainCtx, cancelMain
}
// setMainCtx derives a cancellable context from ctx, stores it and its cancel
// func as the oracle's main context while holding the oracle's mutex, and
// returns both.
func (o *OracleImpl) setMainCtx(ctx context.Context) (context.Context, context.CancelFunc) {
	o.mut.Lock()
	defer o.mut.Unlock()

	derived, cancel := context.WithCancel(ctx)
	o.mainCtx = derived
	o.mainCancel = cancel

	return derived, cancel
}