Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature, VWAP aggregation for AMM and AFQ market data points. #45

Merged
merged 10 commits into from
Jan 8, 2025
4 changes: 2 additions & 2 deletions config/config_for_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
# type PluginConfig struct {
# Name string `json:"name" yaml:"name"` // the name of the plugin binary.
# Key string `json:"key" yaml:"key"` // the API key granted by your data provider to access their data API.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http or https.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http, https, ws or wss.
# Endpoint string `json:"endpoint" yaml:"endpoint"` // the data service endpoint url of the data provider.
# Timeout int `json:"timeout" yaml:"timeout"` // the timeout period in seconds that an API request is lasting for.
# DataUpdateInterval int `json:"refresh" yaml:"refresh"` // the interval in seconds to fetch data from data provider due to rate limit.
Expand Down Expand Up @@ -87,7 +87,7 @@ pluginConfigs:
refresh: 3600 # optional, buffered data within 3600s, recommended for API rate limited data source.
# Un-comment below lines to config the RPC endpoint of a Piccadilly Network Full Node for your AMM plugin which sources ATN & NTN market data from an on-chain AMM.
- name: crypto_uniswap
scheme: "wss" # Available values are: "http", "https", "ws" or "wss", default value is "wss".
scheme: "wss" # Available values are: "ws" or "wss", default value is "wss".
endpoint: "rpc-internal-1.piccadilly.autonity.org/ws" # The default URL might not be stable for public usage, we recommend you to change it with your validator node's RPC endpoint.

#Enable the metric collection for oracle server, supported TS-DB engines are influxDB v1 and v2.
Expand Down
17 changes: 9 additions & 8 deletions config/oracle_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
#
# The crypto data plugins are used to fetch market prices for the crypto currency pairs: ATN-USDC, NTN-USDC, NTN-ATN and
# USDC-USD. USDC liquidity is bridged to the Autonity public testnet from the Polygon Amoy testnet via a bridge service.
# Out-the-box plugins for collecting ATN-USDC and NTN-USDC market data are available for UniSwap V2 and AirSwap protocols. NTN-ATN market price is derived from
# that market data, and USDC pricing is converted to USD. ATN-NTN, ATN-USD, and NTN-USD prices are then submitted on-chain.
# To retrieve ATN and NTN prices, put the `crypto_uniswap` plugin and `crypto_airswap` plugin in your plugin directory.
# Oracle server can then discover and load them. Configuring the `crypto_uniswap` and `crypto_airswap` plugin does not
# require an API key, it is an open and free data source of a standard EVM RPC websocket service endpoint. The
# end user can connect to specific EVM RPC endpoint base on the blockchain which hosts the uniswap and airswap contracts.
# Out-the-box plugins for collecting ATN-USDC and NTN-USDC market data are available for UniSwap V2 and AirSwap protocols.
# NTN-ATN market price is derived from that market data, and USDC pricing is converted to USD. ATN-NTN, ATN-USD, and NTN-USD
# prices are then submitted on-chain. To retrieve ATN and NTN prices, put the `crypto_uniswap` plugin and `crypto_airswap`
# plugin in your plugin directory. Oracle server can then discover and load them. Configuring the `crypto_uniswap` and
#`crypto_airswap` plugin does not require an API key, it is an open and free data source of a standard EVM RPC websocket
# service endpoint. The end user can connect to specific EVM RPC endpoint base on the blockchain which hosts the uniswap
# and airswap contracts.

# USDC-USD prices are required by the protocol to convert the ATN-USDC and NTN-USDC to ATN-USD and NTN-USD. This enables
# the reporting of ATN and NTN prices in USD to the ASM. Three plugins are implemented to source the USDC-USD datapoint
Expand All @@ -57,7 +58,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
# type PluginConfig struct {
# Name string `json:"name" yaml:"name"` // the name of the plugin binary.
# Key string `json:"key" yaml:"key"` // the API key granted by your data provider to access their data API.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http or https.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http, https, ws or wss.
# Endpoint string `json:"endpoint" yaml:"endpoint"` // the data service endpoint url of the data provider.
# Timeout int `json:"timeout" yaml:"timeout"` // the timeout period in seconds that an API request is lasting for.
# DataUpdateInterval int `json:"refresh" yaml:"refresh"` // the interval in seconds to fetch data from data provider due to rate limit.
Expand Down Expand Up @@ -87,7 +88,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
# refresh: 3600 # optional, buffered data within 3600s, recommended for API rate limited data source.
# Un-comment below lines to config the RPC endpoint of a Piccadilly Network Full Node for your AMM plugin which sources ATN & NTN market data from an on-chain AMM.
# - name: crypto_uniswap
# scheme: "wss" # Available values are: "http", "https", "ws" or "wss", default value is "wss".
# scheme: "wss" # Only websocket please, available values are: "ws" or "wss", default value is "wss" for uniswap plugins.
# endpoint: "rpc-internal-1.piccadilly.autonity.org/ws" # The default URL might not be stable for public usage, we recommend you to change it with your validator node's RPC endpoint.

#Enable the metric collection for oracle server, supported TS-DB engines are influxDB v1 and v2.
Expand Down
38 changes: 38 additions & 0 deletions helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package helpers

import (
"encoding/csv"
"errors"
"fmt"
"github.com/shopspring/decimal"
"io"
"io/fs"
"io/ioutil" //nolint
"math/big"
"os"
"sort"
)
Expand Down Expand Up @@ -98,6 +100,42 @@ func Median(prices []decimal.Decimal) (decimal.Decimal, error) {
return prices[l/2], nil
}

// VWAP computes the volume weighted average price for the input prices with their corresponding volumes
func VWAP(prices []decimal.Decimal, volumes []*big.Int) (decimal.Decimal, *big.Int, error) {
if len(prices) == 0 || len(volumes) == 0 || len(prices) != len(volumes) {
return decimal.Zero, nil, errors.New("prices and volumes must be of the same non-zero length")
}

var totalWeightedPrice decimal.Decimal
totalVolume := big.NewInt(0)
highestVol := new(big.Int).Set(volumes[0])

for i := range prices {
if volumes[i].Cmp(highestVol) > 0 {
highestVol.Set(volumes[i])
}

// Convert volume to decimal.Decimal
volumeDecimal := decimal.NewFromBigInt(volumes[i], 0)

// Calculate weighted price for current price and volume
weightedPrice := prices[i].Mul(volumeDecimal) // Use decimal.Decimal for precision
totalWeightedPrice = totalWeightedPrice.Add(weightedPrice)

// Accumulate total volume
totalVolume.Add(totalVolume, volumes[i])
}

// Avoid division by zero
if totalVolume.Cmp(big.NewInt(0)) == 0 {
return decimal.Zero, nil, errors.New("total volume cannot be zero")
}

// Calculate VWAP
vwap := totalWeightedPrice.Div(decimal.NewFromBigInt(totalVolume, 0))
return vwap, highestVol, nil
}

// ListPlugins returns a mapping of file names to fs.FileInfo for executable files in the specified path.
func ListPlugins(path string) (map[string]fs.FileInfo, error) {
plugins := make(map[string]fs.FileInfo)
Expand Down
82 changes: 82 additions & 0 deletions helpers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package helpers
import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"
"math/big"
"testing"
)

Expand Down Expand Up @@ -71,3 +72,84 @@ func TestMedian(t *testing.T) {
require.Error(t, err)
})
}

func TestVWAP(t *testing.T) {
tests := []struct {
prices []decimal.Decimal
volumes []*big.Int
expectedVWAP decimal.Decimal
expectedHighestVol *big.Int
expectError bool
}{
{
prices: []decimal.Decimal{
decimal.NewFromFloat(100.0),
decimal.NewFromFloat(200.0),
decimal.NewFromFloat(100.0),
},
volumes: []*big.Int{
big.NewInt(10),
big.NewInt(20),
big.NewInt(10),
},
expectedVWAP: decimal.NewFromFloatWithExponent(150, 0), // (100*10 + 200*20 + 150*30) / (10 + 20 + 30)
expectedHighestVol: big.NewInt(20),
expectError: false,
},
{
prices: []decimal.Decimal{
decimal.NewFromFloat(50.0),
decimal.NewFromFloat(75.0),
},
volumes: []*big.Int{
big.NewInt(5),
big.NewInt(0),
},
expectedVWAP: decimal.NewFromFloatWithExponent(50, 0),
expectedHighestVol: big.NewInt(5),
},
{
prices: []decimal.Decimal{},
volumes: []*big.Int{},
expectedVWAP: decimal.Zero,
expectedHighestVol: nil,
expectError: true,
},
{
prices: []decimal.Decimal{
decimal.NewFromFloat(100.0),
},
volumes: []*big.Int{
big.NewInt(10),
big.NewInt(20),
},
expectedVWAP: decimal.Zero,
expectedHighestVol: nil,
expectError: true,
},
}

for _, test := range tests {
vwap, highestVol, err := VWAP(test.prices, test.volumes)

if test.expectError {
if err == nil {
t.Errorf("Expected an error for prices: %v and volumes: %v, but got none", test.prices, test.volumes)
}
continue
}

if err != nil {
t.Errorf("Unexpected error for prices: %v and volumes: %v: %v", test.prices, test.volumes, err)
continue
}

if !vwap.Equal(test.expectedVWAP) {
t.Errorf("For prices: %v and volumes: %v, expected VWAP: %s, but got: %s", test.prices, test.volumes, test.expectedVWAP.String(), vwap.String())
}

if highestVol.Cmp(test.expectedHighestVol) != 0 {
t.Errorf("For prices: %v and volumes: %v, expected highest volume: %s, but got: %s", test.prices, test.volumes, test.expectedHighestVol.String(), highestVol.String())
}
}
}
87 changes: 82 additions & 5 deletions oracle_server/oracle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,42 +777,96 @@ func (os *OracleServer) aggregateBridgedPrice(srcSymbol string, target int64, us
return p, nil
}

// aggregatePrice takes the symbol's aggregated data points from all the supported plugins, if there are multiple
// markets' datapoint, it will do a final VWAP aggregation to form the final reporting value.
func (os *OracleServer) aggregatePrice(s string, target int64) (*types.Price, error) {
var prices []decimal.Decimal
var volumes []*big.Int
for _, plugin := range os.runningPlugins {
p, err := plugin.GetAggregatedPrice(s, target)
p, err := plugin.AggregatedPrice(s, target)
if err != nil {
continue
}
prices = append(prices, p.Price)
volumes = append(volumes, p.Volume)
}

if len(prices) == 0 {
return nil, types.ErrNoDataRound
historicRoundPrice, err := os.queryHistoricRoundPrice(s)
if err != nil {
return nil, err
}

return confidenceAdjustedPrice(&historicRoundPrice, target)
}

// compute confidence of the symbol from the num of plugins' samples of it.
confidence := ComputeConfidence(s, len(prices), os.conf.ConfidenceStrategy)

price := &types.Price{
Timestamp: target,
Price: prices[0],
Volume: volumes[0],
Symbol: s,
Confidence: confidence,
}

// we have multiple provider provide prices for this symbol, we have to aggregate it.
if len(prices) > 1 {
_, isForex := ForexCurrencies[s]

// we have multiple markets' data for this forex symbol, update the price with median value.
if len(prices) > 1 && isForex {
p, err := helpers.Median(prices)
if err != nil {
return nil, err
}
price.Price = p
price.Volume = types.DefaultVolume
return price, nil
}

// we have multiple markets' data for this crypto symbol, update the price with VWAP.
if len(prices) > 1 && !isForex {
p, vol, err := helpers.VWAP(prices, volumes)
if err != nil {
return nil, err
}
price.Price = p
price.Volume = vol
}

return price, nil
}

// queryHistoricRoundPrice queries the last available price for a given symbol from the historic rounds.
func (os *OracleServer) queryHistoricRoundPrice(symbol string) (types.Price, error) {

if len(os.roundData) == 0 {
return types.Price{}, types.ErrNoDataRound
}

numOfRounds := len(os.roundData)
// Iterate from the current round backward
for i := 0; i < numOfRounds; i++ {
roundID := os.curRound - uint64(i) - 1 //nolint
// Get the round data for the current round ID
roundData, exists := os.roundData[roundID]
if !exists {
continue
}

if roundData == nil {
continue
}

// Check if the symbol exists in the Prices map
if price, found := roundData.Prices[symbol]; found {
return price, nil
}
}

// If no price was found after checking all rounds, return an error
return types.Price{}, types.ErrNoDataRound
}

func (os *OracleServer) samplePrice(symbols []string, ts int64) {
if os.lastSampledTS == ts {
return
Expand Down Expand Up @@ -1098,3 +1152,26 @@ func ComputeConfidence(symbol string, numOfSamples, strategy int) uint8 {

return uint8(weight) //nolint
}

func confidenceAdjustedPrice(historicRoundPrice *types.Price, target int64) (*types.Price, error) {
// by according to the spreading of price timestamp from the target timestamp,
// we reduce the confidence of the price, set the lowest confidence as 1.
// Calculate the time difference between the target timestamp and the historic price timestamp
timeDifference := target - historicRoundPrice.Timestamp

var reducedConfidence uint8
if timeDifference < 60 { // Less than 1 minute
reducedConfidence = historicRoundPrice.Confidence // Keep original confidence
} else if timeDifference < 3600 { // Less than 1 hour
reducedConfidence = historicRoundPrice.Confidence / 2 // Reduce confidence by half
} else {
reducedConfidence = 1 // Set the lowest confidence to 1 if more than 1 hour old
}

if reducedConfidence == 0 {
return nil, types.ErrNoAvailablePrice
}

historicRoundPrice.Confidence = reducedConfidence
return historicRoundPrice, nil
}
Loading
Loading