From 05dd747e1ff65e48dc653983920fe0d24103283b Mon Sep 17 00:00:00 2001 From: Jason Chen <54585152+Jason-Zhangxin-Chen@users.noreply.github.com> Date: Wed, 8 Jan 2025 14:16:37 +0800 Subject: [PATCH] feature, VWAP aggregation for AMM and AFQ market data points. (#45) * feature, VWAP aggregation for AMM and AFQ market data points. * lint, fix lint error. * test, repair test. * improvement, do the VWAP for recent samples. * refine, code refine and add todo for confidence adjustment. * refine, code refine and doc update for VWAP. * test, fix test. * improvement, confidence adjusted for historic round data. * improvement, refine for oracle server. * lint, no lint. --- config/config_for_test.yml | 4 +- config/oracle_config.yml | 17 +- helpers/helpers.go | 38 ++ helpers/helpers_test.go | 82 ++++ oracle_server/oracle_server.go | 87 +++- plugin_wrapper/plugin_wrapper.go | 35 +- plugin_wrapper/plugin_wrapper_test.go | 16 +- plugins/binance/binance.go | 4 + plugins/common/common.go | 12 +- plugins/crypto_airswap/crypto_airswap.go | 108 ++--- plugins/crypto_coinbase/crypto_coinbase.go | 1 + plugins/crypto_coingecko/crypto_coingecko.go | 1 + plugins/crypto_kraken/crypto_kraken.go | 1 + plugins/crypto_uniswap/common/client.go | 426 ++++++++++++++++-- plugins/crypto_uniswap/common/client_test.go | 2 + .../uniswap_usdcx/crypto_uniswap_usdcx.go | 4 + .../forex_currencyfreaks.go | 1 + .../forex_currencylayer.go | 1 + .../forex_exchangerate/forex_exchangerate.go | 1 + .../forex_openexchange/forex_openexchange.go | 1 + plugins/outlier_tester/outlier_tester.go | 13 +- plugins/pcgc_cax/pcgc_cax.go | 1 + plugins/simulator_plugin/common/client.go | 5 + plugins/template_plugin/template_plugin.go | 13 +- types/types.go | 15 +- 25 files changed, 738 insertions(+), 151 deletions(-) diff --git a/config/config_for_test.yml b/config/config_for_test.yml index 738f957..54cb0f7 100644 --- a/config/config_for_test.yml +++ b/config/config_for_test.yml @@ -57,7 +57,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed # type PluginConfig struct { # Name string `json:"name" yaml:"name"` // the name of the plugin binary. # Key string `json:"key" yaml:"key"` // the API key granted by your data provider to access their data API. -# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http or https. +# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http, https, ws or wss. # Endpoint string `json:"endpoint" yaml:"endpoint"` // the data service endpoint url of the data provider. # Timeout int `json:"timeout" yaml:"timeout"` // the timeout period in seconds that an API request is lasting for. # DataUpdateInterval int `json:"refresh" yaml:"refresh"` // the interval in seconds to fetch data from data provider due to rate limit. @@ -87,7 +87,7 @@ pluginConfigs: refresh: 3600 # optional, buffered data within 3600s, recommended for API rate limited data source. # Un-comment below lines to config the RPC endpoint of a Piccadilly Network Full Node for your AMM plugin which sources ATN & NTN market data from an on-chain AMM. - name: crypto_uniswap - scheme: "wss" # Available values are: "http", "https", "ws" or "wss", default value is "wss". + scheme: "wss" # Available values are: "ws" or "wss", default value is "wss". endpoint: "rpc-internal-1.piccadilly.autonity.org/ws" # The default URL might not be stable for public usage, we recommend you to change it with your validator node's RPC endpoint. #Enable the metric collection for oracle server, supported TS-DB engines are influxDB v1 and v2. diff --git a/config/oracle_config.yml b/config/oracle_config.yml index 0b5ccd2..68d1800 100644 --- a/config/oracle_config.yml +++ b/config/oracle_config.yml @@ -35,12 +35,13 @@ confidenceStrategy: 0 # 0: linear, 1: fixed # # The crypto data plugins are used to fetch market prices for the crypto currency pairs: ATN-USDC, NTN-USDC, NTN-ATN and # USDC-USD. USDC liquidity is bridged to the Autonity public testnet from the Polygon Amoy testnet via a bridge service. -# Out-the-box plugins for collecting ATN-USDC and NTN-USDC market data are available for UniSwap V2 and AirSwap protocols. NTN-ATN market price is derived from -# that market data, and USDC pricing is converted to USD. ATN-NTN, ATN-USD, and NTN-USD prices are then submitted on-chain. -# To retrieve ATN and NTN prices, put the `crypto_uniswap` plugin and `crypto_airswap` plugin in your plugin directory. -# Oracle server can then discover and load them. Configuring the `crypto_uniswap` and `crypto_airswap` plugin does not -# require an API key, it is an open and free data source of a standard EVM RPC websocket service endpoint. The -# end user can connect to specific EVM RPC endpoint base on the blockchain which hosts the uniswap and airswap contracts. +# Out-the-box plugins for collecting ATN-USDC and NTN-USDC market data are available for UniSwap V2 and AirSwap protocols. +# NTN-ATN market price is derived from that market data, and USDC pricing is converted to USD. ATN-NTN, ATN-USD, and NTN-USD +# prices are then submitted on-chain. To retrieve ATN and NTN prices, put the `crypto_uniswap` plugin and `crypto_airswap` +# plugin in your plugin directory. Oracle server can then discover and load them. Configuring the `crypto_uniswap` and +#`crypto_airswap` plugin does not require an API key, it is an open and free data source of a standard EVM RPC websocket +# service endpoint. The end user can connect to specific EVM RPC endpoint base on the blockchain which hosts the uniswap +# and airswap contracts. # USDC-USD prices are required by the protocol to convert the ATN-USDC and NTN-USDC to ATN-USD and NTN-USD. This enables # the reporting of ATN and NTN prices in USD to the ASM. Three plugins are implemented to source the USDC-USD datapoint @@ -57,7 +58,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed # type PluginConfig struct { # Name string `json:"name" yaml:"name"` // the name of the plugin binary. # Key string `json:"key" yaml:"key"` // the API key granted by your data provider to access their data API. -# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http or https. +# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http, https, ws or wss. # Endpoint string `json:"endpoint" yaml:"endpoint"` // the data service endpoint url of the data provider. # Timeout int `json:"timeout" yaml:"timeout"` // the timeout period in seconds that an API request is lasting for. # DataUpdateInterval int `json:"refresh" yaml:"refresh"` // the interval in seconds to fetch data from data provider due to rate limit. @@ -87,7 +88,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed # refresh: 3600 # optional, buffered data within 3600s, recommended for API rate limited data source. # Un-comment below lines to config the RPC endpoint of a Piccadilly Network Full Node for your AMM plugin which sources ATN & NTN market data from an on-chain AMM. # - name: crypto_uniswap -# scheme: "wss" # Available values are: "http", "https", "ws" or "wss", default value is "wss". +# scheme: "wss" # Only websocket please, available values are: "ws" or "wss", default value is "wss" for uniswap plugins. # endpoint: "rpc-internal-1.piccadilly.autonity.org/ws" # The default URL might not be stable for public usage, we recommend you to change it with your validator node's RPC endpoint. #Enable the metric collection for oracle server, supported TS-DB engines are influxDB v1 and v2. diff --git a/helpers/helpers.go b/helpers/helpers.go index e64fdc2..8e8523e 100644 --- a/helpers/helpers.go +++ b/helpers/helpers.go @@ -2,11 +2,13 @@ package helpers import ( "encoding/csv" + "errors" "fmt" "github.com/shopspring/decimal" "io" "io/fs" "io/ioutil" //nolint + "math/big" "os" "sort" ) @@ -98,6 +100,42 @@ func Median(prices []decimal.Decimal) (decimal.Decimal, error) { return prices[l/2], nil } +// VWAP computes the volume weighted average price for the input prices with their corresponding volumes +func VWAP(prices []decimal.Decimal, volumes []*big.Int) (decimal.Decimal, *big.Int, error) { + if len(prices) == 0 || len(volumes) == 0 || len(prices) != len(volumes) { + return decimal.Zero, nil, errors.New("prices and volumes must be of the same non-zero length") + } + + var totalWeightedPrice decimal.Decimal + totalVolume := big.NewInt(0) + highestVol := new(big.Int).Set(volumes[0]) + + for i := range prices { + if volumes[i].Cmp(highestVol) > 0 { + highestVol.Set(volumes[i]) + } + + // Convert volume to decimal.Decimal + volumeDecimal := decimal.NewFromBigInt(volumes[i], 0) + + // Calculate weighted price for current price and volume + weightedPrice := prices[i].Mul(volumeDecimal) // Use decimal.Decimal for precision + totalWeightedPrice = totalWeightedPrice.Add(weightedPrice) + + // Accumulate total volume + totalVolume.Add(totalVolume, volumes[i]) + } + + // Avoid division by zero + if totalVolume.Cmp(big.NewInt(0)) == 0 { + return decimal.Zero, nil, errors.New("total volume cannot be zero") + } + + // Calculate VWAP + vwap := totalWeightedPrice.Div(decimal.NewFromBigInt(totalVolume, 0)) + return vwap, highestVol, nil +} + // ListPlugins returns a mapping of file names to fs.FileInfo for executable files in the specified path. func ListPlugins(path string) (map[string]fs.FileInfo, error) { plugins := make(map[string]fs.FileInfo) diff --git a/helpers/helpers_test.go b/helpers/helpers_test.go index 9f8bb36..a381563 100644 --- a/helpers/helpers_test.go +++ b/helpers/helpers_test.go @@ -3,6 +3,7 @@ package helpers import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/require" + "math/big" "testing" ) @@ -71,3 +72,84 @@ func TestMedian(t *testing.T) { require.Error(t, err) }) } + +func TestVWAP(t *testing.T) { + tests := []struct { + prices []decimal.Decimal + volumes []*big.Int + expectedVWAP decimal.Decimal + expectedHighestVol *big.Int + expectError bool + }{ + { + prices: []decimal.Decimal{ + decimal.NewFromFloat(100.0), + decimal.NewFromFloat(200.0), + decimal.NewFromFloat(100.0), + }, + volumes: []*big.Int{ + big.NewInt(10), + big.NewInt(20), + big.NewInt(10), + }, + expectedVWAP: decimal.NewFromFloatWithExponent(150, 0), // (100*10 + 200*20 + 150*30) / (10 + 20 + 30) + expectedHighestVol: big.NewInt(20), + expectError: false, + }, + { + prices: []decimal.Decimal{ + decimal.NewFromFloat(50.0), + decimal.NewFromFloat(75.0), + }, + volumes: []*big.Int{ + big.NewInt(5), + big.NewInt(0), + }, + expectedVWAP: decimal.NewFromFloatWithExponent(50, 0), + expectedHighestVol: big.NewInt(5), + }, + { + prices: []decimal.Decimal{}, + volumes: []*big.Int{}, + expectedVWAP: decimal.Zero, + expectedHighestVol: nil, + expectError: true, + }, + { + prices: []decimal.Decimal{ + decimal.NewFromFloat(100.0), + }, + volumes: []*big.Int{ + big.NewInt(10), + big.NewInt(20), + }, + expectedVWAP: decimal.Zero, + expectedHighestVol: nil, + expectError: true, + }, + } + + for _, test := range tests { + vwap, highestVol, err := VWAP(test.prices, test.volumes) + + if test.expectError { + if err == nil { + t.Errorf("Expected an error for prices: %v and volumes: %v, but got none", test.prices, test.volumes) + } + continue + } + + if err != nil { + t.Errorf("Unexpected error for prices: %v and volumes: %v: %v", test.prices, test.volumes, err) + continue + } + + if !vwap.Equal(test.expectedVWAP) { + t.Errorf("For prices: %v and volumes: %v, expected VWAP: %s, but got: %s", test.prices, test.volumes, test.expectedVWAP.String(), vwap.String()) + } + + if highestVol.Cmp(test.expectedHighestVol) != 0 { + t.Errorf("For prices: %v and volumes: %v, expected highest volume: %s, but got: %s", test.prices, test.volumes, test.expectedHighestVol.String(), highestVol.String()) + } + } +} diff --git a/oracle_server/oracle_server.go b/oracle_server/oracle_server.go index ea61040..af316fc 100644 --- a/oracle_server/oracle_server.go +++ b/oracle_server/oracle_server.go @@ -777,42 +777,96 @@ func (os *OracleServer) aggregateBridgedPrice(srcSymbol string, target int64, us return p, nil } +// aggregatePrice takes the symbol's aggregated data points from all the supported plugins, if there are multiple +// markets' datapoint, it will do a final VWAP aggregation to form the final reporting value. func (os *OracleServer) aggregatePrice(s string, target int64) (*types.Price, error) { var prices []decimal.Decimal + var volumes []*big.Int for _, plugin := range os.runningPlugins { - p, err := plugin.GetAggregatedPrice(s, target) + p, err := plugin.AggregatedPrice(s, target) if err != nil { continue } prices = append(prices, p.Price) + volumes = append(volumes, p.Volume) } if len(prices) == 0 { - return nil, types.ErrNoDataRound + historicRoundPrice, err := os.queryHistoricRoundPrice(s) + if err != nil { + return nil, err + } + + return confidenceAdjustedPrice(&historicRoundPrice, target) } // compute confidence of the symbol from the num of plugins' samples of it. confidence := ComputeConfidence(s, len(prices), os.conf.ConfidenceStrategy) - price := &types.Price{ Timestamp: target, Price: prices[0], + Volume: volumes[0], Symbol: s, Confidence: confidence, } - // we have multiple provider provide prices for this symbol, we have to aggregate it. - if len(prices) > 1 { + _, isForex := ForexCurrencies[s] + + // we have multiple markets' data for this forex symbol, update the price with median value. + if len(prices) > 1 && isForex { p, err := helpers.Median(prices) if err != nil { return nil, err } price.Price = p + price.Volume = types.DefaultVolume + return price, nil + } + + // we have multiple markets' data for this crypto symbol, update the price with VWAP. + if len(prices) > 1 && !isForex { + p, vol, err := helpers.VWAP(prices, volumes) + if err != nil { + return nil, err + } + price.Price = p + price.Volume = vol } return price, nil } +// queryHistoricRoundPrice queries the last available price for a given symbol from the historic rounds. +func (os *OracleServer) queryHistoricRoundPrice(symbol string) (types.Price, error) { + + if len(os.roundData) == 0 { + return types.Price{}, types.ErrNoDataRound + } + + numOfRounds := len(os.roundData) + // Iterate from the current round backward + for i := 0; i < numOfRounds; i++ { + roundID := os.curRound - uint64(i) - 1 //nolint + // Get the round data for the current round ID + roundData, exists := os.roundData[roundID] + if !exists { + continue + } + + if roundData == nil { + continue + } + + // Check if the symbol exists in the Prices map + if price, found := roundData.Prices[symbol]; found { + return price, nil + } + } + + // If no price was found after checking all rounds, return an error + return types.Price{}, types.ErrNoDataRound +} + func (os *OracleServer) samplePrice(symbols []string, ts int64) { if os.lastSampledTS == ts { return @@ -1098,3 +1152,26 @@ func ComputeConfidence(symbol string, numOfSamples, strategy int) uint8 { return uint8(weight) //nolint } + +func confidenceAdjustedPrice(historicRoundPrice *types.Price, target int64) (*types.Price, error) { + // by according to the spreading of price timestamp from the target timestamp, + // we reduce the confidence of the price, set the lowest confidence as 1. + // Calculate the time difference between the target timestamp and the historic price timestamp + timeDifference := target - historicRoundPrice.Timestamp + + var reducedConfidence uint8 + if timeDifference < 60 { // Less than 1 minute + reducedConfidence = historicRoundPrice.Confidence // Keep original confidence + } else if timeDifference < 3600 { // Less than 1 hour + reducedConfidence = historicRoundPrice.Confidence / 2 // Reduce confidence by half + } else { + reducedConfidence = 1 // Set the lowest confidence to 1 if more than 1 hour old + } + + if reducedConfidence == 0 { + return nil, types.ErrNoAvailablePrice + } + + historicRoundPrice.Confidence = reducedConfidence + return historicRoundPrice, nil +} diff --git a/plugin_wrapper/plugin_wrapper.go b/plugin_wrapper/plugin_wrapper.go index da47565..ce68e3a 100644 --- a/plugin_wrapper/plugin_wrapper.go +++ b/plugin_wrapper/plugin_wrapper.go @@ -2,6 +2,7 @@ package pluginwrapper import ( "autonity-oracle/config" + "autonity-oracle/helpers" "autonity-oracle/types" "fmt" "github.com/ethereum/go-ethereum/common/math" @@ -10,6 +11,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/shopspring/decimal" + "math/big" "os" "os/exec" "strings" @@ -103,11 +105,11 @@ func (pw *PluginWrapper) AddSample(prices []types.Price, ts int64) { } } -// GetAggregatedPrice returns the aggregated price computed from a set of pre-samples of a symbol by a specific plugin. +// AggregatedPrice returns the aggregated price computed from a set of pre-samples of a symbol by a specific plugin. // For data points from AMM and AFQ markets, they are aggregated by the samples of the recent pre-samplings period, // while for data points from CEX, the last sample of the pre-sampling period will be taken. // The target is the timestamp on which the round block is mined, it's used to select datapoint from CEX data source. -func (pw *PluginWrapper) GetAggregatedPrice(symbol string, target int64) (types.Price, error) { +func (pw *PluginWrapper) AggregatedPrice(symbol string, target int64) (types.Price, error) { pw.lockSamples.RLock() defer pw.lockSamples.RUnlock() tsMap, ok := pw.samples[symbol] @@ -115,23 +117,28 @@ func (pw *PluginWrapper) GetAggregatedPrice(symbol string, target int64) (types. return types.Price{}, types.ErrNoAvailablePrice } - // for AMMs or AFQs, get the average price of the collected samples of the recent pre-sampling period. + // for AMMs or AFQs, as the data points may move quickly, thus we get the VWAP of + // the collected samples of the recent pre-sampling period. if pw.dataSrcType == types.SrcAMM || pw.dataSrcType == types.SrcAFQ { - if len(tsMap) < config.PreSamplingRange-1 { - pw.logger.Info("samples are not yet enough for aggregation", "symbol", symbol, "samples", len(tsMap)) - return types.Price{}, types.ErrNoSufficientPrices - } - var prices []decimal.Decimal + var volumes []*big.Int for _, sample := range tsMap { prices = append(prices, sample.Price) + volumes = append(volumes, sample.Volume) } - avgPrice := decimal.Avg(prices[0], prices[1:]...) - pw.logger.Debug("num of samples being aggregated", "symbol", symbol, "samples", len(tsMap), "avgPrice", avgPrice.String()) - return types.Price{Symbol: symbol, Price: avgPrice, Timestamp: target}, nil + + vwap, highestVol, err := helpers.VWAP(prices, volumes) + if err != nil { + pw.logger.Error("failed to calculate vwap", "symbol", symbol, "err", err) + return types.Price{}, err + } + + pw.logger.Debug("VWAP aggregation", "symbol", symbol, "samples", len(tsMap), "vwap", vwap.String()) + return types.Price{Symbol: symbol, Price: vwap, Timestamp: target, Volume: highestVol}, nil } - // for CEX, we just need to take the last sample, short-circuit if there's only one sample of data points from CEX + // for CEX, we just need to take the last sample as data points from CEX were already aggregated. + // Short-circuit if there's only one sample of data points from CEX if len(tsMap) == 1 { for _, price := range tsMap { return price, nil // Return the only sample @@ -157,7 +164,9 @@ func (pw *PluginWrapper) GetAggregatedPrice(symbol string, target int64) (types. } } - return tsMap[nearestKey], nil + price := tsMap[nearestKey] + pw.logger.Debug("nearest sample", "symbol", symbol, "samples", len(tsMap), "targetTS", target, "nearestTS", nearestKey, "price", price) + return price, nil } // GCExpiredSamples removes data points that are older than the TTL seconds of per plugin, it leaves recent samples diff --git a/plugin_wrapper/plugin_wrapper_test.go b/plugin_wrapper/plugin_wrapper_test.go index 81e0850..ea08f56 100644 --- a/plugin_wrapper/plugin_wrapper_test.go +++ b/plugin_wrapper/plugin_wrapper_test.go @@ -2,6 +2,7 @@ package pluginwrapper import ( "autonity-oracle/types" + "github.com/hashicorp/go-hclog" "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "testing" @@ -11,6 +12,7 @@ import ( func TestPluginWrapper(t *testing.T) { t.Run("test finding nearest data sample", func(t *testing.T) { p := PluginWrapper{ + logger: hclog.NewNullLogger(), samples: make(map[string]map[int64]types.Price), latestTimestamps: make(map[string]int64), dataSrcType: types.SrcCEX, @@ -32,43 +34,43 @@ func TestPluginWrapper(t *testing.T) { } target := now - price, err := p.GetAggregatedPrice("NTNGBP", target) + price, err := p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now, price.Timestamp) // upper bound target = now + 100 - price, err = p.GetAggregatedPrice("NTNGBP", target) + price, err = p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+59, price.Timestamp) // lower bound target = now - 1 - price, err = p.GetAggregatedPrice("NTNGBP", target) + price, err = p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now, price.Timestamp) // middle target = now + 29 - price, err = p.GetAggregatedPrice("NTNGBP", target) + price, err = p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+28, price.Timestamp) // middle target = now + 33 - price, err = p.GetAggregatedPrice("NTNGBP", target) + price, err = p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // middle target = now + 34 - price, err = p.GetAggregatedPrice("NTNGBP", target) + price, err = p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) // middle target = now + 35 - price, err = p.GetAggregatedPrice("NTNGBP", target) + price, err = p.AggregatedPrice("NTNGBP", target) require.NoError(t, err) require.Equal(t, now+35, price.Timestamp) diff --git a/plugins/binance/binance.go b/plugins/binance/binance.go index 17a8d45..d01f45e 100644 --- a/plugins/binance/binance.go +++ b/plugins/binance/binance.go @@ -76,6 +76,10 @@ func (bi *BIClient) FetchPrice(symbols []string) (common.Prices, error) { return nil, err } + for i := range prices { + prices[i].Volume = types.DefaultVolume.String() + } + return prices, nil } diff --git a/plugins/common/common.go b/plugins/common/common.go index 010adab..bd351f6 100644 --- a/plugins/common/common.go +++ b/plugins/common/common.go @@ -40,6 +40,7 @@ const ( type Price struct { Symbol string `json:"symbol,omitempty"` Price string `json:"price,omitempty"` + Volume string `json:"volume,omitempty"` // recent accumulating trade volume in USDCx. } type Prices []Price @@ -102,16 +103,23 @@ func (p *Plugin) FetchPrices(symbols []string) (types.PluginPriceReport, error) now := time.Now().Unix() for _, v := range res { - dec, err := decimal.NewFromString(v.Price) + decPrice, err := decimal.NewFromString(v.Price) if err != nil { p.logger.Error("cannot convert price string to decimal: ", "price", v.Price, "error", err.Error()) continue } + decVol, ok := new(big.Int).SetString(v.Volume, 0) + if !ok { + p.logger.Error("cannot convert volume to big.Int: ", "volume", v.Volume) + continue + } + pr := types.Price{ Timestamp: now, Symbol: availableSymMap[v.Symbol], // set the symbol with the symbol style used in oracle server side. - Price: dec, + Price: decPrice, + Volume: decVol, } p.cachePrices[v.Symbol] = pr report.Prices = append(report.Prices, pr) diff --git a/plugins/crypto_airswap/crypto_airswap.go b/plugins/crypto_airswap/crypto_airswap.go index 2204c54..cc9c1a9 100644 --- a/plugins/crypto_airswap/crypto_airswap.go +++ b/plugins/crypto_airswap/crypto_airswap.go @@ -54,6 +54,49 @@ type Order struct { usdcAmount *big.Int } +// aggregatePrice compute the VWAP of the input orders, and return the total accumulating volumes. +func aggregatePrice(orderBook *ring.Ring, order Order) (*big.Rat, *big.Int, error) { + orderBook.Enqueue(order) + recentOrders := orderBook.Values() + return volumeWeightedPrice(recentOrders) +} + +// volumeWeightedPrice return the volume-weighted exchange ratio of ATN or NTN to USDC, and the total volumes. +func volumeWeightedPrice(orders []interface{}) (*big.Rat, *big.Int, error) { + // Initialize total crypto and USDC amounts + totalCrypto := new(big.Int) + totalUSDC := new(big.Int) + + // Iterate through the orders to sum up the amounts + for _, orderInterface := range orders { + // Type assert to Order + order, ok := orderInterface.(Order) + if !ok { + return nil, nil, fmt.Errorf("invalid order type") + } + + totalCrypto.Add(totalCrypto, order.cryptoAmount) + totalUSDC.Add(totalUSDC, order.usdcAmount) + } + + // Check if totalUSDC is zero to avoid division by zero + if totalUSDC.Cmp(common.Zero) == 0 { + return nil, nil, fmt.Errorf("total USDC amount is zero, cannot compute ratio") + } + + // Scale the totals according to their decimals + scaledTotalCrypto := new(big.Int).Mul(totalCrypto, big.NewInt(int64(math.Pow(10, float64(common.USDCDecimals))))) + scaledTotalUSDC := new(big.Int).Mul(totalUSDC, big.NewInt(int64(math.Pow(10, float64(common.AutonityCryptoDecimals))))) + + // Calculate the weighted ratio as a fraction + if scaledTotalUSDC.Cmp(common.Zero) == 0 { + return nil, nil, fmt.Errorf("scaled total USDC amount is zero, cannot compute ratio") + } + + weightedRatio := new(big.Rat).SetFrac(scaledTotalCrypto, scaledTotalUSDC) + return weightedRatio, totalUSDC, nil +} + type AirswapClient struct { conf *config.PluginConfig client *ethclient.Client @@ -122,7 +165,7 @@ func NewAirswapClient(conf *config.PluginConfig) (*AirswapClient, error) { swapContract: swapContract, doneCh: make(chan struct{}), ticker: time.NewTicker(time.Minute), - lastAggregatedPrices: map[ecommon.Address]common.Price{}, + lastAggregatedPrices: make(map[ecommon.Address]common.Price), } ac.atnOrderBooks.SetCapacity(orderBookCapacity) @@ -200,14 +243,14 @@ func (e *AirswapClient) handleSwapEvent(txnHash ecommon.Hash, swapEvent *swaperc orderBook = &e.ntnOrderBooks } - lastAggregatedPrice, err := aggregatePrice(orderBook, order) + lastAggregatedPrice, volumes, err := aggregatePrice(orderBook, order) if err != nil { e.logger.Error("failed to compute new price", "error", err, "txnHash", txnHash, "order", order) return err } // update the last aggregated price. - e.updatePrice(order.cryptoToken, lastAggregatedPrice.FloatString(common.CryptoToUsdcDecimals)) + e.updatePrice(order.cryptoToken, lastAggregatedPrice.FloatString(common.CryptoToUsdcDecimals), volumes) return nil } @@ -334,54 +377,7 @@ func (e *AirswapClient) extractOrder(logs []*types2.Log, targetSwapEvent *swaper return order, errors.New("skip process swap event of ATN and NTN from airswap") } -// compute new price once new settled order comes. -func aggregatePrice(orderBook *ring.Ring, order Order) (*big.Rat, error) { - orderBook.Enqueue(order) - recentOrders := orderBook.Values() - aggregatedPrice, err := volumeWeightedPrice(recentOrders) - if err != nil { - return nil, err - } - return aggregatedPrice, nil -} - -// volumeWeightedPrice calculates the volume-weighted exchange ratio of ATN or NTN to USDC. -func volumeWeightedPrice(orders []interface{}) (*big.Rat, error) { - // Initialize total crypto and USDC amounts - totalCrypto := new(big.Int) - totalUSDC := new(big.Int) - - // Iterate through the orders to sum up the amounts - for _, orderInterface := range orders { - // Type assert to Order - order, ok := orderInterface.(Order) - if !ok { - return nil, fmt.Errorf("invalid order type") - } - - totalCrypto.Add(totalCrypto, order.cryptoAmount) - totalUSDC.Add(totalUSDC, order.usdcAmount) - } - - // Check if totalUSDC is zero to avoid division by zero - if totalUSDC.Cmp(common.Zero) == 0 { - return nil, fmt.Errorf("total USDC amount is zero, cannot compute ratio") - } - - // Scale the totals according to their decimals - scaledTotalCrypto := new(big.Int).Div(totalCrypto, big.NewInt(int64(math.Pow(10, float64(common.AutonityCryptoDecimals))))) - scaledTotalUSDC := new(big.Int).Div(totalUSDC, big.NewInt(int64(math.Pow(10, float64(common.USDCDecimals))))) - - // Calculate the weighted ratio as a fraction - if scaledTotalUSDC.Cmp(common.Zero) == 0 { - return nil, fmt.Errorf("scaled total USDC amount is zero, cannot compute ratio") - } - - weightedRatio := new(big.Rat).SetFrac(scaledTotalCrypto, scaledTotalUSDC) - return weightedRatio, nil -} - -func (e *AirswapClient) updatePrice(tokenAddress ecommon.Address, price string) { +func (e *AirswapClient) updatePrice(tokenAddress ecommon.Address, price string, volumes *big.Int) { e.priceMutex.Lock() defer e.priceMutex.Unlock() @@ -393,6 +389,7 @@ func (e *AirswapClient) updatePrice(tokenAddress ecommon.Address, price string) e.lastAggregatedPrices[tokenAddress] = common.Price{ Symbol: symbol, Price: price, + Volume: volumes.String(), } } @@ -407,13 +404,7 @@ func (e *AirswapClient) checkHealth() { return } - h, err := e.client.BlockNumber(context.Background()) - if err != nil { - e.logger.Error("get block number", "error", err.Error()) - return - } - - e.logger.Debug("checking heart beat", "current height", h) + e.logger.Debug("checking heart beat", "alive", !e.lostSync) } func (e *AirswapClient) handleConnectivityError() { @@ -446,6 +437,7 @@ func (e *AirswapClient) FetchPrice(_ []string) (common.Prices, error) { e.logger.Error("cannot compute NTN-ATN price", "error", err.Error()) return prices, nil } + ntnATNPrice.Volume = atnPrice.Volume prices = append(prices, ntnATNPrice) } return prices, nil diff --git a/plugins/crypto_coinbase/crypto_coinbase.go b/plugins/crypto_coinbase/crypto_coinbase.go index f3778f8..f669870 100644 --- a/plugins/crypto_coinbase/crypto_coinbase.go +++ b/plugins/crypto_coinbase/crypto_coinbase.go @@ -88,6 +88,7 @@ func (c *CoinBaseClient) FetchPrice(_ []string) (common.Prices, error) { prices = append(prices, common.Price{ Symbol: common.DefaultUSDCSymbol, Price: data.Data.Amount, + Volume: types.DefaultVolume.String(), }) return prices, nil diff --git a/plugins/crypto_coingecko/crypto_coingecko.go b/plugins/crypto_coingecko/crypto_coingecko.go index 21147df..749fca5 100644 --- a/plugins/crypto_coingecko/crypto_coingecko.go +++ b/plugins/crypto_coingecko/crypto_coingecko.go @@ -89,6 +89,7 @@ func (c *CoinGeckoClient) FetchPrice(_ []string) (common.Prices, error) { prices = append(prices, common.Price{ Symbol: common.DefaultUSDCSymbol, Price: strconv.FormatFloat(result.USDCoin.USD, 'f', 6, 64), + Volume: types.DefaultVolume.String(), }) return prices, nil diff --git a/plugins/crypto_kraken/crypto_kraken.go b/plugins/crypto_kraken/crypto_kraken.go index bac7534..6554106 100644 --- a/plugins/crypto_kraken/crypto_kraken.go +++ b/plugins/crypto_kraken/crypto_kraken.go @@ -122,6 +122,7 @@ func (k *KrakenClient) toPrice(symbol string, res *Response) (common.Price, erro price.Symbol = symbol price.Price = usdcResult.P[0] // take the volume weighted average price of today. + price.Volume = types.DefaultVolume.String() return price, nil } diff --git a/plugins/crypto_uniswap/common/client.go b/plugins/crypto_uniswap/common/client.go index 4a5918c..8d45b31 100644 --- a/plugins/crypto_uniswap/common/client.go +++ b/plugins/crypto_uniswap/common/client.go @@ -7,34 +7,69 @@ import ( "autonity-oracle/plugins/crypto_uniswap/contracts/pair" "autonity-oracle/types" "fmt" + "github.com/ethereum/go-ethereum/accounts/abi/bind" ecommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" "github.com/hashicorp/go-hclog" + "github.com/shopspring/decimal" + ring "github.com/zfjagann/golang-ring" "math" "math/big" "os" + "sync" + "time" ) var ( - Version = "v0.2.0" - ATNUSDC = "ATN-USDC" - NTNUSDC = "NTN-USDC" - supportedSymbols = common.DefaultCryptoSymbols - NTNTokenAddress = types.AutonityContractAddress // Autonity protocol contract is the NTN token contract. + orderBookCapacity = 64 + Version = "v0.2.0" + ATNUSDC = "ATN-USDC" + NTNUSDC = "NTN-USDC" + supportedSymbols = common.DefaultCryptoSymbols + NTNTokenAddress = types.AutonityContractAddress // Autonity protocol contract is the NTN token contract. ) +type Order struct { + cryptoToUsdcPrice decimal.Decimal // ATN-USDCx or NTN-USDCx ratio. + volume *big.Int // trade volume in usdc of per swap event. +} + type WrappedPair struct { - pairContract *pair.Pair - token0 ecommon.Address - token1 ecommon.Address + pairContract *pair.Pair + pairAddress ecommon.Address + token0 ecommon.Address + token1 ecommon.Address + token0Reserves *big.Int + token1Reserves *big.Int } type UniswapClient struct { - conf *config.PluginConfig - client *ethclient.Client - logger hclog.Logger + conf *config.PluginConfig + client *ethclient.Client + logger hclog.Logger + + atnTokenAddress ecommon.Address + usdxTokenAddress ecommon.Address + atnUSDCPairContract *WrappedPair ntnUSDCPairContract *WrappedPair + + chAtnSwapEvent chan *pair.PairSwap + subAtnSwapEvent event.Subscription + + chNtnSwapEvent chan *pair.PairSwap + subNtnSwapEvent event.Subscription + + doneCh chan struct{} + ticker *time.Ticker + lostSync bool + + atnOrderBooks ring.Ring + ntnOrderBooks ring.Ring + + priceMutex sync.RWMutex + lastAggregatedPrices map[ecommon.Address]common.Price } func NewUniswapClient(conf *config.PluginConfig) (*UniswapClient, error) { @@ -72,7 +107,88 @@ func NewUniswapClient(conf *config.PluginConfig) (*UniswapClient, error) { return nil, err } - return &UniswapClient{conf: conf, client: client, logger: logger, atnUSDCPairContract: atnUSDCPairContract, ntnUSDCPairContract: ntnUSDCPairContract}, nil + uc := &UniswapClient{ + conf: conf, + client: client, + logger: logger, + atnUSDCPairContract: atnUSDCPairContract, + ntnUSDCPairContract: ntnUSDCPairContract, + atnTokenAddress: atnTokenAddress, + usdxTokenAddress: usdcTokenAddress, + doneCh: make(chan struct{}), + ticker: time.NewTicker(time.Second * 30), + lastAggregatedPrices: make(map[ecommon.Address]common.Price), + } + + uc.atnOrderBooks.SetCapacity(orderBookCapacity) + uc.ntnOrderBooks.SetCapacity(orderBookCapacity) + + if err = uc.EventSubscription(); err != nil { + return nil, err + } + + return uc, nil +} + +func (e *UniswapClient) EventSubscription() error { + // subscribe on-chain swap event of atn-usdc. + chAtnSwapEvent := make(chan *pair.PairSwap) + subAtnSwapEvent, err := e.atnUSDCPairContract.pairContract.WatchSwap(new(bind.WatchOpts), chAtnSwapEvent, nil, nil) + if err != nil { + e.logger.Error("cannot watch ATN USDC pair swap event", "error", err) + return err + } + + chNtnSwapEvent := make(chan *pair.PairSwap) + subNtnSwapEvent, err := e.ntnUSDCPairContract.pairContract.WatchSwap(new(bind.WatchOpts), chNtnSwapEvent, nil, nil) + if err != nil { + e.logger.Error("cannot watch NTN USDC pair swap event", "error", err) + return err + } + e.chAtnSwapEvent = chAtnSwapEvent + e.subAtnSwapEvent = subAtnSwapEvent + + e.chNtnSwapEvent = chNtnSwapEvent + e.subNtnSwapEvent = subNtnSwapEvent + + return nil +} + +func (e *UniswapClient) StartWatcher() { + for { + select { + case <-e.doneCh: + e.ticker.Stop() + e.logger.Info("uni-swap events watcher stopped") + return + case err := <-e.subAtnSwapEvent.Err(): + if err != nil { + e.logger.Info("subscription error of ATN-USDCx swap event", "error", err) + e.handleConnectivityError() + e.subAtnSwapEvent.Unsubscribe() + } + case err := <-e.subNtnSwapEvent.Err(): + if err != nil { + e.logger.Info("subscription error of NTN-USDCx swap event", "error", err) + e.handleConnectivityError() + e.subNtnSwapEvent.Unsubscribe() + } + case atnSwapEvent := <-e.chAtnSwapEvent: + e.logger.Debug("receiving an ATN-USDC swap event", "event", atnSwapEvent) + + if err := e.handleSwapEvent(e.atnTokenAddress, e.atnUSDCPairContract, atnSwapEvent, &e.atnOrderBooks); err != nil { + e.logger.Error("handle swap event failed", "error", err) + } + case ntnSwapEvent := <-e.chNtnSwapEvent: + e.logger.Debug("receiving a NTN-USDC swap event", "event", ntnSwapEvent) + if err := e.handleSwapEvent(NTNTokenAddress, e.ntnUSDCPairContract, ntnSwapEvent, &e.ntnOrderBooks); err != nil { + e.logger.Error("handle swap event failed", "error", err) + } + + case <-e.ticker.C: + e.checkHealth() + } + } } func bindWithPairContract(factoryContract *factory.Factory, client *ethclient.Client, tokenAddress1, tokenAddress2 ecommon.Address, logger hclog.Logger) (*WrappedPair, error) { @@ -105,46 +221,196 @@ func bindWithPairContract(factoryContract *factory.Factory, client *ethclient.Cl return nil, err } + reserves, err := pairContract.GetReserves(nil) + if err != nil { + logger.Error("cannot resolve reserves from liquidity pool", "error", err) + return nil, err + } + return &WrappedPair{ - pairContract: pairContract, - token0: token0, - token1: token1, + pairContract: pairContract, + pairAddress: pairAddress, + token0: token0, + token1: token1, + token0Reserves: reserves.Reserve0, + token1Reserves: reserves.Reserve1, }, nil } -func (e *UniswapClient) KeyRequired() bool { - return false +func (e *UniswapClient) handleSwapEvent(cryptoToken ecommon.Address, pair *WrappedPair, swap *pair.PairSwap, orderBook *ring.Ring) error { + var order Order + + if swap.Amount0Out.Cmp(common.Zero) > 0 { + // Subtract Amount0Out from token0Reserves + temp0 := new(big.Int).Set(pair.token0Reserves) + pair.token0Reserves.Set(temp0.Sub(temp0, swap.Amount0Out)) + + // Add Amount1In to token1Reserves + temp1 := new(big.Int).Set(pair.token1Reserves) + pair.token1Reserves.Set(temp1.Add(temp1, swap.Amount1In)) + + // token0 is ATN/NTN, token1 is USDCx + if pair.token0 == cryptoToken { + cryptoReserve := new(big.Int).Set(pair.token0Reserves) + usdcReserve := new(big.Int).Set(pair.token1Reserves) + + price, err := ratio(cryptoReserve, usdcReserve) + if err != nil { + return err + } + order.cryptoToUsdcPrice = price + order.volume = swap.Amount1In + } else { + // token0 is USDCx, token1 is ATN/NTN + cryptoReserve := new(big.Int).Set(pair.token1Reserves) + usdcReserve := new(big.Int).Set(pair.token0Reserves) + + price, err := ratio(cryptoReserve, usdcReserve) + if err != nil { + return err + } + order.cryptoToUsdcPrice = price + order.volume = swap.Amount0Out + } + } + + if swap.Amount1Out.Cmp(common.Zero) > 0 { + // update reserves. + temp0 := new(big.Int).Set(pair.token0Reserves) + temp1 := new(big.Int).Set(pair.token1Reserves) + pair.token0Reserves.Set(temp0.Add(temp0, swap.Amount0In)) + pair.token1Reserves.Set(temp1.Sub(temp1, swap.Amount1Out)) + + // token1 is ATN/NTN, token0 is USDCx + if pair.token1 == cryptoToken { + cryptoReserve := new(big.Int).Set(pair.token1Reserves) + usdcReserve := new(big.Int).Set(pair.token0Reserves) + price, err := ratio(cryptoReserve, usdcReserve) + if err != nil { + return err + } + order.cryptoToUsdcPrice = price + order.volume = swap.Amount0In + } else { + // token1 is USDCx, token0 is ATN/NTN + cryptoReserve := new(big.Int).Set(pair.token0Reserves) + usdcReserve := new(big.Int).Set(pair.token1Reserves) + price, err := ratio(cryptoReserve, usdcReserve) + if err != nil { + return err + } + order.cryptoToUsdcPrice = price + order.volume = swap.Amount1Out + } + } + + aggPrice, volumes, err := aggregatePrice(orderBook, order) + if err != nil { + e.logger.Error("aggregate atn-usdcx order book price failed", "error", err) + return err + } + + // update the last aggregated price. + e.updatePrice(cryptoToken, aggPrice.String(), volumes) + return nil } -func (e *UniswapClient) FetchPrice(_ []string) (common.Prices, error) { - var prices common.Prices +func (e *UniswapClient) updatePrice(tokenAddress ecommon.Address, price string, volumes *big.Int) { + e.priceMutex.Lock() + defer e.priceMutex.Unlock() - // todo: compute the volume weighted price for AMM by watching swap volumes, the aggregated price might help to - // get a better data point even if the severe price fluctuations happens. - atnUSDCPrice, err := e.fetchPrice(e.atnUSDCPairContract, ATNUSDC) - if err == nil { - prices = append(prices, atnUSDCPrice) - } else { - e.logger.Error("failed to fetch ATN-USDC price", "error", err) + symbol := ATNUSDC + if tokenAddress == NTNTokenAddress { + symbol = NTNUSDC } - ntnUSDCPrice, err := e.fetchPrice(e.ntnUSDCPairContract, NTNUSDC) - if err == nil { - prices = append(prices, ntnUSDCPrice) - } else { - e.logger.Error("failed to fetch NTN-USDC price", "error", err) + e.lastAggregatedPrices[tokenAddress] = common.Price{ + Symbol: symbol, + Price: price, + Volume: volumes.String(), } +} - if len(prices) == 2 { - ntnATNPrice, err := common.ComputeDerivedPrice(ntnUSDCPrice.Price, atnUSDCPrice.Price) +func aggregatePrice(orderBook *ring.Ring, order Order) (decimal.Decimal, *big.Int, error) { + orderBook.Enqueue(order) + recentOrders := orderBook.Values() + return volumeWeightedPrice(recentOrders) +} + +// volumeWeightedPrice calculates the volume-weighted exchange ratio of ATN or NTN to USDC. +func volumeWeightedPrice(orders []interface{}) (decimal.Decimal, *big.Int, error) { + var vwap decimal.Decimal + + totalValues := new(big.Int) + totalVol := new(big.Int) + + // Iterate through the orders to sum up the amounts + for _, orderInterface := range orders { + // Type assert to Order + order, ok := orderInterface.(Order) + if !ok { + return vwap, nil, fmt.Errorf("invalid order type") + } + + vol := decimal.NewFromBigInt(order.volume, 0) + valuePerSwap := order.cryptoToUsdcPrice.Mul(vol) + + totalValues.Add(totalValues, valuePerSwap.BigInt()) + totalVol.Add(totalVol, order.volume) + } + + // Check if totalVol is zero to avoid division by zero + if totalVol.Cmp(common.Zero) == 0 { + return vwap, nil, fmt.Errorf("total USDC amount is zero, cannot compute ratio") + } + + weightedRatio := new(big.Rat).SetFrac(totalValues, totalVol) + vwap, err := decimal.NewFromString(weightedRatio.FloatString(common.CryptoToUsdcDecimals)) + if err != nil { + return vwap, nil, err + } + + return vwap, totalVol, nil +} + +func (e *UniswapClient) checkHealth() { + if e.lostSync { + err := e.EventSubscription() if err != nil { - e.logger.Error("failed to compute NTN-ATN price", "error", err) - return prices, nil + e.logger.Info("rebuilding WS connectivity with L1 node", "error", err) + return } - prices = append(prices, ntnATNPrice) + + // re-sync reserves from pools. + atnUsdcReserves, err := e.atnUSDCPairContract.pairContract.GetReserves(nil) + if err != nil { + e.logger.Error("re-sync atn-usdcx pair contract", "error", err) + return + } + e.atnUSDCPairContract.token0Reserves = atnUsdcReserves.Reserve0 + e.atnUSDCPairContract.token1Reserves = atnUsdcReserves.Reserve1 + + ntnUsdcReservers, err := e.ntnUSDCPairContract.pairContract.GetReserves(nil) + if err != nil { + e.logger.Error("re-sync ntn-usdcx pair contract", "error", err) + return + } + e.ntnUSDCPairContract.token0Reserves = ntnUsdcReservers.Reserve0 + e.ntnUSDCPairContract.token1Reserves = ntnUsdcReservers.Reserve1 + + e.lostSync = false + return } - return prices, nil + e.logger.Debug("checking heart beat", "alive", !e.lostSync) +} + +func (e *UniswapClient) handleConnectivityError() { + e.lostSync = true +} + +func (e *UniswapClient) KeyRequired() bool { + return false } func (e *UniswapClient) fetchPrice(pair *WrappedPair, symbol string) (common.Price, error) { @@ -173,17 +439,75 @@ func (e *UniswapClient) fetchPrice(pair *WrappedPair, symbol string) (common.Pri usdcReserve = reserves.Reserve0 } - p, err := ComputeExchangeRatio(cryptoReserve, usdcReserve) + p, err := ratio(cryptoReserve, usdcReserve) if err != nil { e.logger.Error("cannot compute exchange ratio of ATN-USDC", "error", err) return price, err } price.Symbol = symbol - price.Price = p + price.Price = p.String() + price.Volume = types.DefaultVolume.String() return price, nil } +func (e *UniswapClient) FetchPrice(_ []string) (common.Prices, error) { + var prices common.Prices + + atnUSDCPrice, err := e.lastAggregatedPrice(e.atnTokenAddress) + if err == nil { + prices = append(prices, atnUSDCPrice) + } else { + e.logger.Debug("no aggregated ATN-USDCx price yet, going to fetch from pool", "error", err) + // no swap event accumulated, compute price from current pool reserves. + price, er := e.fetchPrice(e.atnUSDCPairContract, ATNUSDC) + if er == nil { + prices = append(prices, price) + } else { + e.logger.Error("failed to fetch ATN-USDC price", "error", er) + } + } + + ntnUSDCPrice, err := e.lastAggregatedPrice(NTNTokenAddress) + if err == nil { + prices = append(prices, ntnUSDCPrice) + } else { + e.logger.Debug("no aggregated NTN-USDCx price yet, going to fetch from pool", "error", err) + // no swap event accumulated, compute price from current pool reserves. + price, er := e.fetchPrice(e.ntnUSDCPairContract, NTNUSDC) + if er == nil { + prices = append(prices, price) + } else { + e.logger.Error("failed to fetch NTN-USDC price", "error", er) + } + } + + if len(prices) == 2 { + ntnATNPrice, err := common.ComputeDerivedPrice(prices[1].Price, prices[0].Price) + if err != nil { + e.logger.Error("failed to compute NTN-ATN price", "error", err) + return prices, nil + } + ntnATNPrice.Volume = prices[0].Volume + prices = append(prices, ntnATNPrice) + } + + return prices, nil +} + +func (e *UniswapClient) lastAggregatedPrice(tokenAddress ecommon.Address) (common.Price, error) { + e.priceMutex.RLock() + defer e.priceMutex.RUnlock() + + var price common.Price + latestPrice, ok := e.lastAggregatedPrices[tokenAddress] + if !ok { + return price, fmt.Errorf("no available price yet for token %s", tokenAddress.Hex()) + } + + return latestPrice, nil +} + func (e *UniswapClient) AvailableSymbols() ([]string, error) { return supportedSymbols, nil } @@ -192,12 +516,20 @@ func (e *UniswapClient) Close() { if e.client != nil { e.client.Close() } + e.subAtnSwapEvent.Unsubscribe() + e.subNtnSwapEvent.Unsubscribe() + e.doneCh <- struct{}{} } -// ComputeExchangeRatio calculates the exchange ratio from ATN or NTN to USDC -func ComputeExchangeRatio(cryptoReserve, usdcReserve *big.Int) (string, error) { +func ratio(cryptoReserve, usdcReserve *big.Int) (decimal.Decimal, error) { + var r decimal.Decimal + if usdcReserve.Cmp(common.Zero) == 0 { - return "", fmt.Errorf("usdcReserve is zero, cannot compute exchange ratio") + return r, fmt.Errorf("usdcReserve is zero, cannot compute exchange ratio") + } + + if cryptoReserve.Cmp(common.Zero) < 0 || usdcReserve.Cmp(common.Zero) < 0 { + return r, fmt.Errorf("negative reserve value") } // ratio == (cryptoReserve/cryptoDecimals) / (usdcReserve/usdcDecimals) @@ -206,8 +538,12 @@ func ComputeExchangeRatio(cryptoReserve, usdcReserve *big.Int) (string, error) { scaledUsdcReserve := new(big.Int).Mul(usdcReserve, big.NewInt(int64(math.Pow(10, float64(common.AutonityCryptoDecimals))))) // Calculate the exchange ratio as a big.Rat - ratio := new(big.Rat).SetFrac(scaledCryptoReserve, scaledUsdcReserve) + price := new(big.Rat).SetFrac(scaledCryptoReserve, scaledUsdcReserve) + + r, err := decimal.NewFromString(price.FloatString(common.CryptoToUsdcDecimals)) + if err != nil { + return r, err + } - // Return the string representation of the ratio - return ratio.FloatString(common.CryptoToUsdcDecimals), nil + return r, nil } diff --git a/plugins/crypto_uniswap/common/client_test.go b/plugins/crypto_uniswap/common/client_test.go index c2798ad..bb64a7c 100644 --- a/plugins/crypto_uniswap/common/client_test.go +++ b/plugins/crypto_uniswap/common/client_test.go @@ -24,6 +24,8 @@ func TestNewUniswapClient(t *testing.T) { client, err := NewUniswapClient(&config) require.NoError(t, err) + go client.StartWatcher() + defer client.Close() prices, err := client.FetchPrice(supportedSymbols) diff --git a/plugins/crypto_uniswap/uniswap_usdcx/crypto_uniswap_usdcx.go b/plugins/crypto_uniswap/uniswap_usdcx/crypto_uniswap_usdcx.go index 1772a18..e853643 100644 --- a/plugins/crypto_uniswap/uniswap_usdcx/crypto_uniswap_usdcx.go +++ b/plugins/crypto_uniswap/uniswap_usdcx/crypto_uniswap_usdcx.go @@ -27,6 +27,10 @@ func main() { if err != nil { return } + + // start the uniswapV2 event watching for price aggregation of ATN-USDCx & NTN-USDCx + go c.StartWatcher() + adapter := common.NewPlugin(conf, c, client.Version, types.SrcAMM, common.ChainIDPiccadilly) defer adapter.Close() common.PluginServe(adapter) diff --git a/plugins/forex_currencyfreaks/forex_currencyfreaks.go b/plugins/forex_currencyfreaks/forex_currencyfreaks.go index e7f50ef..80abad8 100644 --- a/plugins/forex_currencyfreaks/forex_currencyfreaks.go +++ b/plugins/forex_currencyfreaks/forex_currencyfreaks.go @@ -133,6 +133,7 @@ func (cf *CFClient) symbolsToPrice(s string, res *CFResult) (common.Price, error return price, fmt.Errorf("wrong base %s", to) } price.Symbol = s + price.Volume = types.DefaultVolume.String() switch from { case "EUR": pUE, err := decimal.NewFromString(res.Rates.EUR) diff --git a/plugins/forex_currencylayer/forex_currencylayer.go b/plugins/forex_currencylayer/forex_currencylayer.go index 4c09199..2936b50 100644 --- a/plugins/forex_currencylayer/forex_currencylayer.go +++ b/plugins/forex_currencylayer/forex_currencylayer.go @@ -139,6 +139,7 @@ func (cl *CLClient) symbolsToPrice(s string, res *CLResult) (common.Price, error } price.Symbol = s + price.Volume = types.DefaultVolume.String() switch from { case "EUR": price.Price = decimal.NewFromInt(1).Div(res.Quotes.USDEUR).String() diff --git a/plugins/forex_exchangerate/forex_exchangerate.go b/plugins/forex_exchangerate/forex_exchangerate.go index eca9043..b424eee 100644 --- a/plugins/forex_exchangerate/forex_exchangerate.go +++ b/plugins/forex_exchangerate/forex_exchangerate.go @@ -140,6 +140,7 @@ func (ex *EXClient) symbolsToPrice(s string, res *EXResult) (common.Price, error } price.Symbol = s + price.Volume = types.DefaultVolume.String() switch from { case "EUR": price.Price = decimal.NewFromInt(1).Div(res.Rates.EUR).String() diff --git a/plugins/forex_openexchange/forex_openexchange.go b/plugins/forex_openexchange/forex_openexchange.go index 5d01560..ca0d9d4 100644 --- a/plugins/forex_openexchange/forex_openexchange.go +++ b/plugins/forex_openexchange/forex_openexchange.go @@ -139,6 +139,7 @@ func (oe *OXClient) symbolsToPrice(s string, res *OEResult) (common.Price, error return price, fmt.Errorf("wrong base %s", to) } price.Symbol = s + price.Volume = types.DefaultVolume.String() switch from { case "EUR": price.Price = decimal.NewFromInt(1).Div(res.Rates.EUR).String() diff --git a/plugins/outlier_tester/outlier_tester.go b/plugins/outlier_tester/outlier_tester.go index 36010a9..6bf59c5 100644 --- a/plugins/outlier_tester/outlier_tester.go +++ b/plugins/outlier_tester/outlier_tester.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/shopspring/decimal" + "math/big" "os" "time" ) @@ -77,16 +78,23 @@ func (g *OutlierTesterPlugin) FetchPrices(symbols []string) (types.PluginPriceRe now := time.Now().Unix() for _, v := range res { - dec, err := decimal.NewFromString(v.Price) + decPrice, err := decimal.NewFromString(v.Price) if err != nil { g.logger.Error("cannot convert price string to decimal: ", "price", v.Price, "error", err.Error()) continue } + decVol, ok := new(big.Int).SetString(v.Volume, 0) + if !ok { + g.logger.Error("cannot convert volume to big.Int: ", "volume", v.Volume) + continue + } + pr := types.Price{ Timestamp: now, Symbol: availableSymMap[v.Symbol], // set the symbol with the symbol style used in oracle server side. - Price: dec, + Price: decPrice, + Volume: decVol, } g.cachePrices[v.Symbol] = pr report.Prices = append(report.Prices, pr) @@ -201,6 +209,7 @@ func (tc *OutlierClient) FetchPrice(symbols []string) (common.Prices, error) { var prices common.Prices for _, s := range symbols { var price common.Price + price.Volume = types.DefaultVolume.String() price.Symbol = s // it is a malicious behaviour to set the price into an outlier range by multiply with 3.0 p := helpers.ResolveSimulatedPrice(s).Mul(decimal.RequireFromString("3.0")) diff --git a/plugins/pcgc_cax/pcgc_cax.go b/plugins/pcgc_cax/pcgc_cax.go index 8425e6d..7391a03 100644 --- a/plugins/pcgc_cax/pcgc_cax.go +++ b/plugins/pcgc_cax/pcgc_cax.go @@ -166,6 +166,7 @@ func (cc *CAXClient) fetchPrice(symbol string) (common.Price, error) { // the aggregated price takes the average value of ask and bid prices. price.Price = askPrice.Add(bidPrice).Div(decimal.NewFromInt(2)).String() price.Symbol = symbol + price.Volume = types.DefaultVolume.String() return price, nil } diff --git a/plugins/simulator_plugin/common/client.go b/plugins/simulator_plugin/common/client.go index 444e166..284cfc1 100644 --- a/plugins/simulator_plugin/common/client.go +++ b/plugins/simulator_plugin/common/client.go @@ -3,6 +3,7 @@ package common import ( "autonity-oracle/config" "autonity-oracle/plugins/common" + "autonity-oracle/types" "encoding/json" "github.com/hashicorp/go-hclog" "io" @@ -67,6 +68,10 @@ func (bi *SIMClient) FetchPrice(symbols []string) (common.Prices, error) { return nil, err } + for i := range prices { + prices[i].Volume = types.DefaultVolume.String() + } + return prices, nil } diff --git a/plugins/template_plugin/template_plugin.go b/plugins/template_plugin/template_plugin.go index 601a0df..182e56b 100644 --- a/plugins/template_plugin/template_plugin.go +++ b/plugins/template_plugin/template_plugin.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/shopspring/decimal" + "math/big" "net/url" "os" "time" @@ -80,16 +81,23 @@ func (g *TemplatePlugin) FetchPrices(symbols []string) (types.PluginPriceReport, now := time.Now().Unix() for _, v := range res { - dec, err := decimal.NewFromString(v.Price) + decPrice, err := decimal.NewFromString(v.Price) if err != nil { g.logger.Error("cannot convert price string to decimal: ", "price", v.Price, "error", err.Error()) continue } + decVol, ok := new(big.Int).SetString(v.Volume, 0) + if !ok { + g.logger.Error("cannot convert price to decimal: ", "price", v.Price, "error", v.Volume) + continue + } + pr := types.Price{ Timestamp: now, Symbol: availableSymMap[v.Symbol], // set the symbol with the symbol style used in oracle server side. - Price: dec, + Price: decPrice, + Volume: decVol, } g.cachePrices[v.Symbol] = pr report.Prices = append(report.Prices, pr) @@ -235,6 +243,7 @@ func (tc *TemplateClient) FetchPrice(symbols []string) (common.Prices, error) { for _, s := range symbols { var price common.Price price.Symbol = s + price.Volume = types.DefaultVolume.String() price.Price = helpers.ResolveSimulatedPrice(s).String() prices = append(prices, price) } diff --git a/types/types.go b/types/types.go index 6b3110f..f45c77f 100644 --- a/types/types.go +++ b/types/types.go @@ -15,15 +15,15 @@ import ( var ( Deployer = common.Address{} + DefaultVolume = new(big.Int).SetInt64(1000000) // used by forex currency which does not have trade volumes. AutonityContractAddress = crypto.CreateAddress(Deployer, 0) OracleContractAddress = crypto.CreateAddress(Deployer, 2) - ErrPeerOnSync = errors.New("l1 node is on peer sync") - ErrNoAvailablePrice = errors.New("no available prices collected yet") - ErrNoSufficientPrices = errors.New("no sufficient num of prices were collected yet") - ErrNoDataRound = errors.New("no data collected at current round") - ErrNoSymbolsObserved = errors.New("no symbols observed from oracle contract") - ErrMissingServiceKey = errors.New("the key to access the data source is missing, please check the plugin config") + ErrPeerOnSync = errors.New("l1 node is on peer sync") + ErrNoAvailablePrice = errors.New("no available prices collected yet") + ErrNoDataRound = errors.New("no data collected at current round") + ErrNoSymbolsObserved = errors.New("no symbols observed from oracle contract") + ErrMissingServiceKey = errors.New("the key to access the data source is missing, please check the plugin config") ) // Price is the structure contains the exchange rate of a symbol with a timestamp at which the sampling happens. @@ -31,7 +31,8 @@ type Price struct { Timestamp int64 // TS on when the data is being sampled in time's seconds since Jan 1 1970 (Unix time). Symbol string Price decimal.Decimal - Confidence uint8 // to be resolved on the aggregation phase, depends on how many data samples. + Volume *big.Int // recent trade volume in quoto of USDCx. + Confidence uint8 // confidence resolved by the server. } // PriceBySymbol group the price by symbols.