Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement, aggregate pre-samples of datapoints from AMM and AFQ. #44

Merged
merged 3 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
// for data reporting interface to collect oracle clients version.
const Version uint8 = 24

const PreSamplingRange = 6 // pre-sampling starts in 6s in advance

// MetricsNameSpace is the name space of oracle-server's metrics in influxDB.
const MetricsNameSpace = "autoracle."
const MetricsInterval = time.Second * 10
Expand Down
29 changes: 15 additions & 14 deletions oracle_server/oracle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ var ForexCurrencies = map[string]struct{}{
}

var (
saltRange = new(big.Int).SetUint64(math.MaxInt64)
alertBalance = new(big.Int).SetUint64(2000000000000) // 2000 Gwei, 0.000002 Ether
invalidPrice = big.NewInt(0)
invalidSalt = big.NewInt(0)
tenSecsInterval = 10 * time.Second // ticker to check L2 connectivity and gc round data.
oneSecsInterval = 1 * time.Second // sampling interval during data pre-sampling period.
preSamplingRange = uint64(5) // pre-sampling starts in 5 blocks in advance.
bridgerSymbols = []string{"ATN-USDC", "NTN-USDC", "USDC-USD"} // used for value bridging to USD by USDC
saltRange = new(big.Int).SetUint64(math.MaxInt64)
alertBalance = new(big.Int).SetUint64(2000000000000) // 2000 Gwei, 0.000002 Ether
invalidPrice = big.NewInt(0)
invalidSalt = big.NewInt(0)
tenSecsInterval = 10 * time.Second // ticker to check L2 connectivity and gc round data.
oneSecsInterval = 1 * time.Second // sampling interval during data pre-sampling period.

bridgerSymbols = []string{"ATN-USDC", "NTN-USDC", "USDC-USD"} // used for value bridging to USD by USDC

numOfPlugins = metrics.GetOrRegisterGauge("oracle/plugins", nil)
oracleRound = metrics.GetOrRegisterGauge("oracle/round", nil)
Expand Down Expand Up @@ -349,9 +349,9 @@ func (os *OracleServer) initStates() (uint64, []string, uint64, error) {
return currentRound.Uint64(), symbols, votePeriod.Uint64(), nil
}

func (os *OracleServer) gcDataSamples() {
func (os *OracleServer) gcExpiredSamples() {
for _, plugin := range os.runningPlugins {
plugin.GCSamples()
plugin.GCExpiredSamples()
}
}

Expand Down Expand Up @@ -445,7 +445,7 @@ func (os *OracleServer) handlePreSampling(preSampleTS int64) error {
os.logger.Error("handle pre-sampling", "error", err.Error())
return err
}
if nextRoundHeight-curHeight > preSamplingRange {
if nextRoundHeight-curHeight > uint64(config.PreSamplingRange) { //nolint
return nil
}

Expand Down Expand Up @@ -780,7 +780,7 @@ func (os *OracleServer) aggregateBridgedPrice(srcSymbol string, target int64, us
func (os *OracleServer) aggregatePrice(s string, target int64) (*types.Price, error) {
var prices []decimal.Decimal
for _, plugin := range os.runningPlugins {
p, err := plugin.GetSample(s, target)
p, err := plugin.GetAggregatedPrice(s, target)
if err != nil {
continue
}
Expand Down Expand Up @@ -919,9 +919,10 @@ func (os *OracleServer) Start() {

err := os.handleRoundVote()
if err != nil {
continue
os.logger.Error("round voting failed", "error", err.Error())
}
os.gcDataSamples()
// at the end of each round, gc expired samples of per plugin.
os.gcExpiredSamples()
// after vote finished, gc useless symbols by protocol required symbols.
os.samplingSymbols = os.protocolSymbols
// attach the bridger symbols too once the sampling symbols is replaced by protocol symbols.
Expand Down
2 changes: 1 addition & 1 deletion oracle_server/oracle_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestOracleServer(t *testing.T) {
require.Equal(t, true, helpers.ResolveSimulatedPrice(NTNUSD).Equal(roundData.Prices[NTNUSD].Price))
require.Equal(t, true, helpers.ResolveSimulatedPrice(ATNUSD).Equal(roundData.Prices[ATNUSD].Price))
t.Log(roundData)
srv.gcDataSamples()
srv.gcExpiredSamples()
srv.runningPlugins["template_plugin"].Close()
})

Expand Down
57 changes: 36 additions & 21 deletions plugin_wrapper/plugin_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/shopspring/decimal"
"os"
"os/exec"
"strings"
Expand All @@ -17,9 +18,7 @@ import (
)

var (
// time to live in the cache for each single sample.
// todo: check if we can use it for AMM data aggregation?
sampleTTL = 1800 // 30 minutes
sampleTTL = 30 // 30s, the TTL of a sample before GC it.
)

// PluginWrapper is the unified wrapper for the interface of a plugin, it contains metadata of a corresponding
Expand Down Expand Up @@ -104,32 +103,46 @@ func (pw *PluginWrapper) AddSample(prices []types.Price, ts int64) {
}
}

func (pw *PluginWrapper) GetSample(symbol string, target int64) (types.Price, error) {
// GetAggregatedPrice returns the aggregated price computed from a set of pre-samples of a symbol by a specific plugin.
// For data points from AMM and AFQ markets, they are aggregated by the samples of the recent pre-samplings period,
// while for data points from CEX, the last sample of the pre-sampling period will be taken.
// The target is the timestamp on which the round block is mined, it's used to select datapoint from CEX data source.
func (pw *PluginWrapper) GetAggregatedPrice(symbol string, target int64) (types.Price, error) {
pw.lockSamples.RLock()
defer pw.lockSamples.RUnlock()
tsMap, ok := pw.samples[symbol]
if !ok {
return types.Price{}, types.ErrNoAvailablePrice
}

// Short-circuit if there's only one sample
// for AMMs or AFQs, get the average price of the collected samples of the recent pre-sampling period.
if pw.dataSrcType == types.SrcAMM || pw.dataSrcType == types.SrcAFQ {
if len(tsMap) < config.PreSamplingRange-1 {
pw.logger.Info("samples are not yet enough for aggregation", "symbol", symbol, "samples", len(tsMap))
return types.Price{}, types.ErrNoSufficientPrices
}

var prices []decimal.Decimal
for _, sample := range tsMap {
prices = append(prices, sample.Price)
}
avgPrice := decimal.Avg(prices[0], prices[1:]...)
pw.logger.Debug("num of samples being aggregated", "symbol", symbol, "samples", len(tsMap), "avgPrice", avgPrice.String())
return types.Price{Symbol: symbol, Price: avgPrice, Timestamp: target}, nil
}

// for CEX, we just need to take the last sample, short-circuit if there's only one sample of data points from CEX
if len(tsMap) == 1 {
for _, price := range tsMap {
return price, nil // Return the only sample
}
}

// for AMMs, always get the latest sample, right after the round event arrived.
if pw.dataSrcType == types.SrcAMM {
target = time.Now().Unix()
}

// If the target timestamp exists, return it
// Try to get the target TS sample, otherwise we search for the nearest measurement.
if p, ok := tsMap[target]; ok {
return p, nil
}

// Find and return the nearest sampled price to the timestamp.
var nearestKey int64
var minDistance int64 = math.MaxInt64
for ts := range tsMap {
Expand All @@ -147,7 +160,10 @@ func (pw *PluginWrapper) GetSample(symbol string, target int64) (types.Price, er
return tsMap[nearestKey], nil
}

func (pw *PluginWrapper) GCSamples() {
// GCExpiredSamples removes data points that are older than the TTL seconds of per plugin, it leaves recent samples
// together with next round's pre-samples as the input for the price aggregation for AMM, AFQ plugins. While, for CEX
// plugins, only the latest sample are kept without GC.
func (pw *PluginWrapper) GCExpiredSamples() {
pw.lockSamples.Lock()
defer pw.lockSamples.Unlock()

Expand All @@ -159,26 +175,25 @@ func (pw *PluginWrapper) GCSamples() {
continue // Skip if there are no samples for this symbol
}

// Remove samples older than 2 hours
// Remove samples older than TTL seconds.
for ts := range tsMap {
if ts < threshold {
delete(tsMap, ts)
}
}

// todo: if we use them for AMM data aggregation, we need to keep them.
// If there are still samples left, keep only the latest one
if len(tsMap) > 0 {
// For CEX plugins, keep only the latest sample.
if len(tsMap) > 0 && pw.dataSrcType == types.SrcCEX {
latestTimestamp := pw.latestTimestamps[symbol]

// Keep only the latest sample
for ts := range tsMap {
if ts != latestTimestamp {
delete(tsMap, ts)
}
}
} else {
// If no samples left, remove the symbol from the map
}

// If no samples left, remove the symbol from the map
if len(tsMap) == 0 {
delete(pw.samples, symbol)
delete(pw.latestTimestamps, symbol) // Also clean up the latest timestamp
}
Expand Down
16 changes: 8 additions & 8 deletions plugin_wrapper/plugin_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,48 +32,48 @@ func TestPluginWrapper(t *testing.T) {
}

target := now
price, err := p.GetSample("NTNGBP", target)
price, err := p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now, price.Timestamp)

// upper bound
target = now + 100
price, err = p.GetSample("NTNGBP", target)
price, err = p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now+59, price.Timestamp)

// lower bound
target = now - 1
price, err = p.GetSample("NTNGBP", target)
price, err = p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now, price.Timestamp)

// middle
target = now + 29
price, err = p.GetSample("NTNGBP", target)
price, err = p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now+28, price.Timestamp)

// middle
target = now + 33
price, err = p.GetSample("NTNGBP", target)
price, err = p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now+35, price.Timestamp)

// middle
target = now + 34
price, err = p.GetSample("NTNGBP", target)
price, err = p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now+35, price.Timestamp)

// middle
target = now + 35
price, err = p.GetSample("NTNGBP", target)
price, err = p.GetAggregatedPrice("NTNGBP", target)
require.NoError(t, err)
require.Equal(t, now+35, price.Timestamp)

// test gc, at least 1 sample is kept in the cache.
p.GCSamples()
p.GCExpiredSamples()
require.Equal(t, 1, len(p.samples))
})
}
2 changes: 1 addition & 1 deletion types/plugin_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// This file defines the autonity oracle plugins specification on top of go-plugin framework which leverage the localhost
// net rpc, or grpc for plugin integrations.

// DataSourceType is used by oracle server to pick up pre-samples with different strategy.
// DataSourceType is used by oracle server to aggregate pre-samples with different strategy.
type DataSourceType int

const (
Expand Down
11 changes: 6 additions & 5 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ var (
AutonityContractAddress = crypto.CreateAddress(Deployer, 0)
OracleContractAddress = crypto.CreateAddress(Deployer, 2)

ErrPeerOnSync = errors.New("l1 node is on peer sync")
ErrNoAvailablePrice = errors.New("no available prices collected yet")
ErrNoDataRound = errors.New("no data collected at current round")
ErrNoSymbolsObserved = errors.New("no symbols observed from oracle contract")
ErrMissingServiceKey = errors.New("the key to access the data source is missing, please check the plugin config")
ErrPeerOnSync = errors.New("l1 node is on peer sync")
ErrNoAvailablePrice = errors.New("no available prices collected yet")
ErrNoSufficientPrices = errors.New("no sufficient num of prices were collected yet")
ErrNoDataRound = errors.New("no data collected at current round")
ErrNoSymbolsObserved = errors.New("no symbols observed from oracle contract")
ErrMissingServiceKey = errors.New("the key to access the data source is missing, please check the plugin config")
)

// Price is the structure contains the exchange rate of a symbol with a timestamp at which the sampling happens.
Expand Down
Loading