Skip to content

Commit

Permalink
feature, VWAP aggregation for AMM and AFQ market data points. (#45)
Browse files Browse the repository at this point in the history
* feature, VWAP aggregation for AMM and AFQ market data points.

* lint, fix lint error.

* test, repair test.

* improvement, do the VWAP for recent samples.

* refine, code refine and add todo for confidence adjustment.

* refine, code refine and doc update for VWAP.

* test, fix test.

* improvement, confidence adjusted for historic round data.

* improvement, refine for oracle server.

* lint, no lint.
  • Loading branch information
Jason-Zhangxin-Chen authored Jan 8, 2025
1 parent 6c7a81b commit 05dd747
Show file tree
Hide file tree
Showing 25 changed files with 738 additions and 151 deletions.
4 changes: 2 additions & 2 deletions config/config_for_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
# type PluginConfig struct {
# Name string `json:"name" yaml:"name"` // the name of the plugin binary.
# Key string `json:"key" yaml:"key"` // the API key granted by your data provider to access their data API.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http or https.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http, https, ws or wss.
# Endpoint string `json:"endpoint" yaml:"endpoint"` // the data service endpoint url of the data provider.
# Timeout int `json:"timeout" yaml:"timeout"` // the timeout period in seconds that an API request is lasting for.
# DataUpdateInterval int `json:"refresh" yaml:"refresh"` // the interval in seconds to fetch data from data provider due to rate limit.
Expand Down Expand Up @@ -87,7 +87,7 @@ pluginConfigs:
refresh: 3600 # optional, buffered data within 3600s, recommended for API rate limited data source.
# Un-comment below lines to config the RPC endpoint of a Piccadilly Network Full Node for your AMM plugin which sources ATN & NTN market data from an on-chain AMM.
- name: crypto_uniswap
scheme: "wss" # Available values are: "http", "https", "ws" or "wss", default value is "wss".
scheme: "wss" # Available values are: "ws" or "wss", default value is "wss".
endpoint: "rpc-internal-1.piccadilly.autonity.org/ws" # The default URL might not be stable for public usage, we recommend you to change it with your validator node's RPC endpoint.

#Enable the metric collection for oracle server, supported TS-DB engines are influxDB v1 and v2.
Expand Down
17 changes: 9 additions & 8 deletions config/oracle_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
#
# The crypto data plugins are used to fetch market prices for the crypto currency pairs: ATN-USDC, NTN-USDC, NTN-ATN and
# USDC-USD. USDC liquidity is bridged to the Autonity public testnet from the Polygon Amoy testnet via a bridge service.
# Out-the-box plugins for collecting ATN-USDC and NTN-USDC market data are available for UniSwap V2 and AirSwap protocols. NTN-ATN market price is derived from
# that market data, and USDC pricing is converted to USD. ATN-NTN, ATN-USD, and NTN-USD prices are then submitted on-chain.
# To retrieve ATN and NTN prices, put the `crypto_uniswap` plugin and `crypto_airswap` plugin in your plugin directory.
# Oracle server can then discover and load them. Configuring the `crypto_uniswap` and `crypto_airswap` plugin does not
# require an API key, it is an open and free data source of a standard EVM RPC websocket service endpoint. The
# end user can connect to specific EVM RPC endpoint base on the blockchain which hosts the uniswap and airswap contracts.
# Out-the-box plugins for collecting ATN-USDC and NTN-USDC market data are available for UniSwap V2 and AirSwap protocols.
# NTN-ATN market price is derived from that market data, and USDC pricing is converted to USD. ATN-NTN, ATN-USD, and NTN-USD
# prices are then submitted on-chain. To retrieve ATN and NTN prices, put the `crypto_uniswap` plugin and `crypto_airswap`
# plugin in your plugin directory. Oracle server can then discover and load them. Configuring the `crypto_uniswap` and
#`crypto_airswap` plugin does not require an API key, it is an open and free data source of a standard EVM RPC websocket
# service endpoint. The end user can connect to specific EVM RPC endpoint base on the blockchain which hosts the uniswap
# and airswap contracts.

# USDC-USD prices are required by the protocol to convert the ATN-USDC and NTN-USDC to ATN-USD and NTN-USD. This enables
# the reporting of ATN and NTN prices in USD to the ASM. Three plugins are implemented to source the USDC-USD datapoint
Expand All @@ -57,7 +58,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
# type PluginConfig struct {
# Name string `json:"name" yaml:"name"` // the name of the plugin binary.
# Key string `json:"key" yaml:"key"` // the API key granted by your data provider to access their data API.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http or https.
# Scheme string `json:"scheme" yaml:"scheme"` // the data service scheme, http, https, ws or wss.
# Endpoint string `json:"endpoint" yaml:"endpoint"` // the data service endpoint url of the data provider.
# Timeout int `json:"timeout" yaml:"timeout"` // the timeout period in seconds that an API request is lasting for.
# DataUpdateInterval int `json:"refresh" yaml:"refresh"` // the interval in seconds to fetch data from data provider due to rate limit.
Expand Down Expand Up @@ -87,7 +88,7 @@ confidenceStrategy: 0 # 0: linear, 1: fixed
# refresh: 3600 # optional, buffered data within 3600s, recommended for API rate limited data source.
# Un-comment below lines to config the RPC endpoint of a Piccadilly Network Full Node for your AMM plugin which sources ATN & NTN market data from an on-chain AMM.
# - name: crypto_uniswap
# scheme: "wss" # Available values are: "http", "https", "ws" or "wss", default value is "wss".
# scheme: "wss" # Only websocket please, available values are: "ws" or "wss", default value is "wss" for uniswap plugins.
# endpoint: "rpc-internal-1.piccadilly.autonity.org/ws" # The default URL might not be stable for public usage, we recommend you to change it with your validator node's RPC endpoint.

#Enable the metric collection for oracle server, supported TS-DB engines are influxDB v1 and v2.
Expand Down
38 changes: 38 additions & 0 deletions helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package helpers

import (
"encoding/csv"
"errors"
"fmt"
"github.com/shopspring/decimal"
"io"
"io/fs"
"io/ioutil" //nolint
"math/big"
"os"
"sort"
)
Expand Down Expand Up @@ -98,6 +100,42 @@ func Median(prices []decimal.Decimal) (decimal.Decimal, error) {
return prices[l/2], nil
}

// VWAP computes the volume weighted average price for the input prices with their corresponding volumes
func VWAP(prices []decimal.Decimal, volumes []*big.Int) (decimal.Decimal, *big.Int, error) {
if len(prices) == 0 || len(volumes) == 0 || len(prices) != len(volumes) {
return decimal.Zero, nil, errors.New("prices and volumes must be of the same non-zero length")
}

var totalWeightedPrice decimal.Decimal
totalVolume := big.NewInt(0)
highestVol := new(big.Int).Set(volumes[0])

for i := range prices {
if volumes[i].Cmp(highestVol) > 0 {
highestVol.Set(volumes[i])
}

// Convert volume to decimal.Decimal
volumeDecimal := decimal.NewFromBigInt(volumes[i], 0)

// Calculate weighted price for current price and volume
weightedPrice := prices[i].Mul(volumeDecimal) // Use decimal.Decimal for precision
totalWeightedPrice = totalWeightedPrice.Add(weightedPrice)

// Accumulate total volume
totalVolume.Add(totalVolume, volumes[i])
}

// Avoid division by zero
if totalVolume.Cmp(big.NewInt(0)) == 0 {
return decimal.Zero, nil, errors.New("total volume cannot be zero")
}

// Calculate VWAP
vwap := totalWeightedPrice.Div(decimal.NewFromBigInt(totalVolume, 0))
return vwap, highestVol, nil
}

// ListPlugins returns a mapping of file names to fs.FileInfo for executable files in the specified path.
func ListPlugins(path string) (map[string]fs.FileInfo, error) {
plugins := make(map[string]fs.FileInfo)
Expand Down
82 changes: 82 additions & 0 deletions helpers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package helpers
import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"
"math/big"
"testing"
)

Expand Down Expand Up @@ -71,3 +72,84 @@ func TestMedian(t *testing.T) {
require.Error(t, err)
})
}

func TestVWAP(t *testing.T) {
tests := []struct {
prices []decimal.Decimal
volumes []*big.Int
expectedVWAP decimal.Decimal
expectedHighestVol *big.Int
expectError bool
}{
{
prices: []decimal.Decimal{
decimal.NewFromFloat(100.0),
decimal.NewFromFloat(200.0),
decimal.NewFromFloat(100.0),
},
volumes: []*big.Int{
big.NewInt(10),
big.NewInt(20),
big.NewInt(10),
},
expectedVWAP: decimal.NewFromFloatWithExponent(150, 0), // (100*10 + 200*20 + 150*30) / (10 + 20 + 30)
expectedHighestVol: big.NewInt(20),
expectError: false,
},
{
prices: []decimal.Decimal{
decimal.NewFromFloat(50.0),
decimal.NewFromFloat(75.0),
},
volumes: []*big.Int{
big.NewInt(5),
big.NewInt(0),
},
expectedVWAP: decimal.NewFromFloatWithExponent(50, 0),
expectedHighestVol: big.NewInt(5),
},
{
prices: []decimal.Decimal{},
volumes: []*big.Int{},
expectedVWAP: decimal.Zero,
expectedHighestVol: nil,
expectError: true,
},
{
prices: []decimal.Decimal{
decimal.NewFromFloat(100.0),
},
volumes: []*big.Int{
big.NewInt(10),
big.NewInt(20),
},
expectedVWAP: decimal.Zero,
expectedHighestVol: nil,
expectError: true,
},
}

for _, test := range tests {
vwap, highestVol, err := VWAP(test.prices, test.volumes)

if test.expectError {
if err == nil {
t.Errorf("Expected an error for prices: %v and volumes: %v, but got none", test.prices, test.volumes)
}
continue
}

if err != nil {
t.Errorf("Unexpected error for prices: %v and volumes: %v: %v", test.prices, test.volumes, err)
continue
}

if !vwap.Equal(test.expectedVWAP) {
t.Errorf("For prices: %v and volumes: %v, expected VWAP: %s, but got: %s", test.prices, test.volumes, test.expectedVWAP.String(), vwap.String())
}

if highestVol.Cmp(test.expectedHighestVol) != 0 {
t.Errorf("For prices: %v and volumes: %v, expected highest volume: %s, but got: %s", test.prices, test.volumes, test.expectedHighestVol.String(), highestVol.String())
}
}
}
87 changes: 82 additions & 5 deletions oracle_server/oracle_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,42 +777,96 @@ func (os *OracleServer) aggregateBridgedPrice(srcSymbol string, target int64, us
return p, nil
}

// aggregatePrice takes the symbol's aggregated data points from all the supported plugins, if there are multiple
// markets' datapoint, it will do a final VWAP aggregation to form the final reporting value.
func (os *OracleServer) aggregatePrice(s string, target int64) (*types.Price, error) {
var prices []decimal.Decimal
var volumes []*big.Int
for _, plugin := range os.runningPlugins {
p, err := plugin.GetAggregatedPrice(s, target)
p, err := plugin.AggregatedPrice(s, target)
if err != nil {
continue
}
prices = append(prices, p.Price)
volumes = append(volumes, p.Volume)
}

if len(prices) == 0 {
return nil, types.ErrNoDataRound
historicRoundPrice, err := os.queryHistoricRoundPrice(s)
if err != nil {
return nil, err
}

return confidenceAdjustedPrice(&historicRoundPrice, target)
}

// compute confidence of the symbol from the num of plugins' samples of it.
confidence := ComputeConfidence(s, len(prices), os.conf.ConfidenceStrategy)

price := &types.Price{
Timestamp: target,
Price: prices[0],
Volume: volumes[0],
Symbol: s,
Confidence: confidence,
}

// we have multiple provider provide prices for this symbol, we have to aggregate it.
if len(prices) > 1 {
_, isForex := ForexCurrencies[s]

// we have multiple markets' data for this forex symbol, update the price with median value.
if len(prices) > 1 && isForex {
p, err := helpers.Median(prices)
if err != nil {
return nil, err
}
price.Price = p
price.Volume = types.DefaultVolume
return price, nil
}

// we have multiple markets' data for this crypto symbol, update the price with VWAP.
if len(prices) > 1 && !isForex {
p, vol, err := helpers.VWAP(prices, volumes)
if err != nil {
return nil, err
}
price.Price = p
price.Volume = vol
}

return price, nil
}

// queryHistoricRoundPrice queries the last available price for a given symbol from the historic rounds.
func (os *OracleServer) queryHistoricRoundPrice(symbol string) (types.Price, error) {

if len(os.roundData) == 0 {
return types.Price{}, types.ErrNoDataRound
}

numOfRounds := len(os.roundData)
// Iterate from the current round backward
for i := 0; i < numOfRounds; i++ {
roundID := os.curRound - uint64(i) - 1 //nolint
// Get the round data for the current round ID
roundData, exists := os.roundData[roundID]
if !exists {
continue
}

if roundData == nil {
continue
}

// Check if the symbol exists in the Prices map
if price, found := roundData.Prices[symbol]; found {
return price, nil
}
}

// If no price was found after checking all rounds, return an error
return types.Price{}, types.ErrNoDataRound
}

func (os *OracleServer) samplePrice(symbols []string, ts int64) {
if os.lastSampledTS == ts {
return
Expand Down Expand Up @@ -1098,3 +1152,26 @@ func ComputeConfidence(symbol string, numOfSamples, strategy int) uint8 {

return uint8(weight) //nolint
}

func confidenceAdjustedPrice(historicRoundPrice *types.Price, target int64) (*types.Price, error) {
// by according to the spreading of price timestamp from the target timestamp,
// we reduce the confidence of the price, set the lowest confidence as 1.
// Calculate the time difference between the target timestamp and the historic price timestamp
timeDifference := target - historicRoundPrice.Timestamp

var reducedConfidence uint8
if timeDifference < 60 { // Less than 1 minute
reducedConfidence = historicRoundPrice.Confidence // Keep original confidence
} else if timeDifference < 3600 { // Less than 1 hour
reducedConfidence = historicRoundPrice.Confidence / 2 // Reduce confidence by half
} else {
reducedConfidence = 1 // Set the lowest confidence to 1 if more than 1 hour old
}

if reducedConfidence == 0 {
return nil, types.ErrNoAvailablePrice
}

historicRoundPrice.Confidence = reducedConfidence
return historicRoundPrice, nil
}
Loading

0 comments on commit 05dd747

Please sign in to comment.