From 36313b11fa22c5585ffda7b14917117be0aaa6ea Mon Sep 17 00:00:00 2001 From: Matthew Weeks Date: Fri, 17 Jan 2025 16:49:06 -0500 Subject: [PATCH 1/2] Fix filterOrdersBySubaccountId param token and helper name --- .../src/codegen/dydxprotocol/clob/query.ts | 27 +- proto/dydxprotocol/clob/query.proto | 5 + .../streaming/full_node_streaming_manager.go | 123 ++++- .../full_node_streaming_manager_test.go | 476 ++++++++++++++++++ protocol/streaming/noop_streaming_manager.go | 1 + protocol/streaming/types/interface.go | 1 + protocol/streaming/util/util.go | 21 + protocol/streaming/util/util_test.go | 118 +++++ protocol/streaming/ws/websocket_server.go | 24 + .../x/clob/keeper/grpc_stream_orderbook.go | 1 + protocol/x/clob/types/query.pb.go | 266 ++++++---- 11 files changed, 932 insertions(+), 131 deletions(-) create mode 100644 protocol/streaming/full_node_streaming_manager_test.go create mode 100644 protocol/streaming/util/util_test.go diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts index b6032561973..ef7bf909526 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts @@ -278,6 +278,13 @@ export interface StreamOrderbookUpdatesRequest { /** Market ids for price updates. */ marketIds: number[]; + /** + * Filter order updates by subaccount IDs. + * If true, the orderbook updates only include orders from provided subaccount + * IDs. + */ + + filterOrdersBySubaccountId: boolean; } /** * StreamOrderbookUpdatesRequest is a request message for the @@ -293,6 +300,13 @@ export interface StreamOrderbookUpdatesRequestSDKType { /** Market ids for price updates. */ market_ids: number[]; + /** + * Filter order updates by subaccount IDs. + * If true, the orderbook updates only include orders from provided subaccount + * IDs. + */ + + filter_orders_by_subaccount_id: boolean; } /** * StreamOrderbookUpdatesResponse is a response message for the @@ -1298,7 +1312,8 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques return { clobPairId: [], subaccountIds: [], - marketIds: [] + marketIds: [], + filterOrdersBySubaccountId: false }; } @@ -1323,6 +1338,11 @@ export const StreamOrderbookUpdatesRequest = { } writer.ldelim(); + + if (message.filterOrdersBySubaccountId === true) { + writer.uint32(32).bool(message.filterOrdersBySubaccountId); + } + return writer; }, @@ -1365,6 +1385,10 @@ export const StreamOrderbookUpdatesRequest = { break; + case 4: + message.filterOrdersBySubaccountId = reader.bool(); + break; + default: reader.skipType(tag & 7); break; @@ -1379,6 +1403,7 @@ export const StreamOrderbookUpdatesRequest = { message.clobPairId = object.clobPairId?.map(e => e) || []; message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || []; message.marketIds = object.marketIds?.map(e => e) || []; + message.filterOrdersBySubaccountId = object.filterOrdersBySubaccountId ?? false; return message; } diff --git a/proto/dydxprotocol/clob/query.proto b/proto/dydxprotocol/clob/query.proto index 5584a1e9506..7d6eeb1b612 100644 --- a/proto/dydxprotocol/clob/query.proto +++ b/proto/dydxprotocol/clob/query.proto @@ -186,6 +186,11 @@ message StreamOrderbookUpdatesRequest { // Market ids for price updates. repeated uint32 market_ids = 3; + + // Filter order updates by subaccount IDs. + // If true, the orderbook updates only include orders from provided subaccount + // IDs. + bool filter_orders_by_subaccount_id = 4; } // StreamOrderbookUpdatesResponse is a response message for the diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 939c7d29791..0a85bbe9209 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -2,27 +2,25 @@ package streaming import ( "fmt" + "slices" "sync" "sync/atomic" "time" - "github.com/dydxprotocol/v4-chain/protocol/lib" - pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" - satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" - "cosmossdk.io/log" storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" + "github.com/dydxprotocol/v4-chain/protocol/finalizeblock" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/streaming/types" streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" - - ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" - - "github.com/dydxprotocol/v4-chain/protocol/finalizeblock" + pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) @@ -96,6 +94,41 @@ type OrderbookSubscription struct { nextSnapshotBlock uint32 } +func NewOrderbookSubscription( + subscriptionId uint32, + clobPairIds []uint32, + subaccountIds []satypes.SubaccountId, + marketIds []uint32, + messageSender types.OutgoingMessageSender, + updatesChannel chan []clobtypes.StreamUpdate, +) *OrderbookSubscription { + return &OrderbookSubscription{ + subscriptionId: subscriptionId, + initialized: &atomic.Bool{}, // False by default. + clobPairIds: clobPairIds, + subaccountIds: subaccountIds, + marketIds: marketIds, + messageSender: messageSender, + updatesChannel: updatesChannel, + } +} + +func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription( + clobPairIds []uint32, + subaccountIds []satypes.SubaccountId, + marketIds []uint32, + messageSender types.OutgoingMessageSender, +) *OrderbookSubscription { + return NewOrderbookSubscription( + sm.getNextAvailableSubscriptionId(), + clobPairIds, + subaccountIds, + marketIds, + messageSender, + make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), + ) +} + func (sub *OrderbookSubscription) IsInitialized() bool { return sub.initialized.Load() } @@ -187,11 +220,58 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 return id } +func doFilterStreamUpdateBySubaccount( + orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate, + subaccountIdNumbers []uint32, + logger log.Logger, +) bool { + for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates { + orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) + if err == nil { + if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { + return true + } + } else { + logger.Error(err.Error()) + } + } + return false +} + +// Filter StreamUpdates for subaccountIdNumbers +// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message +// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new +// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts +func FilterStreamUpdateBySubaccount( + updates []clobtypes.StreamUpdate, + subaccountIdNumbers []uint32, + logger log.Logger, +) *[]clobtypes.StreamUpdate { + // If reflection becomes too expensive, split updatesChannel by message type + filteredUpdates := []clobtypes.StreamUpdate{} + for _, update := range updates { + switch updateMessage := update.UpdateMessage.(type) { + case *clobtypes.StreamUpdate_OrderbookUpdate: + if doFilterStreamUpdateBySubaccount(updateMessage, subaccountIdNumbers, logger) { + filteredUpdates = append(filteredUpdates, update) + } + default: + filteredUpdates = append(filteredUpdates, update) + } + } + + if len(filteredUpdates) > 0 { + return &filteredUpdates + } + return nil +} + // Subscribe subscribes to the orderbook updates stream. func (sm *FullNodeStreamingManagerImpl) Subscribe( clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, + filterOrdersBySubAccountId bool, messageSender types.OutgoingMessageSender, ) ( err error, @@ -200,24 +280,21 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 { return types.ErrInvalidStreamingRequest } + if filterOrdersBySubAccountId && (len(subaccountIds) == 0) { + sm.logger.Error("filterOrdersBySubaccountId with no subaccountIds") + return types.ErrInvalidStreamingRequest + } sm.Lock() sIds := make([]satypes.SubaccountId, len(subaccountIds)) + subaccountIdNumbers := make([]uint32, len(subaccountIds)) for i, subaccountId := range subaccountIds { sIds[i] = *subaccountId + subaccountIdNumbers[i] = subaccountId.Number } - subscriptionId := sm.getNextAvailableSubscriptionId() + subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender) - subscription := &OrderbookSubscription{ - subscriptionId: subscriptionId, - initialized: &atomic.Bool{}, // False by default. - clobPairIds: clobPairIds, - subaccountIds: sIds, - marketIds: marketIds, - messageSender: messageSender, - updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), - } for _, clobPairId := range clobPairIds { // if clobPairId exists in the map, append the subscription id to the slice // otherwise, create a new slice with the subscription id @@ -268,6 +345,12 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( // Use current goroutine to consistently poll subscription channel for updates // to send through stream. for updates := range subscription.updatesChannel { + if filterOrdersBySubAccountId { + filteredUpdates := FilterStreamUpdateBySubaccount(updates, subaccountIdNumbers, sm.logger) + if filteredUpdates != nil { + updates = *filteredUpdates + } + } metrics.IncrCounterWithLabels( metrics.GrpcSendResponseToSubscriberCount, 1, @@ -1080,12 +1163,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( sm.FlushStreamUpdatesWithLock() // Cache updates to sync local ops queue - sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( + syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) - sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) + sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds) // Cache updates for finalized fills. fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( diff --git a/protocol/streaming/full_node_streaming_manager_test.go b/protocol/streaming/full_node_streaming_manager_test.go new file mode 100644 index 00000000000..2f3546d337e --- /dev/null +++ b/protocol/streaming/full_node_streaming_manager_test.go @@ -0,0 +1,476 @@ +package streaming_test + +import ( + "testing" + "time" + + sdktypes "github.com/cosmos/cosmos-sdk/types" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + pv1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" + sharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" + "github.com/dydxprotocol/v4-chain/protocol/mocks" + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "github.com/stretchr/testify/require" +) + +const ( + maxSubscriptionChannelSize = 1024 + owner = "foo" + noMessagesMaxSleep = 10 * time.Millisecond +) + +func OpenOrder( + order *pv1types.IndexerOrder, + timestamp *time.Time, +) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ + OrderPlace: &ocutypes.OrderPlaceV1{ + Order: order, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, + TimeStamp: timestamp, + }, + }, + } +} + +func CancelOrder( + removedOrderId *pv1types.IndexerOrderId, + timestamp *time.Time, +) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{ + OrderRemove: &ocutypes.OrderRemoveV1{ + RemovedOrderId: removedOrderId, + Reason: sharedtypes.OrderRemovalReason(ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED), + RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED, + TimeStamp: timestamp, + }, + }, + } +} + +func ReplaceOrder( + oldOrderId *pv1types.IndexerOrderId, + newOrder *pv1types.IndexerOrder, + timestamp *time.Time, +) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{ + OrderReplace: &ocutypes.OrderReplaceV1{ + OldOrderId: oldOrderId, + Order: newOrder, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, + TimeStamp: timestamp, + }, + }, + } +} + +func UpdateOrder(orderId *pv1types.IndexerOrderId, totalFilledQuantums uint64) ocutypes.OffChainUpdateV1 { + return ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{ + OrderUpdate: &ocutypes.OrderUpdateV1{ + OrderId: orderId, + TotalFilledQuantums: totalFilledQuantums, + }, + }, + } +} + +func toStreamUpdate(offChainUpdates ...ocutypes.OffChainUpdateV1) clobtypes.StreamUpdate { + return clobtypes.StreamUpdate{ + BlockHeight: uint32(0), + ExecMode: uint32(sdktypes.ExecModeFinalize), + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: offChainUpdates, + Snapshot: true, + }, + }, + } +} + +type MockMessageSender struct{} + +func (mms *MockMessageSender) Send(*clobtypes.StreamOrderbookUpdatesResponse) error { + return nil +} + +func NewOrderbookSubscription( + ids []uint32, + updatesChannel chan []clobtypes.StreamUpdate, +) *streaming.OrderbookSubscription { + sIds := []satypes.SubaccountId{} + for _, id := range ids { + sIds = append(sIds, satypes.SubaccountId{Owner: owner, Number: id}) + } + return streaming.NewOrderbookSubscription( + 0, + []uint32{}, + sIds, + []uint32{}, + &MockMessageSender{}, + updatesChannel, + ) +} + +func NewStreamOrderbookFill( + blockHeight uint32, + execMode uint32, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_OrderFill{ + OrderFill: nil, + }, + } +} + +func NewStreamTakerOrder( + blockHeight uint32, + execMode uint32, + order *clobtypes.Order, + remainingQuantums uint64, + optimisticallyFilledQuantums uint64, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ + TakerOrder: &clobtypes.StreamTakerOrder{ + TakerOrder: &clobtypes.StreamTakerOrder_Order{ + Order: order, + }, + TakerOrderStatus: &clobtypes.StreamTakerOrderStatus{ + OrderStatus: uint32(clobtypes.Success), + RemainingQuantums: remainingQuantums, + OptimisticallyFilledQuantums: optimisticallyFilledQuantums, + }, + }, + }, + } +} + +func NewSubaccountUpdate( + blockHeight uint32, + execMode uint32, + subaccountId *satypes.SubaccountId, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{ + SubaccountUpdate: &satypes.StreamSubaccountUpdate{ + SubaccountId: subaccountId, + UpdatedPerpetualPositions: []*satypes.SubaccountPerpetualPosition{}, + UpdatedAssetPositions: []*satypes.SubaccountAssetPosition{}, + Snapshot: true, + }, + }, + } +} + +func NewPriceUpdate( + blockHeight uint32, + execMode uint32, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_PriceUpdate{ + PriceUpdate: &pricestypes.StreamPriceUpdate{ + MarketId: 1, + Price: pricestypes.MarketPrice{ + Id: 1, + Exponent: 6, + Price: 1, + }, + Snapshot: true, + }, + }, + } +} + +func NewIndexerOrderId(owner string, id uint32) pv1types.IndexerOrderId { + return pv1types.IndexerOrderId{ + SubaccountId: pv1types.IndexerSubaccountId{ + Owner: owner, + Number: id, + }, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, + } +} + +func NewOrderId(owner string, id uint32) clobtypes.OrderId { + return clobtypes.OrderId{ + SubaccountId: satypes.SubaccountId{ + Owner: owner, + Number: id, + }, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, + } +} + +func NewIndexerOrder(id pv1types.IndexerOrderId) pv1types.IndexerOrder { + return pv1types.IndexerOrder{ + OrderId: id, + Side: pv1types.IndexerOrder_SIDE_BUY, + Quantums: uint64(1_000_000), + Subticks: 1, + GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{ + GoodTilBlock: 1_000_000_000, + }, + TimeInForce: 1_000_000_000, + ReduceOnly: false, + ClientMetadata: 0, + ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, + ConditionalOrderTriggerSubticks: 0, + } +} + +func NewOrder(id clobtypes.OrderId) *clobtypes.Order { + return &clobtypes.Order{ + OrderId: id, + Side: clobtypes.Order_SIDE_BUY, + Quantums: uint64(1_000_000), + Subticks: 1, + GoodTilOneof: &clobtypes.Order_GoodTilBlock{ + GoodTilBlock: 1_000_000_000, + }, + TimeInForce: 1_000_000_000, + ReduceOnly: false, + ClientMetadata: 0, + ConditionType: clobtypes.Order_CONDITION_TYPE_UNSPECIFIED, + ConditionalOrderTriggerSubticks: 0, + } +} + +type TestCase struct { + updates []clobtypes.StreamUpdate + subaccountIds []uint32 + filteredUpdates *[]clobtypes.StreamUpdate +} + +func TestFilterStreamUpdates(t *testing.T) { + logger := &mocks.Logger{} + + subaccountIdNumber := uint32(1337) + orderId := NewIndexerOrderId("foo", subaccountIdNumber) + order := NewIndexerOrder(orderId) + + otherSubaccountIdNumber := uint32(2600) + otherOrderId := NewIndexerOrderId("bar", otherSubaccountIdNumber) + otherOrder := NewIndexerOrder(otherOrderId) + + newOrderId := order.OrderId + newOrderId.ClientId += 1 + newOrder := NewIndexerOrder(newOrderId) + + otherNewOrderId := otherOrder.OrderId + otherNewOrderId.ClientId += 1 + otherNewOrder := NewIndexerOrder(otherNewOrderId) + + orderPlaceTime := time.Date(2024, 12, 25, 0, 0, 0, 0, time.UTC) + openOrder := OpenOrder(&order, &orderPlaceTime) + orderCancelTime := orderPlaceTime.Add(time.Second) + cancelOrder := CancelOrder(&orderId, &orderCancelTime) + orderReplaceTime := orderPlaceTime.Add(time.Minute) + replaceOrder := ReplaceOrder(&orderId, &newOrder, &orderReplaceTime) + updateOrder := UpdateOrder(&orderId, uint64(1988)) + + otherOpenOrder := OpenOrder(&otherOrder, &orderPlaceTime) + otherCancelOrder := CancelOrder(&otherOrderId, &orderCancelTime) + otherReplaceOrder := ReplaceOrder(&otherOrderId, &otherNewOrder, &orderReplaceTime) + otherUpdateOrder := UpdateOrder(&otherOrderId, uint64(1999)) + + baseStreamUpdate := toStreamUpdate(openOrder, cancelOrder, replaceOrder, updateOrder) + otherStreamUpdate := toStreamUpdate(otherOpenOrder, otherCancelOrder, otherReplaceOrder, otherUpdateOrder) + bothStreamUpdate := toStreamUpdate( + openOrder, + cancelOrder, + replaceOrder, + updateOrder, + otherOpenOrder, + otherCancelOrder, + otherReplaceOrder, + otherUpdateOrder, + ) + + orderBookFillUpdate := NewStreamOrderbookFill(0, 0) + clobOrder := NewOrder(NewOrderId("foo", 23)) + takerOrderUpdate := NewStreamTakerOrder(0, 0, clobOrder, 0, 0) + subaccountUpdate := NewSubaccountUpdate( + 0, + 0, + (*satypes.SubaccountId)(&orderId.SubaccountId), + ) + priceUpdate := NewPriceUpdate(0, 0) + + tests := map[string]TestCase{ + "baseInScope": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate}, + subaccountIds: []uint32{orderId.SubaccountId.Number}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, + }, + "baseNotInScope": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate}, + subaccountIds: []uint32{0}, + filteredUpdates: nil, + }, + "otherInScope": { + updates: []clobtypes.StreamUpdate{otherStreamUpdate}, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, + }, + "otherNotInScope": { + updates: []clobtypes.StreamUpdate{otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: nil, + }, + "bothInScope": { + updates: []clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber, otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + }, + "bothOtherInScope": { + updates: []clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + }, + "bothSequentiallyOtherInScope": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, otherStreamUpdate}, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, + }, + "bothBaseInScope": { + updates: []clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + }, + "bothSequentiallyBaseInScope": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, + }, + "bothNoneInScopeWrongId": { + updates: []clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{404}, + filteredUpdates: nil, + }, + "bothNoneInScopeNoId": { + updates: []clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: nil, + }, + "noUpdates": { + updates: []clobtypes.StreamUpdate{}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: nil, + }, + "noUpdatesNoId": { + updates: []clobtypes.StreamUpdate{}, + subaccountIds: []uint32{}, + filteredUpdates: nil, + }, + "orderBookFillUpdates": { + updates: []clobtypes.StreamUpdate{*orderBookFillUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, + }, + "orderBookFillUpdatesDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, + }, + "orderBookFillUpdatesFilterUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, + }, + "orderBookFillUpdatesFilterAndDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, + }, + "takerOrderUpdates": { + updates: []clobtypes.StreamUpdate{*takerOrderUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, + }, + "takerOrderUpdatesDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, + }, + "takerOrderUpdatesFilterUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, + }, + "takerOrderUpdatesFilterAndDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, + }, + "subaccountUpdates": { + updates: []clobtypes.StreamUpdate{*subaccountUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, + }, + "subaccountUpdatesDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, + }, + "subaccountUpdatesFilterUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, + }, + "subaccountUpdatesFilterAndDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, + }, + "priceUpdates": { + updates: []clobtypes.StreamUpdate{*priceUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*priceUpdate}, + }, + "priceUpdatesDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*priceUpdate}, + }, + "priceUpdatesFilterUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, + }, + "priceUpdatesFilterAndDropUpdate": { + updates: []clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, + }, + } + + for name, testCase := range tests { + t.Run(name, func(t *testing.T) { + filteredUpdates := streaming.FilterStreamUpdateBySubaccount(testCase.updates, testCase.subaccountIds, logger) + if testCase.filteredUpdates != nil { + require.Equal(t, *filteredUpdates, *testCase.filteredUpdates) + } else { + require.Nil(t, filteredUpdates) + } + }) + } +} diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index bb81af1b43a..fc743d3fe1e 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -24,6 +24,7 @@ func (sm *NoopGrpcStreamingManager) Subscribe( _ []uint32, _ []*satypes.SubaccountId, _ []uint32, + _ bool, _ types.OutgoingMessageSender, ) ( err error, diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 41657e7301f..53ea59a1253 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -16,6 +16,7 @@ type FullNodeStreamingManager interface { clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, + filterOrders bool, srv OutgoingMessageSender, ) ( err error, diff --git a/protocol/streaming/util/util.go b/protocol/streaming/util/util.go index bbf37e3340e..bdec459c5ab 100644 --- a/protocol/streaming/util/util.go +++ b/protocol/streaming/util/util.go @@ -21,3 +21,24 @@ func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes } return v1updates } + +// Error expected if OffChainUpdateV1.UpdateMessage message type is extended to more order events +func GetOffChainUpdateV1SubaccountIdNumber(update ocutypes.OffChainUpdateV1) (uint32, error) { + var orderSubaccountIdNumber uint32 + switch updateMessage := update.UpdateMessage.(type) { + case *ocutypes.OffChainUpdateV1_OrderPlace: + orderSubaccountIdNumber = updateMessage.OrderPlace.Order.OrderId.SubaccountId.Number + case *ocutypes.OffChainUpdateV1_OrderRemove: + orderSubaccountIdNumber = updateMessage.OrderRemove.RemovedOrderId.SubaccountId.Number + case *ocutypes.OffChainUpdateV1_OrderUpdate: + orderSubaccountIdNumber = updateMessage.OrderUpdate.OrderId.SubaccountId.Number + case *ocutypes.OffChainUpdateV1_OrderReplace: + orderSubaccountIdNumber = updateMessage.OrderReplace.Order.OrderId.SubaccountId.Number + default: + return 0, fmt.Errorf( + "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v", + updateMessage, + ) + } + return orderSubaccountIdNumber, nil +} diff --git a/protocol/streaming/util/util_test.go b/protocol/streaming/util/util_test.go new file mode 100644 index 00000000000..111cb8f4af2 --- /dev/null +++ b/protocol/streaming/util/util_test.go @@ -0,0 +1,118 @@ +package util_test + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + pv1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" + stypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" + "github.com/dydxprotocol/v4-chain/protocol/streaming/util" +) + +func _ToPtr[V any](v V) *V { + return &v +} + +func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) { + subaccountIdNumber := uint32(1337) + orderId := pv1types.IndexerOrderId{ + SubaccountId: pv1types.IndexerSubaccountId{ + Owner: "foo", + Number: uint32(subaccountIdNumber), + }, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, + } + order := pv1types.IndexerOrder{ + OrderId: orderId, + Side: pv1types.IndexerOrder_SIDE_BUY, + Quantums: uint64(10 ^ 6), + Subticks: 1, + GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{ + GoodTilBlock: 10 ^ 9, + }, + TimeInForce: 10 ^ 9, + ReduceOnly: false, + ClientMetadata: 0, + ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, + ConditionalOrderTriggerSubticks: 0, + } + newOrder := order + newOrder.Quantums += 10 ^ 6 + + orderPlaceTime := time.Now() + fillQuantums := uint64(1988) + + tests := map[string]struct { + update ocutypes.OffChainUpdateV1 + id uint32 + err error + }{ + "OrderPlace": { + update: ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ + OrderPlace: &ocutypes.OrderPlaceV1{ + Order: &order, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED, + TimeStamp: _ToPtr(orderPlaceTime), + }, + }, + }, + id: subaccountIdNumber, + err: nil, + }, + "OrderRemove": { + update: ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{ + OrderRemove: &ocutypes.OrderRemoveV1{ + RemovedOrderId: &orderId, + Reason: stypes.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED, + RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED, + TimeStamp: _ToPtr(orderPlaceTime.Add(1 * time.Second)), + }, + }, + }, + id: subaccountIdNumber, + err: nil, + }, + "OrderUpdate": { + update: ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{ + OrderUpdate: &ocutypes.OrderUpdateV1{ + OrderId: &orderId, + TotalFilledQuantums: fillQuantums, + }, + }, + }, + id: subaccountIdNumber, + err: nil, + }, + "OrderReplace": { + update: ocutypes.OffChainUpdateV1{ + UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{ + OrderReplace: &ocutypes.OrderReplaceV1{ + OldOrderId: &orderId, + Order: &newOrder, + PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, + TimeStamp: _ToPtr(orderPlaceTime.Add(3 * time.Second)), + }, + }, + }, + id: subaccountIdNumber, + err: nil, + }, + } + for name, testCase := range tests { + t.Run(name, func(t *testing.T) { + id, err := util.GetOffChainUpdateV1SubaccountIdNumber(testCase.update) + fmt.Println("expected", id) + require.Equal(t, err, testCase.err) + require.Equal(t, id, testCase.id) + }) + } +} diff --git a/protocol/streaming/ws/websocket_server.go b/protocol/streaming/ws/websocket_server.go index 33a7434e427..c482ac5bd87 100644 --- a/protocol/streaming/ws/websocket_server.go +++ b/protocol/streaming/ws/websocket_server.go @@ -97,6 +97,16 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { return } + // Parse filterOrders from query parameters + filterOrders, err := parseFilterOrdersBySubaccountId(r) + if err != nil { + ws.logger.Error("Error parsing filterOrders", "err", err) + if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil { + ws.logger.Error("Error sending close message", "err", err) + } + return + } + websocketMessageSender := &WebsocketMessageSender{ cdc: ws.cdc, conn: conn, @@ -110,6 +120,7 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { clobPairIds, subaccountIds, marketIds, + filterOrders, websocketMessageSender, ) if err != nil { @@ -169,6 +180,19 @@ func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) { return subaccountIds, nil } +// parseFilterOrdersBySubaccountId is a helper function to parse the filterOrdersBySubaccountId flag from the query parameters. +func parseFilterOrdersBySubaccountId(r *http.Request) (bool, error) { + token := r.URL.Query().Get("filterOrdersBySubaccountId") + if token == "" { + return false, nil + } + value, err := strconv.ParseBool(token) + if err != nil { + return false, fmt.Errorf("invalid filterOrdersBySubaccountId: %s", token) + } + return value, nil +} + // parseUint32 is a helper function to parse the uint32 from the query parameters. func parseUint32(r *http.Request, queryParam string) ([]uint32, error) { param := r.URL.Query().Get(queryParam) diff --git a/protocol/x/clob/keeper/grpc_stream_orderbook.go b/protocol/x/clob/keeper/grpc_stream_orderbook.go index 029266901ac..0de49b3c815 100644 --- a/protocol/x/clob/keeper/grpc_stream_orderbook.go +++ b/protocol/x/clob/keeper/grpc_stream_orderbook.go @@ -12,6 +12,7 @@ func (k Keeper) StreamOrderbookUpdates( req.GetClobPairId(), req.GetSubaccountIds(), req.GetMarketIds(), + req.GetFilterOrdersBySubaccountId(), stream, ) if err != nil { diff --git a/protocol/x/clob/types/query.pb.go b/protocol/x/clob/types/query.pb.go index b2592069108..335430b2c04 100644 --- a/protocol/x/clob/types/query.pb.go +++ b/protocol/x/clob/types/query.pb.go @@ -858,6 +858,10 @@ type StreamOrderbookUpdatesRequest struct { SubaccountIds []*types.SubaccountId `protobuf:"bytes,2,rep,name=subaccount_ids,json=subaccountIds,proto3" json:"subaccount_ids,omitempty"` // Market ids for price updates. MarketIds []uint32 `protobuf:"varint,3,rep,packed,name=market_ids,json=marketIds,proto3" json:"market_ids,omitempty"` + // Filter order updates by subaccount IDs. + // If true, the orderbook updates only include orders from provided subaccount + // IDs. + FilterOrdersBySubaccountId bool `protobuf:"varint,4,opt,name=filter_orders_by_subaccount_id,json=filterOrdersBySubaccountId,proto3" json:"filter_orders_by_subaccount_id,omitempty"` } func (m *StreamOrderbookUpdatesRequest) Reset() { *m = StreamOrderbookUpdatesRequest{} } @@ -914,6 +918,13 @@ func (m *StreamOrderbookUpdatesRequest) GetMarketIds() []uint32 { return nil } +func (m *StreamOrderbookUpdatesRequest) GetFilterOrdersBySubaccountId() bool { + if m != nil { + return m.FilterOrdersBySubaccountId + } + return false +} + // StreamOrderbookUpdatesResponse is a response message for the // StreamOrderbookUpdates method. type StreamOrderbookUpdatesResponse struct { @@ -1440,118 +1451,120 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/query.proto", fileDescriptor_3365c195b25c5bc0) } var fileDescriptor_3365c195b25c5bc0 = []byte{ - // 1770 bytes of a gzipped FileDescriptorProto + // 1798 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x58, 0x4d, 0x6c, 0xdc, 0xc6, 0x15, 0x5e, 0x4a, 0xb2, 0x2d, 0xbd, 0xb5, 0x14, 0x69, 0x1c, 0x3b, 0x9b, 0x95, 0xbc, 0x92, 0xe9, - 0x58, 0xd6, 0x3a, 0xf1, 0x52, 0x56, 0x82, 0x20, 0xb5, 0x8b, 0x14, 0x96, 0x5b, 0x59, 0x42, 0xad, - 0x44, 0xa1, 0x14, 0x47, 0x68, 0x03, 0x10, 0xb3, 0xe4, 0x68, 0x35, 0x10, 0xc9, 0x59, 0x91, 0xc3, - 0x85, 0x84, 0xa2, 0x28, 0xd0, 0x43, 0x2e, 0x6d, 0x81, 0x02, 0x3d, 0xf4, 0x50, 0xf4, 0xd4, 0x73, - 0xd1, 0x5e, 0x7a, 0xec, 0xdf, 0x2d, 0x47, 0x03, 0xbd, 0xf4, 0x50, 0x14, 0x85, 0xdd, 0x73, 0x8f, - 0x3d, 0x07, 0x9c, 0x19, 0xee, 0x92, 0x4b, 0x72, 0x25, 0xeb, 0x22, 0x71, 0xde, 0xbc, 0xf7, 0xcd, - 0xf7, 0x66, 0xde, 0xbc, 0xf7, 0x66, 0xe1, 0xa6, 0x73, 0xea, 0x9c, 0x74, 0x03, 0xc6, 0x99, 0xcd, - 0x5c, 0xc3, 0x76, 0x59, 0xdb, 0x38, 0x8e, 0x48, 0x70, 0xda, 0x12, 0x32, 0x34, 0x97, 0x9e, 0x6e, - 0xc5, 0xd3, 0xf5, 0x37, 0x3b, 0xac, 0xc3, 0x84, 0xc8, 0x88, 0xbf, 0xa4, 0x62, 0x7d, 0xa1, 0xc3, - 0x58, 0xc7, 0x25, 0x06, 0xee, 0x52, 0x03, 0xfb, 0x3e, 0xe3, 0x98, 0x53, 0xe6, 0x87, 0x6a, 0xf6, - 0x9e, 0xcd, 0x42, 0x8f, 0x85, 0x46, 0x1b, 0x87, 0x44, 0xe2, 0x1b, 0xbd, 0x07, 0x6d, 0xc2, 0xf1, - 0x03, 0xa3, 0x8b, 0x3b, 0xd4, 0x17, 0xca, 0x4a, 0xd7, 0xc8, 0x33, 0x6a, 0xbb, 0xcc, 0x3e, 0xb2, - 0x02, 0xcc, 0x89, 0xe5, 0x52, 0x8f, 0x72, 0xcb, 0x66, 0xfe, 0x01, 0xed, 0x28, 0x83, 0x5b, 0x79, - 0x83, 0xf8, 0x8f, 0xd5, 0xc5, 0x34, 0x50, 0x2a, 0xab, 0x79, 0x15, 0x72, 0x1c, 0x51, 0x7e, 0x6a, - 0x71, 0x4a, 0x82, 0x22, 0xd0, 0x82, 0x7d, 0x61, 0x81, 0x43, 0x12, 0xc0, 0xc5, 0xfc, 0xb4, 0x87, - 0xb9, 0x7d, 0x48, 0x12, 0x8f, 0xdf, 0xcd, 0x2b, 0xb8, 0xf4, 0x38, 0xa2, 0x8e, 0xdc, 0x97, 0xec, - 0x62, 0xf3, 0x05, 0x68, 0xa4, 0xa7, 0x26, 0x3f, 0xce, 0x4c, 0x52, 0xdf, 0x21, 0x27, 0x24, 0x30, - 0xd8, 0xc1, 0x81, 0x65, 0x1f, 0x62, 0xea, 0x5b, 0x51, 0xd7, 0xc1, 0x9c, 0x84, 0x79, 0x89, 0xb2, - 0x5f, 0xc9, 0xd8, 0x87, 0x51, 0x1b, 0xdb, 0x36, 0x8b, 0x7c, 0x1e, 0x1a, 0x21, 0x0f, 0x08, 0xf6, - 0xa8, 0x9f, 0xd0, 0x68, 0x96, 0x6b, 0xf6, 0xbf, 0x95, 0xea, 0xed, 0x8c, 0x6a, 0x37, 0xa0, 0x36, - 0xc9, 0xe1, 0xe9, 0x4d, 0x78, 0xeb, 0xb3, 0xf8, 0xac, 0x9f, 0x12, 0xfe, 0xc4, 0x65, 0xed, 0x1d, - 0x4c, 0x03, 0x93, 0x1c, 0x47, 0x24, 0xe4, 0x68, 0x06, 0xc6, 0xa8, 0x53, 0xd3, 0x96, 0xb4, 0x95, - 0x69, 0x73, 0x8c, 0x3a, 0xfa, 0x17, 0x70, 0x5d, 0xa8, 0x0e, 0xf4, 0xc2, 0x2e, 0xf3, 0x43, 0x82, - 0x3e, 0x86, 0xa9, 0xfe, 0x61, 0x0a, 0xfd, 0xea, 0xda, 0x7c, 0x2b, 0x17, 0x94, 0xad, 0xc4, 0x6e, - 0x7d, 0xe2, 0xeb, 0x7f, 0x2f, 0x56, 0xcc, 0x49, 0x5b, 0x8d, 0x75, 0xac, 0x38, 0x3c, 0x76, 0xdd, - 0x61, 0x0e, 0x1b, 0x00, 0x83, 0xe0, 0x53, 0xd8, 0xcb, 0x2d, 0x19, 0xa9, 0xad, 0x38, 0x52, 0x5b, - 0xf2, 0x26, 0xa8, 0x48, 0x6d, 0xed, 0xe0, 0x0e, 0x51, 0xb6, 0x66, 0xca, 0x52, 0xff, 0x9d, 0x06, - 0xb5, 0x0c, 0xf9, 0xc7, 0xae, 0x5b, 0xc6, 0x7f, 0xfc, 0x35, 0xf9, 0xa3, 0xa7, 0x19, 0x92, 0x63, - 0x82, 0xe4, 0xdd, 0x33, 0x49, 0xca, 0xc5, 0x33, 0x2c, 0xff, 0xa5, 0xc1, 0xe2, 0x36, 0xe9, 0x7d, - 0xc2, 0x1c, 0xb2, 0xc7, 0xe2, 0xbf, 0x4f, 0xb0, 0x6b, 0x47, 0xae, 0x98, 0x4c, 0x76, 0xe4, 0x4b, - 0xb8, 0x21, 0xaf, 0x5a, 0x37, 0x60, 0x5d, 0x16, 0x92, 0xc0, 0x52, 0x41, 0xdd, 0xdf, 0x9d, 0x3c, - 0xf3, 0xe7, 0xd8, 0x8d, 0x83, 0x9a, 0x05, 0xdb, 0xa4, 0xb7, 0x2d, 0xb5, 0xcd, 0x37, 0x05, 0xca, - 0x8e, 0x02, 0x51, 0x52, 0xf4, 0x43, 0xb8, 0xde, 0x4b, 0x94, 0x2d, 0x8f, 0xf4, 0x2c, 0x8f, 0xf0, - 0x80, 0xda, 0x61, 0xdf, 0xab, 0x3c, 0x78, 0x86, 0xf0, 0xb6, 0x54, 0x37, 0xaf, 0xf5, 0xd2, 0x4b, - 0x4a, 0xa1, 0xfe, 0x3f, 0x0d, 0x96, 0xca, 0xdd, 0x53, 0x87, 0xd1, 0x81, 0x2b, 0x01, 0x09, 0x23, - 0x97, 0x87, 0xea, 0x28, 0x9e, 0x9e, 0xb5, 0x66, 0x01, 0x4a, 0xac, 0xf0, 0xd8, 0x77, 0x9e, 0x33, - 0x37, 0xf2, 0xc8, 0x0e, 0x09, 0xe2, 0xa3, 0x53, 0xc7, 0x96, 0xa0, 0xd7, 0x31, 0x5c, 0x2b, 0xd0, - 0x42, 0x4b, 0x70, 0xb5, 0x1f, 0x0c, 0x56, 0x3f, 0xfe, 0x21, 0x39, 0xec, 0x2d, 0x07, 0xcd, 0xc2, - 0xb8, 0x47, 0x7a, 0x62, 0x47, 0xc6, 0xcc, 0xf8, 0x13, 0xdd, 0x80, 0xcb, 0x3d, 0x01, 0x52, 0x1b, - 0x5f, 0xd2, 0x56, 0x26, 0x4c, 0x35, 0xd2, 0xef, 0xc1, 0x8a, 0x08, 0xba, 0xef, 0x89, 0x3c, 0xb6, - 0x47, 0x49, 0xf0, 0x2c, 0xce, 0x62, 0x4f, 0x44, 0x5e, 0x89, 0x82, 0xf4, 0xb9, 0xea, 0xbf, 0xd1, - 0xa0, 0x79, 0x0e, 0x65, 0xb5, 0x4b, 0x3e, 0xd4, 0xca, 0x92, 0xa3, 0x8a, 0x03, 0xa3, 0x60, 0xdb, - 0x46, 0x41, 0xab, 0xed, 0xb9, 0x4e, 0x8a, 0x74, 0xf4, 0x26, 0xdc, 0x15, 0xe4, 0xd6, 0xe3, 0xa0, - 0x31, 0x31, 0x27, 0xe5, 0x8e, 0xfc, 0x5a, 0x53, 0x5e, 0x8f, 0xd4, 0x55, 0x7e, 0x1c, 0xc1, 0x5b, - 0x25, 0x85, 0x43, 0xb9, 0xd1, 0x2a, 0x70, 0x63, 0x04, 0xb0, 0xf2, 0x42, 0x06, 0xf7, 0x90, 0x8a, - 0xbe, 0x0f, 0x6f, 0x0b, 0x62, 0xbb, 0x1c, 0x73, 0x72, 0x10, 0xb9, 0x9f, 0xc6, 0xc5, 0x22, 0xb9, - 0x57, 0x8f, 0x60, 0x52, 0x14, 0x8f, 0xe4, 0xcc, 0xab, 0x6b, 0xf5, 0x82, 0xa5, 0x85, 0xc9, 0x96, - 0x93, 0xc4, 0x12, 0x93, 0x43, 0xfd, 0x4f, 0x1a, 0xd4, 0x8b, 0xa0, 0x95, 0x97, 0xfb, 0xf0, 0x86, - 0xc4, 0xee, 0xba, 0xd8, 0x26, 0x1e, 0xf1, 0xb9, 0x5a, 0xa2, 0x59, 0xb0, 0xc4, 0x33, 0xe6, 0x77, - 0xf6, 0x48, 0xe0, 0x09, 0x88, 0x9d, 0xc4, 0x40, 0xad, 0x38, 0xc3, 0x32, 0x52, 0xb4, 0x08, 0xd5, - 0x03, 0xea, 0xba, 0x16, 0xf6, 0xe2, 0xc4, 0x2f, 0x62, 0x72, 0xc2, 0x84, 0x58, 0xf4, 0x58, 0x48, - 0xd0, 0x02, 0x4c, 0xf1, 0x80, 0x76, 0x3a, 0x24, 0x20, 0x8e, 0x88, 0xce, 0x49, 0x73, 0x20, 0xd0, - 0xef, 0xc2, 0x1d, 0x41, 0xfb, 0x59, 0xaa, 0xec, 0x15, 0x1e, 0xea, 0x57, 0x1a, 0x2c, 0x9f, 0xa5, - 0xa9, 0x9c, 0xfd, 0x12, 0xae, 0x15, 0x54, 0x51, 0xe5, 0xf0, 0x9d, 0x22, 0x87, 0x73, 0x90, 0xca, - 0x59, 0xe4, 0xe6, 0x66, 0xf4, 0x05, 0xb5, 0xd1, 0x9f, 0x90, 0x93, 0x7e, 0xc1, 0xda, 0x72, 0x12, - 0x9a, 0x9b, 0x30, 0x5f, 0x38, 0xab, 0xa8, 0x35, 0x61, 0xce, 0x27, 0x27, 0xdc, 0x2a, 0xb8, 0xe0, - 0x33, 0x7e, 0xc6, 0x44, 0xff, 0x83, 0x06, 0x37, 0x77, 0x45, 0xad, 0x14, 0xe7, 0xd0, 0x66, 0xec, - 0xe8, 0x73, 0x59, 0xb2, 0x93, 0x80, 0xc9, 0x27, 0x8a, 0xf1, 0xa1, 0x44, 0xb1, 0x0d, 0x33, 0x83, - 0xa2, 0x6c, 0x51, 0x27, 0xce, 0xa2, 0xe3, 0xf9, 0x14, 0x9d, 0x2a, 0xe2, 0xad, 0xdd, 0xfe, 0xf7, - 0x96, 0x63, 0x4e, 0x87, 0xa9, 0x51, 0x88, 0x6e, 0x02, 0x78, 0x38, 0x38, 0x22, 0x12, 0x6a, 0x5c, - 0x2c, 0x37, 0x25, 0x25, 0x5b, 0x4e, 0xa8, 0x63, 0x68, 0x94, 0x11, 0x56, 0xee, 0x7f, 0x07, 0xae, - 0xa8, 0xb6, 0x43, 0xa5, 0xd6, 0xc5, 0x82, 0xd3, 0x90, 0x18, 0xd2, 0x34, 0x09, 0x73, 0x65, 0xa5, - 0xff, 0x7f, 0x1c, 0xae, 0xa6, 0xe7, 0xd1, 0x2d, 0xb8, 0x2a, 0xaf, 0xef, 0x21, 0xa1, 0x9d, 0x43, - 0xae, 0xf6, 0xb2, 0x2a, 0x64, 0x9b, 0x42, 0x84, 0xe6, 0x61, 0x8a, 0x9c, 0x10, 0xdb, 0xf2, 0x98, - 0x43, 0x44, 0x7c, 0x4e, 0x9b, 0x93, 0xb1, 0x60, 0x9b, 0x39, 0x04, 0x7d, 0x0e, 0xb3, 0x2c, 0x61, - 0xab, 0x5a, 0x22, 0x11, 0xa4, 0xd5, 0xb5, 0x95, 0x52, 0x6a, 0x43, 0xee, 0x6d, 0x56, 0xcc, 0x37, - 0x58, 0x56, 0x14, 0x17, 0x64, 0x79, 0xdf, 0xe2, 0x8b, 0x50, 0x9b, 0x28, 0xad, 0x8b, 0x43, 0x80, - 0x1b, 0xd4, 0x75, 0x37, 0x2b, 0xe6, 0x94, 0xb0, 0x8d, 0x07, 0x68, 0x03, 0xaa, 0x1c, 0x1f, 0x91, - 0xc0, 0x12, 0xa2, 0xda, 0x25, 0x81, 0x74, 0xbb, 0x14, 0x69, 0x2f, 0xd6, 0x15, 0x70, 0x9b, 0x15, - 0x13, 0x78, 0x7f, 0x84, 0x2c, 0x98, 0x4b, 0x45, 0x82, 0x72, 0xf4, 0xb2, 0x40, 0x5b, 0x1d, 0x11, - 0x0c, 0x02, 0x74, 0x10, 0x12, 0x7d, 0x87, 0x67, 0xc3, 0x21, 0x19, 0xfa, 0x3e, 0x5c, 0x15, 0x0d, - 0x5e, 0x82, 0x7d, 0xa5, 0xc8, 0x67, 0xd9, 0x02, 0x2a, 0xd8, 0x9d, 0x78, 0xd0, 0x47, 0xac, 0x76, - 0x07, 0xc3, 0xf5, 0x59, 0x98, 0x91, 0x30, 0x96, 0x47, 0xc2, 0x10, 0x77, 0x88, 0xfe, 0x0b, 0x0d, - 0xae, 0x17, 0xee, 0x3e, 0xaa, 0xc3, 0x64, 0xe8, 0xe3, 0x6e, 0x78, 0xc8, 0xe4, 0xe9, 0x4f, 0x9a, - 0xfd, 0x31, 0xda, 0x1f, 0xc4, 0x9b, 0x0c, 0xfc, 0x8f, 0xb2, 0x7c, 0x54, 0x9f, 0xdc, 0xca, 0x77, - 0xc5, 0x9f, 0x1e, 0x1c, 0x3c, 0x89, 0x05, 0x72, 0x91, 0xe7, 0x0f, 0x86, 0x03, 0xf1, 0xf7, 0x1a, - 0x5c, 0x2b, 0x38, 0x3c, 0xf4, 0x08, 0xc4, 0xfd, 0x93, 0x2d, 0x91, 0x4a, 0x39, 0x0b, 0x25, 0xad, - 0x9c, 0x68, 0x79, 0x4c, 0xd1, 0xf9, 0x89, 0x4f, 0xf4, 0x21, 0x5c, 0x16, 0xc7, 0x9c, 0xb0, 0xad, - 0x95, 0xe5, 0x7f, 0xc5, 0x46, 0x69, 0xc7, 0x97, 0x20, 0x95, 0x83, 0xe5, 0xcd, 0x9c, 0x30, 0xab, - 0x83, 0x24, 0x1c, 0xea, 0x5f, 0x8d, 0xc1, 0xec, 0x70, 0x88, 0xa0, 0x55, 0xb8, 0x24, 0xc3, 0x4a, - 0xf2, 0x2c, 0x5d, 0x6e, 0xb3, 0x62, 0x4a, 0x45, 0xb4, 0x0f, 0x73, 0xa9, 0x94, 0xa8, 0x82, 0x72, - 0xac, 0xb4, 0x92, 0xc8, 0x15, 0x53, 0xe9, 0x35, 0x81, 0x9b, 0x75, 0x87, 0x64, 0xe8, 0x0b, 0x40, - 0xa9, 0x40, 0xb7, 0x42, 0x8e, 0x79, 0x14, 0xaa, 0xab, 0xd8, 0x3c, 0x47, 0xbc, 0xef, 0x0a, 0x03, - 0x73, 0x96, 0x0f, 0x49, 0xd6, 0xa7, 0x33, 0x37, 0x48, 0xff, 0xa3, 0x06, 0x37, 0x8a, 0x6d, 0xe3, - 0x6d, 0xcc, 0x2c, 0xae, 0x72, 0x09, 0x4b, 0xa9, 0xdc, 0x07, 0x14, 0x10, 0x0f, 0x53, 0x9f, 0xfa, - 0x1d, 0xeb, 0x38, 0xc2, 0x3e, 0x8f, 0xbc, 0x50, 0x15, 0xbd, 0xb9, 0xfe, 0xcc, 0x67, 0x6a, 0x02, - 0x7d, 0x17, 0x1a, 0xac, 0xcb, 0xa9, 0x47, 0x43, 0x4e, 0x6d, 0xec, 0xba, 0xa7, 0x22, 0x1f, 0x10, - 0x67, 0x60, 0x2a, 0xdb, 0xb5, 0x85, 0xac, 0xd6, 0x86, 0x50, 0x4a, 0x50, 0xd6, 0xfe, 0x52, 0x85, - 0x4b, 0xa2, 0xa8, 0xa0, 0x9f, 0x69, 0x30, 0x99, 0x94, 0x08, 0x74, 0xaf, 0x60, 0x57, 0x4a, 0x5e, - 0x52, 0xf5, 0x95, 0x32, 0xdd, 0xe1, 0xa7, 0x94, 0xde, 0xfc, 0xe9, 0x3f, 0xfe, 0xfb, 0xab, 0xb1, - 0xdb, 0xe8, 0x96, 0x31, 0xe2, 0xc1, 0x6c, 0xfc, 0x88, 0x3a, 0x3f, 0x46, 0x3f, 0xd7, 0xa0, 0x9a, - 0x7a, 0xcd, 0x94, 0x13, 0xca, 0x3f, 0xab, 0xea, 0xef, 0x9e, 0x45, 0x28, 0xf5, 0x3c, 0xd2, 0xdf, - 0x11, 0x9c, 0x1a, 0x68, 0x61, 0x14, 0x27, 0xf4, 0x57, 0x0d, 0x6a, 0x65, 0x6d, 0x39, 0x5a, 0x7b, - 0xad, 0x1e, 0x5e, 0x72, 0x7c, 0xff, 0x02, 0x7d, 0xbf, 0xfe, 0x50, 0x70, 0xfd, 0xe0, 0xa1, 0x76, - 0x4f, 0x37, 0x8c, 0xc2, 0x17, 0xbb, 0xe5, 0x33, 0x87, 0x58, 0x9c, 0xc9, 0xff, 0x76, 0x8a, 0xe4, - 0xdf, 0x35, 0x58, 0x18, 0xd5, 0x21, 0xa3, 0x47, 0x65, 0xbb, 0x76, 0x8e, 0xfe, 0xbe, 0xfe, 0xed, - 0x8b, 0x19, 0x2b, 0xbf, 0x96, 0x85, 0x5f, 0x4b, 0xa8, 0x61, 0x8c, 0xfc, 0x95, 0x04, 0xfd, 0x59, - 0x83, 0xf9, 0x11, 0xed, 0x31, 0x7a, 0x58, 0xc6, 0xe2, 0xec, 0xc6, 0xbe, 0xfe, 0xe8, 0x42, 0xb6, - 0xca, 0x81, 0x3b, 0xc2, 0x81, 0x45, 0x74, 0x73, 0xe4, 0x4f, 0x47, 0xe8, 0x6f, 0x1a, 0xbc, 0x5d, - 0xda, 0x62, 0xa2, 0x8f, 0xca, 0x18, 0x9c, 0xd5, 0xbf, 0xd6, 0xbf, 0x75, 0x01, 0x4b, 0xc5, 0xbc, - 0x25, 0x98, 0xaf, 0xa0, 0x65, 0xe3, 0x5c, 0x3f, 0x17, 0x21, 0x1f, 0xa6, 0x33, 0xaf, 0x00, 0xf4, - 0x5e, 0xd9, 0xda, 0x45, 0xef, 0x90, 0xfa, 0xfd, 0x73, 0x6a, 0x2b, 0x76, 0x15, 0xf4, 0x5b, 0x0d, - 0x66, 0xb2, 0xfd, 0x2e, 0x2a, 0xc5, 0x28, 0xec, 0x9a, 0xeb, 0xad, 0xf3, 0xaa, 0xab, 0x35, 0xdf, - 0x13, 0x3b, 0xb2, 0x8c, 0xde, 0x29, 0xd8, 0x91, 0x5c, 0x7f, 0x8d, 0x7e, 0x92, 0x64, 0xfc, 0xe1, - 0xbe, 0x14, 0xad, 0x9e, 0xb7, 0xc7, 0x4b, 0x7a, 0xee, 0xfa, 0x83, 0xd7, 0xb0, 0x90, 0x64, 0x57, - 0xb5, 0xf5, 0x9d, 0xaf, 0x5f, 0x36, 0xb4, 0x17, 0x2f, 0x1b, 0xda, 0x7f, 0x5e, 0x36, 0xb4, 0x5f, - 0xbe, 0x6a, 0x54, 0x5e, 0xbc, 0x6a, 0x54, 0xfe, 0xf9, 0xaa, 0x51, 0xf9, 0xc1, 0x87, 0x1d, 0xca, - 0x0f, 0xa3, 0x76, 0xcb, 0x66, 0x5e, 0xd6, 0x95, 0xde, 0x07, 0xf7, 0x45, 0x43, 0x62, 0xf4, 0x25, - 0x27, 0xd2, 0x3d, 0x7e, 0xda, 0x25, 0x61, 0xfb, 0xb2, 0x10, 0xbf, 0xff, 0x4d, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x9d, 0x29, 0x28, 0xfb, 0x99, 0x15, 0x00, 0x00, + 0x58, 0xd6, 0x3a, 0xf1, 0x52, 0x56, 0x82, 0x20, 0xb5, 0x8b, 0x14, 0x96, 0x5b, 0x5b, 0x46, 0xad, + 0x44, 0xa1, 0x14, 0x47, 0x68, 0x03, 0x10, 0xb3, 0xe4, 0xec, 0x6a, 0x20, 0x92, 0xb3, 0x22, 0x87, + 0x0b, 0x09, 0x45, 0x51, 0xa0, 0x87, 0x5c, 0xda, 0x02, 0x05, 0x7a, 0xe8, 0xa1, 0xe8, 0xa9, 0xe7, + 0x02, 0xbd, 0xf4, 0xd8, 0xbf, 0x5b, 0x8e, 0x06, 0x7a, 0xe9, 0xa1, 0x28, 0x0a, 0xbb, 0xe7, 0x5e, + 0x0a, 0xf4, 0x1c, 0x70, 0x66, 0xb8, 0x4b, 0x2e, 0xc9, 0x95, 0xac, 0x8b, 0xc4, 0x79, 0xf3, 0xde, + 0x9b, 0xef, 0xfd, 0xcc, 0x7b, 0x6f, 0x16, 0xae, 0x3b, 0x27, 0xce, 0x71, 0x2f, 0x60, 0x9c, 0xd9, + 0xcc, 0x35, 0x6c, 0x97, 0xb5, 0x8d, 0xa3, 0x88, 0x04, 0x27, 0x2d, 0x41, 0x43, 0x0b, 0xe9, 0xed, + 0x56, 0xbc, 0x5d, 0x7f, 0xb3, 0xcb, 0xba, 0x4c, 0x90, 0x8c, 0xf8, 0x4b, 0x32, 0xd6, 0x97, 0xba, + 0x8c, 0x75, 0x5d, 0x62, 0xe0, 0x1e, 0x35, 0xb0, 0xef, 0x33, 0x8e, 0x39, 0x65, 0x7e, 0xa8, 0x76, + 0xef, 0xd8, 0x2c, 0xf4, 0x58, 0x68, 0xb4, 0x71, 0x48, 0xa4, 0x7e, 0xa3, 0x7f, 0xaf, 0x4d, 0x38, + 0xbe, 0x67, 0xf4, 0x70, 0x97, 0xfa, 0x82, 0x59, 0xf1, 0x1a, 0x79, 0x44, 0x6d, 0x97, 0xd9, 0x87, + 0x56, 0x80, 0x39, 0xb1, 0x5c, 0xea, 0x51, 0x6e, 0xd9, 0xcc, 0xef, 0xd0, 0xae, 0x12, 0xb8, 0x91, + 0x17, 0x88, 0xff, 0x58, 0x3d, 0x4c, 0x03, 0xc5, 0xb2, 0x9e, 0x67, 0x21, 0x47, 0x11, 0xe5, 0x27, + 0x16, 0xa7, 0x24, 0x28, 0x52, 0x5a, 0xe0, 0x17, 0x16, 0x38, 0x24, 0x51, 0xb8, 0x9c, 0xdf, 0xf6, + 0x30, 0xb7, 0x0f, 0x48, 0x62, 0xf1, 0xbb, 0x79, 0x06, 0x97, 0x1e, 0x45, 0xd4, 0x91, 0x7e, 0xc9, + 0x1e, 0xb6, 0x58, 0xa0, 0x8d, 0xf4, 0xd5, 0xe6, 0xc7, 0x99, 0x4d, 0xea, 0x3b, 0xe4, 0x98, 0x04, + 0x06, 0xeb, 0x74, 0x2c, 0xfb, 0x00, 0x53, 0xdf, 0x8a, 0x7a, 0x0e, 0xe6, 0x24, 0xcc, 0x53, 0x94, + 0xfc, 0x5a, 0x46, 0x3e, 0x8c, 0xda, 0xd8, 0xb6, 0x59, 0xe4, 0xf3, 0xd0, 0x08, 0x79, 0x40, 0xb0, + 0x47, 0xfd, 0x04, 0x46, 0xb3, 0x9c, 0x73, 0xf0, 0xad, 0x58, 0x6f, 0x66, 0x58, 0x7b, 0x01, 0xb5, + 0x49, 0x4e, 0x9f, 0xde, 0x84, 0xb7, 0x3e, 0x8b, 0x63, 0xfd, 0x84, 0xf0, 0x47, 0x2e, 0x6b, 0xef, + 0x60, 0x1a, 0x98, 0xe4, 0x28, 0x22, 0x21, 0x47, 0x73, 0x30, 0x41, 0x9d, 0x9a, 0xb6, 0xa2, 0xad, + 0xcd, 0x9a, 0x13, 0xd4, 0xd1, 0xbf, 0x80, 0xab, 0x82, 0x75, 0xc8, 0x17, 0xf6, 0x98, 0x1f, 0x12, + 0xf4, 0x31, 0xcc, 0x0c, 0x82, 0x29, 0xf8, 0xab, 0x1b, 0x8b, 0xad, 0x5c, 0x52, 0xb6, 0x12, 0xb9, + 0xcd, 0xa9, 0xaf, 0xff, 0xb5, 0x5c, 0x31, 0xa7, 0x6d, 0xb5, 0xd6, 0xb1, 0xc2, 0xf0, 0xd0, 0x75, + 0x47, 0x31, 0x3c, 0x06, 0x18, 0x26, 0x9f, 0xd2, 0xbd, 0xda, 0x92, 0x99, 0xda, 0x8a, 0x33, 0xb5, + 0x25, 0x6f, 0x82, 0xca, 0xd4, 0xd6, 0x0e, 0xee, 0x12, 0x25, 0x6b, 0xa6, 0x24, 0xf5, 0xdf, 0x69, + 0x50, 0xcb, 0x80, 0x7f, 0xe8, 0xba, 0x65, 0xf8, 0x27, 0x5f, 0x13, 0x3f, 0x7a, 0x92, 0x01, 0x39, + 0x21, 0x40, 0xde, 0x3e, 0x15, 0xa4, 0x3c, 0x3c, 0x83, 0xf2, 0x9f, 0x1a, 0x2c, 0x6f, 0x93, 0xfe, + 0x27, 0xcc, 0x21, 0x7b, 0x2c, 0xfe, 0xfb, 0x08, 0xbb, 0x76, 0xe4, 0x8a, 0xcd, 0xc4, 0x23, 0x5f, + 0xc2, 0x35, 0x79, 0xd5, 0x7a, 0x01, 0xeb, 0xb1, 0x90, 0x04, 0x96, 0x4a, 0xea, 0x81, 0x77, 0xf2, + 0xc8, 0x9f, 0x63, 0x37, 0x4e, 0x6a, 0x16, 0x6c, 0x93, 0xfe, 0xb6, 0xe4, 0x36, 0xdf, 0x14, 0x5a, + 0x76, 0x94, 0x12, 0x45, 0x45, 0x3f, 0x84, 0xab, 0xfd, 0x84, 0xd9, 0xf2, 0x48, 0xdf, 0xf2, 0x08, + 0x0f, 0xa8, 0x1d, 0x0e, 0xac, 0xca, 0x2b, 0xcf, 0x00, 0xde, 0x96, 0xec, 0xe6, 0x95, 0x7e, 0xfa, + 0x48, 0x49, 0xd4, 0xff, 0xab, 0xc1, 0x4a, 0xb9, 0x79, 0x2a, 0x18, 0x5d, 0xb8, 0x14, 0x90, 0x30, + 0x72, 0x79, 0xa8, 0x42, 0xf1, 0xe4, 0xb4, 0x33, 0x0b, 0xb4, 0xc4, 0x0c, 0x0f, 0x7d, 0xe7, 0x39, + 0x73, 0x23, 0x8f, 0xec, 0x90, 0x20, 0x0e, 0x9d, 0x0a, 0x5b, 0xa2, 0xbd, 0x8e, 0xe1, 0x4a, 0x01, + 0x17, 0x5a, 0x81, 0xcb, 0x83, 0x64, 0xb0, 0x06, 0xf9, 0x0f, 0x49, 0xb0, 0x9f, 0x3a, 0x68, 0x1e, + 0x26, 0x3d, 0xd2, 0x17, 0x1e, 0x99, 0x30, 0xe3, 0x4f, 0x74, 0x0d, 0x2e, 0xf6, 0x85, 0x92, 0xda, + 0xe4, 0x8a, 0xb6, 0x36, 0x65, 0xaa, 0x95, 0x7e, 0x07, 0xd6, 0x44, 0xd2, 0x7d, 0x4f, 0xd4, 0xb1, + 0x3d, 0x4a, 0x82, 0x67, 0x71, 0x15, 0x7b, 0x24, 0xea, 0x4a, 0x14, 0xa4, 0xe3, 0xaa, 0xff, 0x46, + 0x83, 0xe6, 0x19, 0x98, 0x95, 0x97, 0x7c, 0xa8, 0x95, 0x15, 0x47, 0x95, 0x07, 0x46, 0x81, 0xdb, + 0xc6, 0xa9, 0x56, 0xee, 0xb9, 0x4a, 0x8a, 0x78, 0xf4, 0x26, 0xdc, 0x16, 0xe0, 0x36, 0xe3, 0xa4, + 0x31, 0x31, 0x27, 0xe5, 0x86, 0xfc, 0x5a, 0x53, 0x56, 0x8f, 0xe5, 0x55, 0x76, 0x1c, 0xc2, 0x5b, + 0x25, 0x8d, 0x43, 0x99, 0xd1, 0x2a, 0x30, 0x63, 0x8c, 0x62, 0x65, 0x85, 0x4c, 0xee, 0x11, 0x16, + 0x7d, 0x1f, 0xde, 0x16, 0xc0, 0x76, 0x39, 0xe6, 0xa4, 0x13, 0xb9, 0x9f, 0xc6, 0xcd, 0x22, 0xb9, + 0x57, 0x0f, 0x60, 0x5a, 0x34, 0x8f, 0x24, 0xe6, 0xd5, 0x8d, 0x7a, 0xc1, 0xd1, 0x42, 0xe4, 0xa9, + 0x93, 0xe4, 0x12, 0x93, 0x4b, 0xfd, 0x8f, 0x1a, 0xd4, 0x8b, 0x54, 0x2b, 0x2b, 0xf7, 0xe1, 0x0d, + 0xa9, 0xbb, 0xe7, 0x62, 0x9b, 0x78, 0xc4, 0xe7, 0xea, 0x88, 0x66, 0xc1, 0x11, 0xcf, 0x98, 0xdf, + 0xdd, 0x23, 0x81, 0x27, 0x54, 0xec, 0x24, 0x02, 0xea, 0xc4, 0x39, 0x96, 0xa1, 0xa2, 0x65, 0xa8, + 0x76, 0xa8, 0xeb, 0x5a, 0xd8, 0x8b, 0x0b, 0xbf, 0xc8, 0xc9, 0x29, 0x13, 0x62, 0xd2, 0x43, 0x41, + 0x41, 0x4b, 0x30, 0xc3, 0x03, 0xda, 0xed, 0x92, 0x80, 0x38, 0x22, 0x3b, 0xa7, 0xcd, 0x21, 0x41, + 0xbf, 0x0d, 0xb7, 0x04, 0xec, 0x67, 0xa9, 0xb6, 0x57, 0x18, 0xd4, 0xaf, 0x34, 0x58, 0x3d, 0x8d, + 0x53, 0x19, 0xfb, 0x25, 0x5c, 0x29, 0xe8, 0xa2, 0xca, 0xe0, 0x5b, 0x45, 0x06, 0xe7, 0x54, 0x2a, + 0x63, 0x91, 0x9b, 0xdb, 0xd1, 0x97, 0x94, 0xa3, 0x3f, 0x21, 0xc7, 0x83, 0x86, 0xf5, 0xd4, 0x49, + 0x60, 0x6e, 0xc1, 0x62, 0xe1, 0xae, 0x82, 0xd6, 0x84, 0x05, 0x9f, 0x1c, 0x73, 0xab, 0xe0, 0x82, + 0xcf, 0xf9, 0x19, 0x11, 0xfd, 0x7f, 0x1a, 0x5c, 0xdf, 0x15, 0xbd, 0x52, 0xc4, 0xa1, 0xcd, 0xd8, + 0xe1, 0xe7, 0xb2, 0x65, 0x27, 0x09, 0x93, 0x2f, 0x14, 0x93, 0x23, 0x85, 0x62, 0x1b, 0xe6, 0x86, + 0x4d, 0xd9, 0xa2, 0x4e, 0x5c, 0x45, 0x27, 0xf3, 0x25, 0x3a, 0xd5, 0xc4, 0x5b, 0xbb, 0x83, 0xef, + 0xa7, 0x8e, 0x39, 0x1b, 0xa6, 0x56, 0x21, 0xba, 0x0e, 0xe0, 0xe1, 0xe0, 0x90, 0x48, 0x55, 0x93, + 0xe2, 0xb8, 0x19, 0x49, 0x89, 0xb7, 0x37, 0xa1, 0xd1, 0xa1, 0x2e, 0x27, 0x81, 0x25, 0x72, 0x24, + 0xb4, 0xda, 0x27, 0x56, 0xe6, 0xf8, 0xda, 0x94, 0x08, 0x7f, 0x5d, 0x72, 0x09, 0xb3, 0xc2, 0xcd, + 0x93, 0xf4, 0x89, 0x3a, 0x86, 0x46, 0x99, 0xd1, 0xca, 0x85, 0xdf, 0x81, 0x4b, 0x6a, 0x74, 0x51, + 0xe5, 0x79, 0xb9, 0x20, 0xa2, 0x52, 0x87, 0x14, 0x4d, 0xae, 0x8a, 0x92, 0xd2, 0xff, 0x3f, 0x09, + 0x97, 0xd3, 0xfb, 0xe8, 0x06, 0x5c, 0x96, 0x25, 0xe0, 0x80, 0xd0, 0xee, 0x01, 0x57, 0xf1, 0xa8, + 0x0a, 0xda, 0x96, 0x20, 0xa1, 0x45, 0x98, 0x21, 0xc7, 0xc4, 0xb6, 0x3c, 0xe6, 0x10, 0x91, 0xe3, + 0xb3, 0xe6, 0x74, 0x4c, 0xd8, 0x66, 0x0e, 0x41, 0x9f, 0xc3, 0x3c, 0x4b, 0xd0, 0xaa, 0xb1, 0x4a, + 0x24, 0x7a, 0x75, 0x63, 0xad, 0x14, 0xda, 0x88, 0x79, 0x5b, 0x15, 0xf3, 0x0d, 0x96, 0x25, 0xc5, + 0x4d, 0x5d, 0xde, 0xd9, 0xf8, 0x32, 0x09, 0xd7, 0x15, 0xf7, 0xd6, 0x11, 0x85, 0x8f, 0xa9, 0xeb, + 0x6e, 0x55, 0xcc, 0x19, 0x21, 0x1b, 0x2f, 0xd0, 0x63, 0xa8, 0x72, 0x7c, 0x98, 0x84, 0xa5, 0x76, + 0x41, 0x68, 0xba, 0x59, 0xaa, 0x69, 0x2f, 0xe6, 0x15, 0xea, 0xb6, 0x2a, 0x26, 0xf0, 0xc1, 0x0a, + 0x59, 0xb0, 0x90, 0x0a, 0xa7, 0x32, 0xf4, 0xa2, 0xd0, 0xb6, 0x3e, 0x26, 0xa1, 0x84, 0xd2, 0x61, + 0x90, 0x07, 0x06, 0xcf, 0x87, 0x23, 0x34, 0xf4, 0x7d, 0xb8, 0x2c, 0x86, 0xc4, 0x44, 0xf7, 0xa5, + 0x22, 0x9b, 0xe5, 0x18, 0xa9, 0xd4, 0xee, 0xc4, 0x8b, 0x81, 0xc6, 0x6a, 0x6f, 0xb8, 0xdc, 0x9c, + 0x87, 0x39, 0xa9, 0xc6, 0xf2, 0x48, 0x18, 0xe2, 0x2e, 0xd1, 0x7f, 0xa1, 0xc1, 0xd5, 0x42, 0xef, + 0xa3, 0x3a, 0x4c, 0x87, 0x3e, 0xee, 0x85, 0x07, 0x4c, 0x46, 0x7f, 0xda, 0x1c, 0xac, 0xd1, 0xfe, + 0x30, 0xdf, 0xe4, 0xe5, 0xf9, 0x28, 0x8b, 0x47, 0xcd, 0xda, 0xad, 0xfc, 0x64, 0xfd, 0x69, 0xa7, + 0xf3, 0x28, 0x26, 0xc8, 0x43, 0x9e, 0xdf, 0x1b, 0x4d, 0xc4, 0xdf, 0x6b, 0x70, 0xa5, 0x20, 0x78, + 0xe8, 0x01, 0x88, 0x3b, 0x2c, 0xc7, 0x2a, 0x55, 0xb6, 0x96, 0x4a, 0xc6, 0x41, 0x31, 0x36, 0x99, + 0x62, 0x7a, 0x14, 0x9f, 0xe8, 0x43, 0xb8, 0x28, 0x6f, 0x9f, 0x42, 0x5b, 0x2b, 0xeb, 0x21, 0x0a, + 0x8d, 0xe2, 0x8e, 0x2f, 0x41, 0xaa, 0x8e, 0xcb, 0xdb, 0x3d, 0x65, 0x56, 0x87, 0x85, 0x3c, 0xd4, + 0xbf, 0x9a, 0x80, 0xf9, 0xd1, 0x14, 0x41, 0xeb, 0x70, 0x41, 0xa6, 0x95, 0xc4, 0x59, 0x7a, 0xdc, + 0x56, 0xc5, 0x94, 0x8c, 0x68, 0x1f, 0x16, 0x52, 0x65, 0x55, 0x25, 0xe5, 0x44, 0x69, 0x37, 0x92, + 0x27, 0xa6, 0x4a, 0x74, 0xa2, 0x6e, 0xde, 0x1d, 0xa1, 0xa1, 0x2f, 0x00, 0xa5, 0x12, 0xdd, 0x0a, + 0x39, 0xe6, 0x51, 0xa8, 0xae, 0x62, 0xf3, 0x0c, 0xf9, 0xbe, 0x2b, 0x04, 0xcc, 0x79, 0x3e, 0x42, + 0xd9, 0x9c, 0xcd, 0xdc, 0x20, 0xfd, 0x0f, 0x1a, 0x5c, 0x2b, 0x96, 0x8d, 0xdd, 0x98, 0x39, 0x5c, + 0xd5, 0x12, 0x96, 0x62, 0xb9, 0x0b, 0x28, 0x20, 0x1e, 0xa6, 0x3e, 0xf5, 0xbb, 0xd6, 0x51, 0x84, + 0x7d, 0x1e, 0x79, 0xa1, 0x6a, 0x9c, 0x0b, 0x83, 0x9d, 0xcf, 0xd4, 0x06, 0xfa, 0x2e, 0x34, 0x58, + 0x8f, 0x53, 0x8f, 0x86, 0x9c, 0xda, 0xd8, 0x75, 0x4f, 0x44, 0x3d, 0x20, 0xce, 0x50, 0x54, 0x8e, + 0x7c, 0x4b, 0x59, 0xae, 0xc7, 0x82, 0x29, 0xd1, 0xb2, 0xf1, 0xe7, 0x2a, 0x5c, 0x10, 0x8d, 0x09, + 0xfd, 0x4c, 0x83, 0xe9, 0xa4, 0xcd, 0xa0, 0x3b, 0x05, 0x5e, 0x29, 0x79, 0x8d, 0xd5, 0xd7, 0xca, + 0x78, 0x47, 0x9f, 0x63, 0x7a, 0xf3, 0xa7, 0x7f, 0xff, 0xcf, 0xaf, 0x26, 0x6e, 0xa2, 0x1b, 0xc6, + 0x98, 0x47, 0xb7, 0xf1, 0x23, 0xea, 0xfc, 0x18, 0xfd, 0x5c, 0x83, 0x6a, 0xea, 0x45, 0x54, 0x0e, + 0x28, 0xff, 0x34, 0xab, 0xbf, 0x7b, 0x1a, 0xa0, 0xd4, 0x13, 0x4b, 0x7f, 0x47, 0x60, 0x6a, 0xa0, + 0xa5, 0x71, 0x98, 0xd0, 0x5f, 0x34, 0xa8, 0x95, 0x8d, 0xf6, 0x68, 0xe3, 0xb5, 0xde, 0x01, 0x12, + 0xe3, 0xfb, 0xe7, 0x78, 0x3b, 0xe8, 0xf7, 0x05, 0xd6, 0x0f, 0xee, 0x6b, 0x77, 0x74, 0xc3, 0x28, + 0x7c, 0xf5, 0x5b, 0x3e, 0x73, 0x88, 0xc5, 0x99, 0xfc, 0x6f, 0xa7, 0x40, 0xfe, 0x4d, 0x83, 0xa5, + 0x71, 0x53, 0x36, 0x7a, 0x50, 0xe6, 0xb5, 0x33, 0xbc, 0x11, 0xea, 0xdf, 0x3e, 0x9f, 0xb0, 0xb2, + 0x6b, 0x55, 0xd8, 0xb5, 0x82, 0x1a, 0xc6, 0xd8, 0x5f, 0x5a, 0xd0, 0x9f, 0x34, 0x58, 0x1c, 0x33, + 0x62, 0xa3, 0xfb, 0x65, 0x28, 0x4e, 0x7f, 0x1c, 0xd4, 0x1f, 0x9c, 0x4b, 0x56, 0x19, 0x70, 0x4b, + 0x18, 0xb0, 0x8c, 0xae, 0x8f, 0xfd, 0xf9, 0x09, 0xfd, 0x55, 0x83, 0xb7, 0x4b, 0xc7, 0x54, 0xf4, + 0x51, 0x19, 0x82, 0xd3, 0x66, 0xe0, 0xfa, 0xb7, 0xce, 0x21, 0xa9, 0x90, 0xb7, 0x04, 0xf2, 0x35, + 0xb4, 0x6a, 0x9c, 0xe9, 0x27, 0x27, 0xe4, 0xc3, 0x6c, 0xe6, 0x25, 0x81, 0xde, 0x2b, 0x3b, 0xbb, + 0xe8, 0x2d, 0x53, 0xbf, 0x7b, 0x46, 0x6e, 0x85, 0xae, 0x82, 0x7e, 0xab, 0xc1, 0x5c, 0x76, 0x66, + 0x46, 0xa5, 0x3a, 0x0a, 0x27, 0xef, 0x7a, 0xeb, 0xac, 0xec, 0xea, 0xcc, 0xf7, 0x84, 0x47, 0x56, + 0xd1, 0x3b, 0x05, 0x1e, 0xc9, 0xcd, 0xe8, 0xe8, 0x27, 0x49, 0xc5, 0x1f, 0x9d, 0x4b, 0xd1, 0xfa, + 0x59, 0x67, 0xbc, 0x64, 0x6e, 0xaf, 0xdf, 0x7b, 0x0d, 0x09, 0x09, 0x76, 0x5d, 0xdb, 0xdc, 0xf9, + 0xfa, 0x65, 0x43, 0x7b, 0xf1, 0xb2, 0xa1, 0xfd, 0xfb, 0x65, 0x43, 0xfb, 0xe5, 0xab, 0x46, 0xe5, + 0xc5, 0xab, 0x46, 0xe5, 0x1f, 0xaf, 0x1a, 0x95, 0x1f, 0x7c, 0xd8, 0xa5, 0xfc, 0x20, 0x6a, 0xb7, + 0x6c, 0xe6, 0x65, 0x4d, 0xe9, 0x7f, 0x70, 0x57, 0x0c, 0x24, 0xc6, 0x80, 0x72, 0x2c, 0xcd, 0xe3, + 0x27, 0x3d, 0x12, 0xb6, 0x2f, 0x0a, 0xf2, 0xfb, 0xdf, 0x04, 0x00, 0x00, 0xff, 0xff, 0x55, 0xfb, + 0x6d, 0x6f, 0xdd, 0x15, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -2558,6 +2571,16 @@ func (m *StreamOrderbookUpdatesRequest) MarshalToSizedBuffer(dAtA []byte) (int, _ = i var l int _ = l + if m.FilterOrdersBySubaccountId { + i-- + if m.FilterOrdersBySubaccountId { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } if len(m.MarketIds) > 0 { dAtA12 := make([]byte, len(m.MarketIds)*10) var j11 int @@ -3284,6 +3307,9 @@ func (m *StreamOrderbookUpdatesRequest) Size() (n int) { } n += 1 + sovQuery(uint64(l)) + l } + if m.FilterOrdersBySubaccountId { + n += 2 + } return n } @@ -5089,6 +5115,26 @@ func (m *StreamOrderbookUpdatesRequest) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field MarketIds", wireType) } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FilterOrdersBySubaccountId", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.FilterOrdersBySubaccountId = bool(v != 0) default: iNdEx = preIndex skippy, err := skipQuery(dAtA[iNdEx:]) From 69375fbdb7b98fd9e45f66907d2373bde0b1c3ab Mon Sep 17 00:00:00 2001 From: Matthew Weeks Date: Sat, 25 Jan 2025 10:14:21 -0500 Subject: [PATCH 2/2] fix: filter on subaccount id owner --- .../streaming/full_node_streaming_manager.go | 32 +++-- .../full_node_streaming_manager_test.go | 134 ++++++++++-------- protocol/streaming/util/util.go | 19 +-- protocol/streaming/util/util_test.go | 47 +++--- protocol/streaming/ws/websocket_server.go | 3 +- 5 files changed, 134 insertions(+), 101 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 0a85bbe9209..463c2a25327 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -222,17 +222,15 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 func doFilterStreamUpdateBySubaccount( orderBookUpdate *clobtypes.StreamUpdate_OrderbookUpdate, - subaccountIdNumbers []uint32, + subaccountIds []satypes.SubaccountId, logger log.Logger, ) bool { for _, orderBookUpdate := range orderBookUpdate.OrderbookUpdate.Updates { - orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) - if err == nil { - if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { - return true - } - } else { + orderBookUpdateSubaccountId, err := streaming_util.GetOffChainUpdateV1SubaccountId(orderBookUpdate) + if err != nil { logger.Error(err.Error()) + } else if slices.Contains(subaccountIds, *orderBookUpdateSubaccountId) { + return true } } return false @@ -244,18 +242,21 @@ func doFilterStreamUpdateBySubaccount( // StreamUpdate_OrderUpdate with updates only for subscribed subaccounts func FilterStreamUpdateBySubaccount( updates []clobtypes.StreamUpdate, - subaccountIdNumbers []uint32, + subaccountIds []satypes.SubaccountId, logger log.Logger, ) *[]clobtypes.StreamUpdate { // If reflection becomes too expensive, split updatesChannel by message type filteredUpdates := []clobtypes.StreamUpdate{} + var doFilter bool for _, update := range updates { + doFilter = true switch updateMessage := update.UpdateMessage.(type) { case *clobtypes.StreamUpdate_OrderbookUpdate: - if doFilterStreamUpdateBySubaccount(updateMessage, subaccountIdNumbers, logger) { - filteredUpdates = append(filteredUpdates, update) + if !updateMessage.OrderbookUpdate.Snapshot && !doFilterStreamUpdateBySubaccount(updateMessage, subaccountIds, logger) { + doFilter = false } - default: + } + if doFilter { filteredUpdates = append(filteredUpdates, update) } } @@ -287,10 +288,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( sm.Lock() sIds := make([]satypes.SubaccountId, len(subaccountIds)) - subaccountIdNumbers := make([]uint32, len(subaccountIds)) for i, subaccountId := range subaccountIds { sIds[i] = *subaccountId - subaccountIdNumbers[i] = subaccountId.Number } subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender) @@ -331,10 +330,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( sm.logger.Info( fmt.Sprintf( - "New subscription id %+v for clob pair ids: %+v and subaccount ids: %+v", + "New subscription id %+v for clob pair ids: %+v and subaccount ids: %+v. filter orders by subaccount ids: %+v", subscription.subscriptionId, clobPairIds, subaccountIds, + filterOrdersBySubAccountId, ), ) sm.orderbookSubscriptions[subscription.subscriptionId] = subscription @@ -346,9 +346,11 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( // to send through stream. for updates := range subscription.updatesChannel { if filterOrdersBySubAccountId { - filteredUpdates := FilterStreamUpdateBySubaccount(updates, subaccountIdNumbers, sm.logger) + filteredUpdates := FilterStreamUpdateBySubaccount(updates, sIds, sm.logger) if filteredUpdates != nil { updates = *filteredUpdates + } else { + continue } } metrics.IncrCounterWithLabels( diff --git a/protocol/streaming/full_node_streaming_manager_test.go b/protocol/streaming/full_node_streaming_manager_test.go index 2f3546d337e..002f02f3908 100644 --- a/protocol/streaming/full_node_streaming_manager_test.go +++ b/protocol/streaming/full_node_streaming_manager_test.go @@ -6,13 +6,14 @@ import ( sdktypes "github.com/cosmos/cosmos-sdk/types" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" - pv1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" + v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" sharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" "github.com/dydxprotocol/v4-chain/protocol/mocks" streaming "github.com/dydxprotocol/v4-chain/protocol/streaming" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -23,7 +24,7 @@ const ( ) func OpenOrder( - order *pv1types.IndexerOrder, + order *v1types.IndexerOrder, timestamp *time.Time, ) ocutypes.OffChainUpdateV1 { return ocutypes.OffChainUpdateV1{ @@ -38,7 +39,7 @@ func OpenOrder( } func CancelOrder( - removedOrderId *pv1types.IndexerOrderId, + removedOrderId *v1types.IndexerOrderId, timestamp *time.Time, ) ocutypes.OffChainUpdateV1 { return ocutypes.OffChainUpdateV1{ @@ -54,8 +55,8 @@ func CancelOrder( } func ReplaceOrder( - oldOrderId *pv1types.IndexerOrderId, - newOrder *pv1types.IndexerOrder, + oldOrderId *v1types.IndexerOrderId, + newOrder *v1types.IndexerOrder, timestamp *time.Time, ) ocutypes.OffChainUpdateV1 { return ocutypes.OffChainUpdateV1{ @@ -70,7 +71,7 @@ func ReplaceOrder( } } -func UpdateOrder(orderId *pv1types.IndexerOrderId, totalFilledQuantums uint64) ocutypes.OffChainUpdateV1 { +func UpdateOrder(orderId *v1types.IndexerOrderId, totalFilledQuantums uint64) ocutypes.OffChainUpdateV1 { return ocutypes.OffChainUpdateV1{ UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{ OrderUpdate: &ocutypes.OrderUpdateV1{ @@ -81,14 +82,14 @@ func UpdateOrder(orderId *pv1types.IndexerOrderId, totalFilledQuantums uint64) o } } -func toStreamUpdate(offChainUpdates ...ocutypes.OffChainUpdateV1) clobtypes.StreamUpdate { +func toStreamUpdate(snapshot bool, offChainUpdates ...ocutypes.OffChainUpdateV1) clobtypes.StreamUpdate { return clobtypes.StreamUpdate{ BlockHeight: uint32(0), ExecMode: uint32(sdktypes.ExecModeFinalize), UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ Updates: offChainUpdates, - Snapshot: true, + Snapshot: snapshot, }, }, } @@ -169,7 +170,7 @@ func NewSubaccountUpdate( SubaccountId: subaccountId, UpdatedPerpetualPositions: []*satypes.SubaccountPerpetualPosition{}, UpdatedAssetPositions: []*satypes.SubaccountAssetPosition{}, - Snapshot: true, + Snapshot: false, }, }, } @@ -190,15 +191,15 @@ func NewPriceUpdate( Exponent: 6, Price: 1, }, - Snapshot: true, + Snapshot: false, }, }, } } -func NewIndexerOrderId(owner string, id uint32) pv1types.IndexerOrderId { - return pv1types.IndexerOrderId{ - SubaccountId: pv1types.IndexerSubaccountId{ +func NewIndexerOrderId(owner string, id uint32) v1types.IndexerOrderId { + return v1types.IndexerOrderId{ + SubaccountId: v1types.IndexerSubaccountId{ Owner: owner, Number: id, }, @@ -220,19 +221,19 @@ func NewOrderId(owner string, id uint32) clobtypes.OrderId { } } -func NewIndexerOrder(id pv1types.IndexerOrderId) pv1types.IndexerOrder { - return pv1types.IndexerOrder{ +func NewIndexerOrder(id v1types.IndexerOrderId) v1types.IndexerOrder { + return v1types.IndexerOrder{ OrderId: id, - Side: pv1types.IndexerOrder_SIDE_BUY, + Side: v1types.IndexerOrder_SIDE_BUY, Quantums: uint64(1_000_000), Subticks: 1, - GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{ + GoodTilOneof: &v1types.IndexerOrder_GoodTilBlock{ GoodTilBlock: 1_000_000_000, }, TimeInForce: 1_000_000_000, ReduceOnly: false, ClientMetadata: 0, - ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, + ConditionType: v1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, ConditionalOrderTriggerSubticks: 0, } } @@ -254,23 +255,32 @@ func NewOrder(id clobtypes.OrderId) *clobtypes.Order { } } +func NewLogger() *mocks.Logger { + logger := mocks.Logger{} + logger.On("Info", mock.Anything, mock.Anything, mock.Anything) + logger.On("Error", mock.Anything, mock.Anything, mock.Anything) + return &logger +} + type TestCase struct { updates []clobtypes.StreamUpdate - subaccountIds []uint32 + subaccountIds []satypes.SubaccountId filteredUpdates *[]clobtypes.StreamUpdate } func TestFilterStreamUpdates(t *testing.T) { - logger := &mocks.Logger{} + logger := NewLogger() - subaccountIdNumber := uint32(1337) - orderId := NewIndexerOrderId("foo", subaccountIdNumber) + subaccountId := satypes.SubaccountId{Owner: "me", Number: 1337} + orderId := NewIndexerOrderId(subaccountId.Owner, subaccountId.Number) order := NewIndexerOrder(orderId) - otherSubaccountIdNumber := uint32(2600) - otherOrderId := NewIndexerOrderId("bar", otherSubaccountIdNumber) + otherSubaccountId := satypes.SubaccountId{Owner: "we", Number: 2600} + otherOrderId := NewIndexerOrderId(otherSubaccountId.Owner, otherSubaccountId.Number) otherOrder := NewIndexerOrder(otherOrderId) + neverInScopeSubaccountId := satypes.SubaccountId{Owner: "them", Number: 404} + newOrderId := order.OrderId newOrderId.ClientId += 1 newOrder := NewIndexerOrder(newOrderId) @@ -292,9 +302,11 @@ func TestFilterStreamUpdates(t *testing.T) { otherReplaceOrder := ReplaceOrder(&otherOrderId, &otherNewOrder, &orderReplaceTime) otherUpdateOrder := UpdateOrder(&otherOrderId, uint64(1999)) - baseStreamUpdate := toStreamUpdate(openOrder, cancelOrder, replaceOrder, updateOrder) - otherStreamUpdate := toStreamUpdate(otherOpenOrder, otherCancelOrder, otherReplaceOrder, otherUpdateOrder) + baseStreamUpdate := toStreamUpdate(false, openOrder, cancelOrder, replaceOrder, updateOrder) + snapshotBaseStreamUpdate := toStreamUpdate(true, openOrder, cancelOrder, replaceOrder, updateOrder) + otherStreamUpdate := toStreamUpdate(false, otherOpenOrder, otherCancelOrder, otherReplaceOrder, otherUpdateOrder) bothStreamUpdate := toStreamUpdate( + false, openOrder, cancelOrder, replaceOrder, @@ -316,149 +328,159 @@ func TestFilterStreamUpdates(t *testing.T) { priceUpdate := NewPriceUpdate(0, 0) tests := map[string]TestCase{ + "snapshotNotInScope": { + updates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, + subaccountIds: []satypes.SubaccountId{neverInScopeSubaccountId}, + filteredUpdates: &[]clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, + }, + "snapshotNoScope": { + updates: []clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, + subaccountIds: []satypes.SubaccountId{}, + filteredUpdates: &[]clobtypes.StreamUpdate{snapshotBaseStreamUpdate}, + }, "baseInScope": { updates: []clobtypes.StreamUpdate{baseStreamUpdate}, - subaccountIds: []uint32{orderId.SubaccountId.Number}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, }, "baseNotInScope": { updates: []clobtypes.StreamUpdate{baseStreamUpdate}, - subaccountIds: []uint32{0}, + subaccountIds: []satypes.SubaccountId{neverInScopeSubaccountId}, filteredUpdates: nil, }, "otherInScope": { updates: []clobtypes.StreamUpdate{otherStreamUpdate}, - subaccountIds: []uint32{otherSubaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{otherSubaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, }, "otherNotInScope": { updates: []clobtypes.StreamUpdate{otherStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: nil, }, "bothInScope": { updates: []clobtypes.StreamUpdate{bothStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber, otherSubaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId, otherSubaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, }, "bothOtherInScope": { updates: []clobtypes.StreamUpdate{bothStreamUpdate}, - subaccountIds: []uint32{otherSubaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{otherSubaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, }, "bothSequentiallyOtherInScope": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, otherStreamUpdate}, - subaccountIds: []uint32{otherSubaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{otherSubaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, }, "bothBaseInScope": { updates: []clobtypes.StreamUpdate{bothStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, }, "bothSequentiallyBaseInScope": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, otherStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, }, "bothNoneInScopeWrongId": { updates: []clobtypes.StreamUpdate{bothStreamUpdate}, - subaccountIds: []uint32{404}, + subaccountIds: []satypes.SubaccountId{neverInScopeSubaccountId}, filteredUpdates: nil, }, "bothNoneInScopeNoId": { updates: []clobtypes.StreamUpdate{bothStreamUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: nil, }, "noUpdates": { updates: []clobtypes.StreamUpdate{}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: nil, }, "noUpdatesNoId": { updates: []clobtypes.StreamUpdate{}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: nil, }, "orderBookFillUpdates": { updates: []clobtypes.StreamUpdate{*orderBookFillUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, }, "orderBookFillUpdatesDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate, otherStreamUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, }, "orderBookFillUpdatesFilterUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, }, "orderBookFillUpdatesFilterAndDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate, otherStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, }, "takerOrderUpdates": { updates: []clobtypes.StreamUpdate{*takerOrderUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, }, "takerOrderUpdatesDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate, otherStreamUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, }, "takerOrderUpdatesFilterUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, }, "takerOrderUpdatesFilterAndDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate, otherStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, }, "subaccountUpdates": { updates: []clobtypes.StreamUpdate{*subaccountUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, }, "subaccountUpdatesDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate, otherStreamUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, }, "subaccountUpdatesFilterUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, }, "subaccountUpdatesFilterAndDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate, otherStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, }, "priceUpdates": { updates: []clobtypes.StreamUpdate{*priceUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*priceUpdate}, }, "priceUpdatesDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate, otherStreamUpdate}, - subaccountIds: []uint32{}, + subaccountIds: []satypes.SubaccountId{}, filteredUpdates: &[]clobtypes.StreamUpdate{*priceUpdate}, }, "priceUpdatesFilterUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, }, "priceUpdatesFilterAndDropUpdate": { updates: []clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate, otherStreamUpdate}, - subaccountIds: []uint32{subaccountIdNumber}, + subaccountIds: []satypes.SubaccountId{subaccountId}, filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, }, } @@ -467,7 +489,7 @@ func TestFilterStreamUpdates(t *testing.T) { t.Run(name, func(t *testing.T) { filteredUpdates := streaming.FilterStreamUpdateBySubaccount(testCase.updates, testCase.subaccountIds, logger) if testCase.filteredUpdates != nil { - require.Equal(t, *filteredUpdates, *testCase.filteredUpdates) + require.Equal(t, *testCase.filteredUpdates, *filteredUpdates) } else { require.Nil(t, filteredUpdates) } diff --git a/protocol/streaming/util/util.go b/protocol/streaming/util/util.go index bdec459c5ab..be9ea99fdf2 100644 --- a/protocol/streaming/util/util.go +++ b/protocol/streaming/util/util.go @@ -5,7 +5,9 @@ import ( "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) // GetOffchainUpdatesV1 unmarshals messages in offchain updates to OffchainUpdateV1. @@ -23,22 +25,23 @@ func GetOffchainUpdatesV1(offchainUpdates *clobtypes.OffchainUpdates) []ocutypes } // Error expected if OffChainUpdateV1.UpdateMessage message type is extended to more order events -func GetOffChainUpdateV1SubaccountIdNumber(update ocutypes.OffChainUpdateV1) (uint32, error) { - var orderSubaccountIdNumber uint32 +func GetOffChainUpdateV1SubaccountId(update ocutypes.OffChainUpdateV1) (*satypes.SubaccountId, error) { + var orderSubaccountId v1types.IndexerSubaccountId switch updateMessage := update.UpdateMessage.(type) { case *ocutypes.OffChainUpdateV1_OrderPlace: - orderSubaccountIdNumber = updateMessage.OrderPlace.Order.OrderId.SubaccountId.Number + orderSubaccountId = updateMessage.OrderPlace.Order.OrderId.SubaccountId case *ocutypes.OffChainUpdateV1_OrderRemove: - orderSubaccountIdNumber = updateMessage.OrderRemove.RemovedOrderId.SubaccountId.Number + orderSubaccountId = updateMessage.OrderRemove.RemovedOrderId.SubaccountId case *ocutypes.OffChainUpdateV1_OrderUpdate: - orderSubaccountIdNumber = updateMessage.OrderUpdate.OrderId.SubaccountId.Number + orderSubaccountId = updateMessage.OrderUpdate.OrderId.SubaccountId case *ocutypes.OffChainUpdateV1_OrderReplace: - orderSubaccountIdNumber = updateMessage.OrderReplace.Order.OrderId.SubaccountId.Number + orderSubaccountId = updateMessage.OrderReplace.Order.OrderId.SubaccountId default: - return 0, fmt.Errorf( + return nil, fmt.Errorf( "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v", updateMessage, ) } - return orderSubaccountIdNumber, nil + subaccountId := satypes.SubaccountId{Owner: orderSubaccountId.Owner, Number: orderSubaccountId.Number} + return &subaccountId, nil } diff --git a/protocol/streaming/util/util_test.go b/protocol/streaming/util/util_test.go index 111cb8f4af2..179cc8902e8 100644 --- a/protocol/streaming/util/util_test.go +++ b/protocol/streaming/util/util_test.go @@ -11,46 +11,51 @@ import ( pv1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types" stypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" "github.com/dydxprotocol/v4-chain/protocol/streaming/util" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) func _ToPtr[V any](v V) *V { return &v } -func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) { - subaccountIdNumber := uint32(1337) +func TestGetOffChainUpdateV1SubaccountId(t *testing.T) { + indexerSubaccountId := pv1types.IndexerSubaccountId{ + Owner: "dydx1gm0w9nymewm9z4u7wtyw6auru562xkhtftk80p", + Number: uint32(1337), + } + subaccountId := satypes.SubaccountId{ + Owner: "dydx1gm0w9nymewm9z4u7wtyw6auru562xkhtftk80p", + Number: uint32(1337), + } orderId := pv1types.IndexerOrderId{ - SubaccountId: pv1types.IndexerSubaccountId{ - Owner: "foo", - Number: uint32(subaccountIdNumber), - }, - ClientId: 0, - OrderFlags: 0, - ClobPairId: 0, + SubaccountId: indexerSubaccountId, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, } order := pv1types.IndexerOrder{ OrderId: orderId, Side: pv1types.IndexerOrder_SIDE_BUY, - Quantums: uint64(10 ^ 6), + Quantums: uint64(1_000_000), Subticks: 1, GoodTilOneof: &pv1types.IndexerOrder_GoodTilBlock{ - GoodTilBlock: 10 ^ 9, + GoodTilBlock: 1_000_000_000, }, - TimeInForce: 10 ^ 9, + TimeInForce: 1_000_000_000, ReduceOnly: false, ClientMetadata: 0, ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, ConditionalOrderTriggerSubticks: 0, } newOrder := order - newOrder.Quantums += 10 ^ 6 + newOrder.Quantums += 1_000_000 orderPlaceTime := time.Now() fillQuantums := uint64(1988) tests := map[string]struct { update ocutypes.OffChainUpdateV1 - id uint32 + id satypes.SubaccountId err error }{ "OrderPlace": { @@ -63,7 +68,7 @@ func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) { }, }, }, - id: subaccountIdNumber, + id: subaccountId, err: nil, }, "OrderRemove": { @@ -77,7 +82,7 @@ func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) { }, }, }, - id: subaccountIdNumber, + id: subaccountId, err: nil, }, "OrderUpdate": { @@ -89,7 +94,7 @@ func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) { }, }, }, - id: subaccountIdNumber, + id: subaccountId, err: nil, }, "OrderReplace": { @@ -103,16 +108,16 @@ func TestGetOffChainUpdateV1SubaccountIdNumber(t *testing.T) { }, }, }, - id: subaccountIdNumber, + id: subaccountId, err: nil, }, } for name, testCase := range tests { t.Run(name, func(t *testing.T) { - id, err := util.GetOffChainUpdateV1SubaccountIdNumber(testCase.update) + id, err := util.GetOffChainUpdateV1SubaccountId(testCase.update) fmt.Println("expected", id) - require.Equal(t, err, testCase.err) - require.Equal(t, id, testCase.id) + require.Equal(t, testCase.err, err) + require.Equal(t, testCase.id, *id) }) } } diff --git a/protocol/streaming/ws/websocket_server.go b/protocol/streaming/ws/websocket_server.go index c482ac5bd87..ced3456aef4 100644 --- a/protocol/streaming/ws/websocket_server.go +++ b/protocol/streaming/ws/websocket_server.go @@ -180,7 +180,8 @@ func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) { return subaccountIds, nil } -// parseFilterOrdersBySubaccountId is a helper function to parse the filterOrdersBySubaccountId flag from the query parameters. +// parseFilterOrdersBySubaccountId is a helper function to parse the filterOrdersBySubaccountId flag +// from the query parameters. func parseFilterOrdersBySubaccountId(r *http.Request) (bool, error) { token := r.URL.Query().Get("filterOrdersBySubaccountId") if token == "" {