Skip to content

Commit

Permalink
Full Node Streaming Order Filtering by Subaccount impl and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
UnbornAztecKing committed Jan 25, 2025
1 parent afa37ec commit 3bb3a07
Show file tree
Hide file tree
Showing 11 changed files with 909 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ export interface StreamOrderbookUpdatesRequest {
/** Market ids for price updates. */

marketIds: number[];
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filterOrdersBySubaccountId: boolean;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
Expand All @@ -293,6 +300,13 @@ export interface StreamOrderbookUpdatesRequestSDKType {
/** Market ids for price updates. */

market_ids: number[];
/**
* Filter order updates by subaccount IDs.
* If true, the orderbook updates only include orders from provided subaccount
* IDs.
*/

filter_orders_by_subaccount_id: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand Down Expand Up @@ -1298,7 +1312,8 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
return {
clobPairId: [],
subaccountIds: [],
marketIds: []
marketIds: [],
filterOrdersBySubaccountId: false
};
}

Expand All @@ -1323,6 +1338,11 @@ export const StreamOrderbookUpdatesRequest = {
}

writer.ldelim();

if (message.filterOrdersBySubaccountId === true) {
writer.uint32(32).bool(message.filterOrdersBySubaccountId);
}

return writer;
},

Expand Down Expand Up @@ -1365,6 +1385,10 @@ export const StreamOrderbookUpdatesRequest = {

break;

case 4:
message.filterOrdersBySubaccountId = reader.bool();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1379,6 +1403,7 @@ export const StreamOrderbookUpdatesRequest = {
message.clobPairId = object.clobPairId?.map(e => e) || [];
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
message.marketIds = object.marketIds?.map(e => e) || [];
message.filterOrdersBySubaccountId = object.filterOrdersBySubaccountId ?? false;
return message;
}

Expand Down
5 changes: 5 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ message StreamOrderbookUpdatesRequest {

// Market ids for price updates.
repeated uint32 market_ids = 3;

// Filter order updates by subaccount IDs.
// If true, the orderbook updates only include orders from provided subaccount
// IDs.
bool filter_orders_by_subaccount_id = 4;
}

// StreamOrderbookUpdatesResponse is a response message for the
Expand Down
105 changes: 84 additions & 21 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@ package streaming

import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/dydxprotocol/v4-chain/protocol/lib"
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
Expand Down Expand Up @@ -96,6 +94,23 @@ type OrderbookSubscription struct {
nextSnapshotBlock uint32
}

func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(
clobPairIds []uint32,
subaccountIds []satypes.SubaccountId,
marketIds []uint32,
messageSender types.OutgoingMessageSender,
) *OrderbookSubscription {
return &OrderbookSubscription{
subscriptionId: sm.getNextAvailableSubscriptionId(),
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: subaccountIds,
marketIds: marketIds,
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
}

func (sub *OrderbookSubscription) IsInitialized() bool {
return sub.initialized.Load()
}
Expand Down Expand Up @@ -187,11 +202,57 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
return id
}

func doFilterStreamUpdateBySubaccount(
orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate,
subaccountIds []satypes.SubaccountId,
) (bool, error) {
for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates {
orderBookUpdateSubaccountId, err := streaming_util.GetOffChainUpdateV1SubaccountId(orderBookUpdate)
if err != nil {
return false, err
} else if slices.Contains(subaccountIds, orderBookUpdateSubaccountId) {
return true, nil
}
}
return false, nil
}

// If UpdateMessage is not a StreamUpdate_OrderUpdate, filter it
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, filter it
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop it
// If checking subaccount ids in a StreamUpdate_OrderUpdate results in an error, log error and drop it
func FilterStreamUpdateBySubaccount(
updates []clobtypes.StreamUpdate,
subaccountIds []satypes.SubaccountId,
logger log.Logger,
) []clobtypes.StreamUpdate {
// If reflection becomes too expensive, split updatesChannel by message type
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
if updateMessage.OrderbookUpdate.Snapshot {
break
}
doFilter, err := doFilterStreamUpdateBySubaccount(updateMessage, subaccountIds)
if err != nil {
logger.Error(err.Error())
}
if !doFilter {
continue
}
}
filteredUpdates = append(filteredUpdates, update)
}
return filteredUpdates
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrdersBySubAccountId bool,
messageSender types.OutgoingMessageSender,
) (
err error,
Expand All @@ -200,24 +261,19 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 {
return types.ErrInvalidStreamingRequest
}
if filterOrdersBySubAccountId && (len(subaccountIds) == 0) {
sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds")
return types.ErrInvalidStreamingRequest
}

sm.Lock()
sIds := make([]satypes.SubaccountId, len(subaccountIds))
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
}

subscriptionId := sm.getNextAvailableSubscriptionId()
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)

subscription := &OrderbookSubscription{
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
marketIds: marketIds,
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
Expand Down Expand Up @@ -254,10 +310,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(

sm.logger.Info(
fmt.Sprintf(
"New subscription id %+v for clob pair ids: %+v and subaccount ids: %+v",
"New subscription id %+v for clob pair ids: %+v and subaccount ids: %+v. filter orders by subaccount ids: %+v",
subscription.subscriptionId,
clobPairIds,
subaccountIds,
filterOrdersBySubAccountId,
),
)
sm.orderbookSubscriptions[subscription.subscriptionId] = subscription
Expand All @@ -268,6 +325,12 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
if filterOrdersBySubAccountId {
updates = FilterStreamUpdateBySubaccount(updates, sIds, sm.logger)
}
if len(updates) == 0 {
continue
}
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
Expand Down Expand Up @@ -1080,12 +1143,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
sm.FlushStreamUpdatesWithLock()

// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)

// Cache updates for finalized fills.
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
Expand Down
Loading

0 comments on commit 3bb3a07

Please sign in to comment.