From fb72f3a665c110474dcf8c0059284f21de9c2df7 Mon Sep 17 00:00:00 2001 From: Jason Chen Date: Fri, 3 Jan 2025 07:30:58 +0000 Subject: [PATCH 1/3] improvement, aggregate pre-samples of datapoints from AMM and AFQ. --- config/config.go | 1 + oracle_server/oracle_server.go | 29 +++++++------- oracle_server/oracle_server_test.go | 2 +- plugin_wrapper/plugin_wrapper.go | 57 +++++++++++++++++---------- plugin_wrapper/plugin_wrapper_test.go | 16 ++++---- types/plugin_spec.go | 2 +- types/types.go | 11 +++--- 7 files changed, 68 insertions(+), 50 deletions(-) diff --git a/config/config.go b/config/config.go index 20718d5..13d81fb 100644 --- a/config/config.go +++ b/config/config.go @@ -21,6 +21,7 @@ var ( defaultProfileDir = "." defaultVoteBufferAfterPenalty = uint64(3600 * 24) // The buffering time window in blocks to continue vote after the last penalty event. + PreSamplingRange = 5 // pre-sampling starts in 5s in advance. ConfidenceStrategyLinear = 0 ConfidenceStrategyFixed = 1 defaultConfidenceStrategy = ConfidenceStrategyLinear // 0: linear, 1: fixed. diff --git a/oracle_server/oracle_server.go b/oracle_server/oracle_server.go index 24bcc0d..d55e86d 100644 --- a/oracle_server/oracle_server.go +++ b/oracle_server/oracle_server.go @@ -39,14 +39,14 @@ var ForexCurrencies = map[string]struct{}{ } var ( - saltRange = new(big.Int).SetUint64(math.MaxInt64) - alertBalance = new(big.Int).SetUint64(2000000000000) // 2000 Gwei, 0.000002 Ether - invalidPrice = big.NewInt(0) - invalidSalt = big.NewInt(0) - tenSecsInterval = 10 * time.Second // ticker to check L2 connectivity and gc round data. - oneSecsInterval = 1 * time.Second // sampling interval during data pre-sampling period. - preSamplingRange = uint64(5) // pre-sampling starts in 5 blocks in advance. - bridgerSymbols = []string{"ATN-USDC", "NTN-USDC", "USDC-USD"} // used for value bridging to USD by USDC + saltRange = new(big.Int).SetUint64(math.MaxInt64) + alertBalance = new(big.Int).SetUint64(2000000000000) // 2000 Gwei, 0.000002 Ether + invalidPrice = big.NewInt(0) + invalidSalt = big.NewInt(0) + tenSecsInterval = 10 * time.Second // ticker to check L2 connectivity and gc round data. + oneSecsInterval = 1 * time.Second // sampling interval during data pre-sampling period. + + bridgerSymbols = []string{"ATN-USDC", "NTN-USDC", "USDC-USD"} // used for value bridging to USD by USDC numOfPlugins = metrics.GetOrRegisterGauge("oracle/plugins", nil) oracleRound = metrics.GetOrRegisterGauge("oracle/round", nil) @@ -349,9 +349,9 @@ func (os *OracleServer) initStates() (uint64, []string, uint64, error) { return currentRound.Uint64(), symbols, votePeriod.Uint64(), nil } -func (os *OracleServer) gcDataSamples() { +func (os *OracleServer) gcExpiredSamples() { for _, plugin := range os.runningPlugins { - plugin.GCSamples() + plugin.GCExpiredSamples() } } @@ -445,7 +445,7 @@ func (os *OracleServer) handlePreSampling(preSampleTS int64) error { os.logger.Error("handle pre-sampling", "error", err.Error()) return err } - if nextRoundHeight-curHeight > preSamplingRange { + if nextRoundHeight-curHeight > uint64(config.PreSamplingRange) { return nil } @@ -780,7 +780,7 @@ func (os *OracleServer) aggregateBridgedPrice(srcSymbol string, target int64, us func (os *OracleServer) aggregatePrice(s string, target int64) (*types.Price, error) { var prices []decimal.Decimal for _, plugin := range os.runningPlugins { - p, err := plugin.GetSample(s, target) + p, err := plugin.GetSampledPrice(s, target) if err != nil { continue } @@ -919,9 +919,10 @@ func (os *OracleServer) Start() { err := os.handleRoundVote() if err != nil { - continue + os.logger.Error("round voting failed", "error", err.Error()) } - os.gcDataSamples() + // at the end of each round, gc expired samples of per plugin. + os.gcExpiredSamples() // after vote finished, gc useless symbols by protocol required symbols. os.samplingSymbols = os.protocolSymbols // attach the bridger symbols too once the sampling symbols is replaced by protocol symbols. diff --git a/oracle_server/oracle_server_test.go b/oracle_server/oracle_server_test.go index 1e8ba2b..f57f6b6 100644 --- a/oracle_server/oracle_server_test.go +++ b/oracle_server/oracle_server_test.go @@ -167,7 +167,7 @@ func TestOracleServer(t *testing.T) { require.Equal(t, true, helpers.ResolveSimulatedPrice(NTNUSD).Equal(roundData.Prices[NTNUSD].Price)) require.Equal(t, true, helpers.ResolveSimulatedPrice(ATNUSD).Equal(roundData.Prices[ATNUSD].Price)) t.Log(roundData) - srv.gcDataSamples() + srv.gcExpiredSamples() srv.runningPlugins["template_plugin"].Close() }) diff --git a/plugin_wrapper/plugin_wrapper.go b/plugin_wrapper/plugin_wrapper.go index bb31861..b745a79 100644 --- a/plugin_wrapper/plugin_wrapper.go +++ b/plugin_wrapper/plugin_wrapper.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" + "github.com/shopspring/decimal" "os" "os/exec" "strings" @@ -17,9 +18,7 @@ import ( ) var ( - // time to live in the cache for each single sample. - // todo: check if we can use it for AMM data aggregation? - sampleTTL = 1800 // 30 minutes + sampleTTL = 30 // 30s, the TTL of a sample before GC it. ) // PluginWrapper is the unified wrapper for the interface of a plugin, it contains metadata of a corresponding @@ -104,7 +103,11 @@ func (pw *PluginWrapper) AddSample(prices []types.Price, ts int64) { } } -func (pw *PluginWrapper) GetSample(symbol string, target int64) (types.Price, error) { +// GetSampledPrice returns the aggregated price computed from a set of pre-samples of a symbol by a specific plugin. +// For data points from AMM and AFQ markets, they are aggregated by the samples of the last pre-samplings period, +// while for data points from CEX, the last sample of the pre-sampling period will be taken. +// The target is the timestamp on which the round block is mined, it's used to select datapoint from CEX data source. +func (pw *PluginWrapper) GetSampledPrice(symbol string, target int64) (types.Price, error) { pw.lockSamples.RLock() defer pw.lockSamples.RUnlock() tsMap, ok := pw.samples[symbol] @@ -112,24 +115,34 @@ func (pw *PluginWrapper) GetSample(symbol string, target int64) (types.Price, er return types.Price{}, types.ErrNoAvailablePrice } - // Short-circuit if there's only one sample + // for AMMs or AFQs, get the average price of the collected samples of the recent pre-sampling period. + if pw.dataSrcType == types.SrcAMM || pw.dataSrcType == types.SrcAFQ { + if len(tsMap) < config.PreSamplingRange { + pw.logger.Debug("samples are not enough for price aggregation", "symbol", symbol, "samples", len(tsMap)) + return types.Price{}, types.ErrNoSufficientPrices + } + + var prices []decimal.Decimal + for _, sample := range tsMap { + prices = append(prices, sample.Price) + } + avgPrice := decimal.Avg(prices[0], prices[1:]...) + + return types.Price{Symbol: symbol, Price: avgPrice, Timestamp: target}, nil + } + + // for CEX, we just need to take the last sample, short-circuit if there's only one sample of data points from CEX if len(tsMap) == 1 { for _, price := range tsMap { return price, nil // Return the only sample } } - // for AMMs, always get the latest sample, right after the round event arrived. - if pw.dataSrcType == types.SrcAMM { - target = time.Now().Unix() - } - - // If the target timestamp exists, return it + // Try to get the target TS sample, otherwise we search for the nearest measurement. if p, ok := tsMap[target]; ok { return p, nil } - // Find and return the nearest sampled price to the timestamp. var nearestKey int64 var minDistance int64 = math.MaxInt64 for ts := range tsMap { @@ -147,7 +160,10 @@ func (pw *PluginWrapper) GetSample(symbol string, target int64) (types.Price, er return tsMap[nearestKey], nil } -func (pw *PluginWrapper) GCSamples() { +// GCExpiredSamples removes data points that are older than the TTL seconds of per plugin, it leaves recent samples +// together with next round's pre-samples as the input for the price aggregation for AMM, AFQ plugins. While, for CEX +// plugins, only the latest sample are kept without GC. +func (pw *PluginWrapper) GCExpiredSamples() { pw.lockSamples.Lock() defer pw.lockSamples.Unlock() @@ -159,26 +175,25 @@ func (pw *PluginWrapper) GCSamples() { continue // Skip if there are no samples for this symbol } - // Remove samples older than 2 hours + // Remove samples older than TTL seconds. for ts := range tsMap { if ts < threshold { delete(tsMap, ts) } } - // todo: if we use them for AMM data aggregation, we need to keep them. - // If there are still samples left, keep only the latest one - if len(tsMap) > 0 { + // For CEX plugins, keep only the latest sample. + if len(tsMap) > 0 && pw.dataSrcType == types.SrcCEX { latestTimestamp := pw.latestTimestamps[symbol] - - // Keep only the latest sample for ts := range tsMap { if ts != latestTimestamp { delete(tsMap, ts) } } - } else { - // If no samples left, remove the symbol from the map + } + + // If no samples left, remove the symbol from the map + if len(tsMap) == 0 { delete(pw.samples, symbol) delete(pw.latestTimestamps, symbol) // Also clean up the latest timestamp } diff --git a/plugin_wrapper/plugin_wrapper_test.go b/plugin_wrapper/plugin_wrapper_test.go index d0fa572..cc0332f 100644 --- a/plugin_wrapper/plugin_wrapper_test.go +++ b/plugin_wrapper/plugin_wrapper_test.go @@ -32,48 +32,48 @@ func TestPluginWrapper(t *testing.T) { } target := now - price, err := p.GetSample("NTNGBP", target) + price, err := p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now, price.Timestamp) // upper bound target = now + 100 - price, err = p.GetSample("NTNGBP", target) + price, err = p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+59, price.Timestamp) // lower bound target = now - 1 - price, err = p.GetSample("NTNGBP", target) + price, err = p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now, price.Timestamp) // middle target = now + 29 - price, err = p.GetSample("NTNGBP", target) + price, err = p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+28, price.Timestamp) // middle target = now + 33 - price, err = p.GetSample("NTNGBP", target) + price, err = p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // middle target = now + 34 - price, err = p.GetSample("NTNGBP", target) + price, err = p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // middle target = now + 35 - price, err = p.GetSample("NTNGBP", target) + price, err = p.GetSampledPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // test gc, at least 1 sample is kept in the cache. - p.GCSamples() + p.GCExpiredSamples() require.Equal(t, 1, len(p.samples)) }) } diff --git a/types/plugin_spec.go b/types/plugin_spec.go index f114f29..85b5ea7 100644 --- a/types/plugin_spec.go +++ b/types/plugin_spec.go @@ -8,7 +8,7 @@ import ( // This file defines the autonity oracle plugins specification on top of go-plugin framework which leverage the localhost // net rpc, or grpc for plugin integrations. -// DataSourceType is used by oracle server to pick up pre-samples with different strategy. +// DataSourceType is used by oracle server to aggregate pre-samples with different strategy. type DataSourceType int const ( diff --git a/types/types.go b/types/types.go index 7369422..6b3110f 100644 --- a/types/types.go +++ b/types/types.go @@ -18,11 +18,12 @@ var ( AutonityContractAddress = crypto.CreateAddress(Deployer, 0) OracleContractAddress = crypto.CreateAddress(Deployer, 2) - ErrPeerOnSync = errors.New("l1 node is on peer sync") - ErrNoAvailablePrice = errors.New("no available prices collected yet") - ErrNoDataRound = errors.New("no data collected at current round") - ErrNoSymbolsObserved = errors.New("no symbols observed from oracle contract") - ErrMissingServiceKey = errors.New("the key to access the data source is missing, please check the plugin config") + ErrPeerOnSync = errors.New("l1 node is on peer sync") + ErrNoAvailablePrice = errors.New("no available prices collected yet") + ErrNoSufficientPrices = errors.New("no sufficient num of prices were collected yet") + ErrNoDataRound = errors.New("no data collected at current round") + ErrNoSymbolsObserved = errors.New("no symbols observed from oracle contract") + ErrMissingServiceKey = errors.New("the key to access the data source is missing, please check the plugin config") ) // Price is the structure contains the exchange rate of a symbol with a timestamp at which the sampling happens. From 88f1c111a672fdcc9fa0c24a990769d24b1c3863 Mon Sep 17 00:00:00 2001 From: Jason Chen Date: Fri, 3 Jan 2025 08:12:37 +0000 Subject: [PATCH 2/3] refine, loggings. --- config/config.go | 2 +- oracle_server/oracle_server.go | 2 +- plugin_wrapper/plugin_wrapper.go | 12 ++++++------ plugin_wrapper/plugin_wrapper_test.go | 14 +++++++------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/config/config.go b/config/config.go index 13d81fb..ff60841 100644 --- a/config/config.go +++ b/config/config.go @@ -21,7 +21,7 @@ var ( defaultProfileDir = "." defaultVoteBufferAfterPenalty = uint64(3600 * 24) // The buffering time window in blocks to continue vote after the last penalty event. - PreSamplingRange = 5 // pre-sampling starts in 5s in advance. + PreSamplingRange = 6 // pre-sampling starts in 6s in advance. ConfidenceStrategyLinear = 0 ConfidenceStrategyFixed = 1 defaultConfidenceStrategy = ConfidenceStrategyLinear // 0: linear, 1: fixed. diff --git a/oracle_server/oracle_server.go b/oracle_server/oracle_server.go index d55e86d..4f143ad 100644 --- a/oracle_server/oracle_server.go +++ b/oracle_server/oracle_server.go @@ -780,7 +780,7 @@ func (os *OracleServer) aggregateBridgedPrice(srcSymbol string, target int64, us func (os *OracleServer) aggregatePrice(s string, target int64) (*types.Price, error) { var prices []decimal.Decimal for _, plugin := range os.runningPlugins { - p, err := plugin.GetSampledPrice(s, target) + p, err := plugin.GetAggregatedPrice(s, target) if err != nil { continue } diff --git a/plugin_wrapper/plugin_wrapper.go b/plugin_wrapper/plugin_wrapper.go index b745a79..da47565 100644 --- a/plugin_wrapper/plugin_wrapper.go +++ b/plugin_wrapper/plugin_wrapper.go @@ -103,11 +103,11 @@ func (pw *PluginWrapper) AddSample(prices []types.Price, ts int64) { } } -// GetSampledPrice returns the aggregated price computed from a set of pre-samples of a symbol by a specific plugin. -// For data points from AMM and AFQ markets, they are aggregated by the samples of the last pre-samplings period, +// GetAggregatedPrice returns the aggregated price computed from a set of pre-samples of a symbol by a specific plugin. +// For data points from AMM and AFQ markets, they are aggregated by the samples of the recent pre-samplings period, // while for data points from CEX, the last sample of the pre-sampling period will be taken. // The target is the timestamp on which the round block is mined, it's used to select datapoint from CEX data source. -func (pw *PluginWrapper) GetSampledPrice(symbol string, target int64) (types.Price, error) { +func (pw *PluginWrapper) GetAggregatedPrice(symbol string, target int64) (types.Price, error) { pw.lockSamples.RLock() defer pw.lockSamples.RUnlock() tsMap, ok := pw.samples[symbol] @@ -117,8 +117,8 @@ func (pw *PluginWrapper) GetSampledPrice(symbol string, target int64) (types.Pri // for AMMs or AFQs, get the average price of the collected samples of the recent pre-sampling period. if pw.dataSrcType == types.SrcAMM || pw.dataSrcType == types.SrcAFQ { - if len(tsMap) < config.PreSamplingRange { - pw.logger.Debug("samples are not enough for price aggregation", "symbol", symbol, "samples", len(tsMap)) + if len(tsMap) < config.PreSamplingRange-1 { + pw.logger.Info("samples are not yet enough for aggregation", "symbol", symbol, "samples", len(tsMap)) return types.Price{}, types.ErrNoSufficientPrices } @@ -127,7 +127,7 @@ func (pw *PluginWrapper) GetSampledPrice(symbol string, target int64) (types.Pri prices = append(prices, sample.Price) } avgPrice := decimal.Avg(prices[0], prices[1:]...) - + pw.logger.Debug("num of samples being aggregated", "symbol", symbol, "samples", len(tsMap), "avgPrice", avgPrice.String()) return types.Price{Symbol: symbol, Price: avgPrice, Timestamp: target}, nil } diff --git a/plugin_wrapper/plugin_wrapper_test.go b/plugin_wrapper/plugin_wrapper_test.go index cc0332f..81e0850 100644 --- a/plugin_wrapper/plugin_wrapper_test.go +++ b/plugin_wrapper/plugin_wrapper_test.go @@ -32,43 +32,43 @@ func TestPluginWrapper(t *testing.T) { } target := now - price, err := p.GetSampledPrice("NTNGBP", target) + price, err := p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now, price.Timestamp) // upper bound target = now + 100 - price, err = p.GetSampledPrice("NTNGBP", target) + price, err = p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+59, price.Timestamp) // lower bound target = now - 1 - price, err = p.GetSampledPrice("NTNGBP", target) + price, err = p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now, price.Timestamp) // middle target = now + 29 - price, err = p.GetSampledPrice("NTNGBP", target) + price, err = p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+28, price.Timestamp) // middle target = now + 33 - price, err = p.GetSampledPrice("NTNGBP", target) + price, err = p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // middle target = now + 34 - price, err = p.GetSampledPrice("NTNGBP", target) + price, err = p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // middle target = now + 35 - price, err = p.GetSampledPrice("NTNGBP", target) + price, err = p.GetAggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) From 13de0c7afe52691c69563d0835dda783d7dc0d5a Mon Sep 17 00:00:00 2001 From: Jason Chen Date: Fri, 3 Jan 2025 08:16:16 +0000 Subject: [PATCH 3/3] lint, no lint error. --- config/config.go | 3 ++- oracle_server/oracle_server.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index ff60841..555786d 100644 --- a/config/config.go +++ b/config/config.go @@ -21,7 +21,6 @@ var ( defaultProfileDir = "." defaultVoteBufferAfterPenalty = uint64(3600 * 24) // The buffering time window in blocks to continue vote after the last penalty event. - PreSamplingRange = 6 // pre-sampling starts in 6s in advance. ConfidenceStrategyLinear = 0 ConfidenceStrategyFixed = 1 defaultConfidenceStrategy = ConfidenceStrategyLinear // 0: linear, 1: fixed. @@ -31,6 +30,8 @@ var ( // for data reporting interface to collect oracle clients version. const Version uint8 = 24 +const PreSamplingRange = 6 // pre-sampling starts in 6s in advance + // MetricsNameSpace is the name space of oracle-server's metrics in influxDB. const MetricsNameSpace = "autoracle." const MetricsInterval = time.Second * 10 diff --git a/oracle_server/oracle_server.go b/oracle_server/oracle_server.go index 4f143ad..ea61040 100644 --- a/oracle_server/oracle_server.go +++ b/oracle_server/oracle_server.go @@ -445,7 +445,7 @@ func (os *OracleServer) handlePreSampling(preSampleTS int64) error { os.logger.Error("handle pre-sampling", "error", err.Error()) return err } - if nextRoundHeight-curHeight > uint64(config.PreSamplingRange) { + if nextRoundHeight-curHeight > uint64(config.PreSamplingRange) { //nolint return nil }