From 55631cb759e4648a5c65f2900469555640216b16 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Wed, 20 Mar 2024 19:48:41 -0400 Subject: [PATCH 01/11] [CT-700] separate indexer and grpc streaming events (#1209) * [CT-700] separate indexer and grpc streaming events * fix tests * comments * update --- protocol/app/app.go | 5 +- protocol/mocks/ClobKeeper.go | 5 ++ protocol/mocks/MemClob.go | 60 +++++++++++++++++++ protocol/mocks/MemClobKeeper.go | 5 ++ .../streaming/grpc/grpc_streaming_manager.go | 20 ++++--- protocol/testutil/memclob/keeper.go | 6 ++ protocol/x/clob/abci.go | 12 ++++ protocol/x/clob/keeper/keeper.go | 14 ++++- protocol/x/clob/keeper/orders.go | 2 - protocol/x/clob/memclob/memclob.go | 35 +++++++++-- .../x/clob/memclob/memclob_grpc_streaming.go | 54 +++++++++++++++-- .../memclob/memclob_grpc_streaming_test.go | 14 +++-- ...emclob_purge_invalid_memclob_state_test.go | 19 +++++- .../clob/memclob/memclob_remove_order_test.go | 1 + protocol/x/clob/types/clob_keeper.go | 5 ++ protocol/x/clob/types/mem_clob_keeper.go | 4 ++ protocol/x/clob/types/memclob.go | 12 ++++ 17 files changed, 241 insertions(+), 32 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index 5e191efd0a..c21e4029ff 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -885,9 +885,8 @@ func New( clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts) logger.Info("Parsed CLOB flags", "Flags", clobFlags) - memClob := clobmodulememclob.NewMemClobPriceTimePriority( - app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(), - ) + memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled()) + memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled()) app.ClobKeeper = clobmodulekeeper.NewKeeper( appCodec, diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index ba2a9e6f27..a906480372 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -812,6 +812,11 @@ func (_m *ClobKeeper) RemoveOrderFillAmount(ctx types.Context, orderId clobtypes _m.Called(ctx, orderId) } +// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot +func (_m *ClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *ClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/mocks/MemClob.go b/protocol/mocks/MemClob.go index ffd1830df7..8e3549383f 100644 --- a/protocol/mocks/MemClob.go +++ b/protocol/mocks/MemClob.go @@ -254,6 +254,66 @@ func (_m *MemClob) GetOrderRemainingAmount(ctx types.Context, order clobtypes.Or return r0, r1 } +// GetOrderbookUpdatesForOrderPlacement provides a mock function with given fields: ctx, order +func (_m *MemClob) GetOrderbookUpdatesForOrderPlacement(ctx types.Context, order clobtypes.Order) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, order) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderPlacement") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.Order) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, order) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + +// GetOrderbookUpdatesForOrderRemoval provides a mock function with given fields: ctx, orderId +func (_m *MemClob) GetOrderbookUpdatesForOrderRemoval(ctx types.Context, orderId clobtypes.OrderId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, orderId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderRemoval") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.OrderId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, orderId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + +// GetOrderbookUpdatesForOrderUpdate provides a mock function with given fields: ctx, orderId +func (_m *MemClob) GetOrderbookUpdatesForOrderUpdate(ctx types.Context, orderId clobtypes.OrderId) *clobtypes.OffchainUpdates { + ret := _m.Called(ctx, orderId) + + if len(ret) == 0 { + panic("no return value specified for GetOrderbookUpdatesForOrderUpdate") + } + + var r0 *clobtypes.OffchainUpdates + if rf, ok := ret.Get(0).(func(types.Context, clobtypes.OrderId) *clobtypes.OffchainUpdates); ok { + r0 = rf(ctx, orderId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*clobtypes.OffchainUpdates) + } + } + + return r0 +} + // GetPricePremium provides a mock function with given fields: ctx, clobPair, params func (_m *MemClob) GetPricePremium(ctx types.Context, clobPair clobtypes.ClobPair, params perpetualstypes.GetPricePremiumParams) (int32, error) { ret := _m.Called(ctx, clobPair, params) diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 32c78c5663..9dacd88c53 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -355,6 +355,11 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } +// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot +func (_m *MemClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(offchainUpdates, snapshot) +} + // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight func (_m *MemClobKeeper) SetLongTermOrderPlacement(ctx types.Context, order clobtypes.Order, blockHeight uint32) { _m.Called(ctx, order, blockHeight) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 95ae0a984e..8db6daca6b 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -103,19 +103,21 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( // Send updates to subscribers. idsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { + updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) for _, clobPairId := range subscription.clobPairIds { if updates, ok := v1updates[clobPairId]; ok { - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updates, - Snapshot: snapshot, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - break - } + updatesToSend = append(updatesToSend, updates...) } } + + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + Snapshot: snapshot, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } } // Clean up subscriptions that have been closed. diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 3cc9df6377..491e3165ca 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -503,3 +503,9 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForNewOrder(ctx sdk func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger() } + +func (f *FakeMemClobKeeper) SendOrderbookUpdates( + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { +} diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 5c2a5a7ce3..99a9f36993 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -152,6 +152,18 @@ func PrepareCheckState( offchainUpdates, ) + // For orders that are filled in the last block, send an orderbook update to the grpc streams. + if keeper.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock { + if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists { + orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + } + keeper.SendOrderbookUpdates(allUpdates, false) + } + // 3. Place all stateful order placements included in the last block on the memclob. // Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock` // is called within `PlaceConditionalOrdersTriggeredInLastBlock`. diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index bb02c92867..01098cd8af 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -235,5 +235,17 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { allUpdates.Append(update) } - streamingManager.SendOrderbookUpdates(allUpdates, true) + k.SendOrderbookUpdates(allUpdates, true) +} + +// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. +func (k Keeper) SendOrderbookUpdates( + offchainUpdates *types.OffchainUpdates, + snapshot bool, +) { + if len(offchainUpdates.Messages) == 0 { + return + } + + k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot) } diff --git a/protocol/x/clob/keeper/orders.go b/protocol/x/clob/keeper/orders.go index 0ae4ab9cd4..5046c51ff2 100644 --- a/protocol/x/clob/keeper/orders.go +++ b/protocol/x/clob/keeper/orders.go @@ -1270,8 +1270,6 @@ func (k Keeper) SendOffchainMessages( } k.GetIndexerEventManager().SendOffchainData(update) } - - k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, false) } // getPessimisticCollateralCheckPrice returns the price in subticks we should use for collateralization checks. diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index f02fba7f74..5b9951569e 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -45,6 +45,9 @@ type MemClobPriceTimePriority struct { // ---- Fields for determining if off-chain update messages should be generated ---- generateOffchainUpdates bool + + // ---- Fields for determining if orderbook updates should be generated ---- + generateOrderbookUpdates bool } type OrderWithRemovalReason struct { @@ -56,10 +59,11 @@ func NewMemClobPriceTimePriority( generateOffchainUpdates bool, ) *MemClobPriceTimePriority { return &MemClobPriceTimePriority{ - openOrders: newMemclobOpenOrders(), - cancels: newMemclobCancels(), - operationsToPropose: *types.NewOperationsToPropose(), - generateOffchainUpdates: generateOffchainUpdates, + openOrders: newMemclobOpenOrders(), + cancels: newMemclobCancels(), + operationsToPropose: *types.NewOperationsToPropose(), + generateOffchainUpdates: generateOffchainUpdates, + generateOrderbookUpdates: false, } } @@ -71,6 +75,11 @@ func (m *MemClobPriceTimePriority) SetClobKeeper(clobKeeper types.MemClobKeeper) m.clobKeeper = clobKeeper } +// SetGenerateOffchainUpdates sets the `generateOffchainUpdates` field of the MemClob. +func (m *MemClobPriceTimePriority) SetGenerateOrderbookUpdates(generateOrderbookUpdates bool) { + m.generateOrderbookUpdates = generateOrderbookUpdates +} + // CancelOrder removes a Short-Term order by `OrderId` (if it exists) from all order-related data structures // in the memclob. This method manages only Short-Term cancellations. For stateful cancellations, see // `msg_server_cancel_orders.go`. @@ -1482,6 +1491,12 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( } m.openOrders.mustAddOrderToOrderbook(ctx, newOrder, forceToFrontOfLevel) + + if m.generateOrderbookUpdates { + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } } // mustPerformTakerOrderMatching performs matching using the provided taker order while the order @@ -1917,6 +1932,12 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( !m.operationsToPropose.IsOrderPlacementInOperationsQueue(order) { m.operationsToPropose.RemoveShortTermOrderTxBytes(order) } + + if m.generateOrderbookUpdates { + // Send an orderbook update to grpc streams. + orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } } // mustUpdateOrderbookStateWithMatchedMakerOrder updates the orderbook with a matched maker order. @@ -1934,6 +1955,12 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder panic("Total filled size of maker order greater than the order size") } + // Send an orderbook update for the order's new total filled amount. + if m.generateOrderbookUpdates { + orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) + m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + } + // If the order is fully filled, remove it from the orderbook. // Note we shouldn't remove Short-Term order hashes from `ShortTermOrderTxBytes` here since // the order was matched. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index 6cc8844bc5..16075a6a9e 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -3,6 +3,8 @@ package memclob import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" + ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" + indexersharedtypes "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" ) @@ -27,7 +29,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -44,7 +46,7 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( level.LevelOrders.Front.Each( func(order types.ClobOrder) { offchainUpdates.Append( - m.GetOffchainUpdatesForOrder(ctx, order.Order), + m.GetOrderbookUpdatesForOrderPlacement(ctx, order.Order), ) }, ) @@ -54,10 +56,10 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrderbookSnapshot( return offchainUpdates } -// GetOffchainUpdatesForOrder returns a place order offchain message and -// a update order offchain message used to construct an order for -// the orderbook snapshot grpc stream. -func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( +// GetOrderbookUpdatesForOrderPlacement returns a place order offchain message and +// a update order offchain message used to add an order for +// the orderbook grpc stream. +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderPlacement( ctx sdk.Context, order types.Order, ) (offchainUpdates *types.OffchainUpdates) { @@ -86,3 +88,43 @@ func (m *MemClobPriceTimePriority) GetOffchainUpdatesForOrder( return offchainUpdates } + +// GetOrderbookUpdatesForOrderRemoval returns a remove order offchain message +// used to remove an order for the orderbook grpc stream. +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason( + ctx, + orderId, + indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED, + ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, + ); success { + offchainUpdates.AddRemoveMessage(orderId, message) + } + return offchainUpdates +} + +// GetOrderbookUpdatesForOrderUpdate returns an update order offchain message +// used to update an order for the orderbook grpc stream. +func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId types.OrderId, +) (offchainUpdates *types.OffchainUpdates) { + offchainUpdates = types.NewOffchainUpdates() + + // Get the current fill amount of the order. + fillAmount := m.GetOrderFilledAmount(ctx, orderId) + + // Generate an update message updating the total filled amount of order. + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + fillAmount, + ); success { + offchainUpdates.AddUpdateMessage(orderId, message) + } + return offchainUpdates +} diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go index 586c120d4d..07471e3b22 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming_test.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming_test.go @@ -25,6 +25,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { "Logger", mock.Anything, ).Return(ctx.Logger()) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -48,9 +49,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Buy(t *testing.T) { expected := types.NewOffchainUpdates() // Buy orders are in descending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) require.Equal(t, expected, offchainUpdates) } @@ -68,6 +69,7 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { "Logger", mock.Anything, ).Return(ctx.Logger()) + clobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return() memclob := NewMemClobPriceTimePriority(false) memclob.SetClobKeeper(clobKeeper) @@ -91,9 +93,9 @@ func TestGetOffchainUpdatesForOrderbookSnapshot_Sell(t *testing.T) { expected := types.NewOffchainUpdates() // Sell orders are in ascending order. - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[1])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[2])) - expected.Append(memclob.GetOffchainUpdatesForOrder(ctx, orders[0])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[1])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[2])) + expected.Append(memclob.GetOrderbookUpdatesForOrderPlacement(ctx, orders[0])) require.Equal(t, expected, offchainUpdates) } diff --git a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go index a206b9f159..8d80998aa5 100644 --- a/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go +++ b/protocol/x/clob/memclob/memclob_purge_invalid_memclob_state_test.go @@ -249,10 +249,11 @@ func TestPurgeInvalidMemclobState(t *testing.T) { // Setup memclob state. ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) - mockMemClobKeeper := &mocks.MemClobKeeper{} memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} memclob.SetClobKeeper(mockMemClobKeeper) mockMemClobKeeper.On("Logger", mock.Anything).Return(log.NewNopLogger()).Maybe() + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() for _, operation := range tc.placedOperations { switch operation.Operation.(type) { @@ -339,6 +340,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithDuplicateCanceledStatefulO ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + canceledStatefulOrderIds := []types.OrderId{ constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, @@ -368,6 +373,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenNonStatefulOrderIsCanceled(t *testin ctx, _, _ := sdktest.NewSdkContextWithMultistore() ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + shortTermOrderId := constants.Order_Alice_Num0_Id0_Clob2_Buy5_Price10_GTB15.OrderId require.PanicsWithValue( @@ -395,6 +404,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithDuplicateExpiredStatefulOr ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + expiredStatefulOrderIds := []types.OrderId{ constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, constants.LongTermOrder_Alice_Num0_Id0_Clob0_Buy5_Price10_GTBT15.OrderId, @@ -425,6 +438,10 @@ func TestPurgeInvalidMemclobState_PanicsWhenCalledWithShortTermExpiredStatefulOr ctx = ctx.WithIsCheckTx(true) memclob := NewMemClobPriceTimePriority(true) + mockMemClobKeeper := &mocks.MemClobKeeper{} + memclob.SetClobKeeper(mockMemClobKeeper) + mockMemClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() + shortTermOrderId := constants.Order_Alice_Num0_Id0_Clob2_Buy5_Price10_GTB15.OrderId require.PanicsWithValue( diff --git a/protocol/x/clob/memclob/memclob_remove_order_test.go b/protocol/x/clob/memclob/memclob_remove_order_test.go index 7d3aa4e0b9..15c1d88803 100644 --- a/protocol/x/clob/memclob/memclob_remove_order_test.go +++ b/protocol/x/clob/memclob/memclob_remove_order_test.go @@ -330,6 +330,7 @@ func TestRemoveOrderIfFilled(t *testing.T) { memClobKeeper.On("AddOrderToOrderbookCollatCheck", mock.Anything, mock.Anything, mock.Anything). Return(true, make(map[satypes.SubaccountId]satypes.UpdateResult)) memClobKeeper.On("ValidateSubaccountEquityTierLimitForNewOrder", mock.Anything, mock.Anything).Return(nil) + memClobKeeper.On("SendOrderbookUpdates", mock.Anything, mock.Anything).Return().Maybe() // Set initial fill amount to `0` for all orders. initialCall := memClobKeeper.On("GetOrderFillAmount", mock.Anything, mock.Anything). diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index c31b741155..900451d6c5 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -127,5 +127,10 @@ type ClobKeeper interface { clobPair ClobPair, ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error + // Gprc streaming InitializeNewGrpcStreams(ctx sdk.Context) + SendOrderbookUpdates( + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 381c5870d1..7b944ab12a 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -111,4 +111,8 @@ type MemClobKeeper interface { Logger( ctx sdk.Context, ) log.Logger + SendOrderbookUpdates( + offchainUpdates *OffchainUpdates, + snapshot bool, + ) } diff --git a/protocol/x/clob/types/memclob.go b/protocol/x/clob/types/memclob.go index 2a0539b884..347c2f3254 100644 --- a/protocol/x/clob/types/memclob.go +++ b/protocol/x/clob/types/memclob.go @@ -141,4 +141,16 @@ type MemClob interface { ctx sdk.Context, clobPairId ClobPairId, ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderPlacement( + ctx sdk.Context, + order Order, + ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderRemoval( + ctx sdk.Context, + orderId OrderId, + ) (offchainUpdates *OffchainUpdates) + GetOrderbookUpdatesForOrderUpdate( + ctx sdk.Context, + orderId OrderId, + ) (offchainUpdates *OffchainUpdates) } From a87e4c16cef5df1ec66da0f4135b11a46309f88d Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Thu, 21 Mar 2024 10:01:58 -0400 Subject: [PATCH 02/11] [CT-700] only send response when there is at least one update (#1216) --- .../streaming/grpc/grpc_streaming_manager.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index 8db6daca6b..b4cea80b57 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -110,13 +110,15 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( } } - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updatesToSend, - Snapshot: snapshot, - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) + if len(updatesToSend) > 0 { + if err := subscription.srv.Send( + &clobtypes.StreamOrderbookUpdatesResponse{ + Updates: updatesToSend, + Snapshot: snapshot, + }, + ); err != nil { + idsToRemove = append(idsToRemove, id) + } } } From b235b807d4f69bbaa07efcae4bc9b9e440f6e8c7 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Tue, 26 Mar 2024 16:37:12 -0400 Subject: [PATCH 03/11] [CT-712] send order update when short term order state fill amounts are pruned (#1241) --- protocol/x/clob/keeper/order_state.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 1f28be8f67..fd9b499526 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -259,5 +259,22 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( blockHeight := lib.MustConvertIntegerToUint32(ctx.BlockHeight()) // Prune all fill amounts from state which have a pruneable block height of the current `blockHeight`. - k.PruneOrdersForBlockHeight(ctx, blockHeight) + prunedOrderIds := k.PruneOrdersForBlockHeight(ctx, blockHeight) + + // Send an orderbook update for each pruned order for grpc streams. + // This is needed because short term orders are pruned in PrepareCheckState using + // keeper.MemClob.openOrders.blockExpirationsForOrders, which can fall out of sync with state fill amount + // pruning when there's replacement. + // Long-term fix would be to add logic to keep them in sync. + // TODO(CT-722): add logic to keep state fill amount pruning and order pruning in sync. + if k.GetGrpcStreamingManager().Enabled() { + allUpdates := types.NewOffchainUpdates() + for _, orderId := range prunedOrderIds { + if _, exists := k.MemClob.GetOrder(ctx, orderId); exists { + orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) + allUpdates.Append(orderbookUpdate) + } + } + k.SendOrderbookUpdates(allUpdates, false) + } } From 2aa2bdbb88d3385892090d1dff88cc0497f133b1 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Tue, 26 Mar 2024 17:19:49 -0400 Subject: [PATCH 04/11] [CT-712] send fill amount updates for reverted operations (#1240) --- protocol/x/clob/abci.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 99a9f36993..a1989b347a 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -155,7 +155,26 @@ func PrepareCheckState( // For orders that are filled in the last block, send an orderbook update to the grpc streams. if keeper.GetGrpcStreamingManager().Enabled() { allUpdates := types.NewOffchainUpdates() + orderIdsToSend := make(map[types.OrderId]bool) + + // Send an update for reverted local operations. + for _, operation := range localValidatorOperationsQueue { + if match := operation.GetMatch(); match != nil { + orderIdsToSend[match.GetMatchOrders().TakerOrderId] = true + + for _, fill := range match.GetMatchOrders().Fills { + orderIdsToSend[fill.MakerOrderId] = true + } + } + } + + // Send an update for orders that were proposed. for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock { + orderIdsToSend[orderId] = true + } + + // Send update. + for orderId := range orderIdsToSend { if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists { orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) allUpdates.Append(orderbookUpdate) From 8e80f0b5f94a7d5c888ff04aa3b555e526cca9c5 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Wed, 27 Mar 2024 08:46:28 -0400 Subject: [PATCH 05/11] [CT-723] add block number + stage to grpc updates (#1252) * [CT-723] add block number + stage to grpc updates * add indexer changes --- .../src/codegen/dydxprotocol/clob/query.ts | 40 +++- proto/dydxprotocol/clob/query.proto | 7 + protocol/lib/context.go | 7 + protocol/mocks/ClobKeeper.go | 6 +- protocol/mocks/MemClobKeeper.go | 6 +- .../streaming/grpc/grpc_streaming_manager.go | 8 +- .../streaming/grpc/noop_streaming_manager.go | 2 + protocol/streaming/grpc/types/manager.go | 2 + protocol/testutil/memclob/keeper.go | 1 + protocol/x/clob/abci.go | 7 +- protocol/x/clob/keeper/keeper.go | 11 +- protocol/x/clob/keeper/order_state.go | 2 +- protocol/x/clob/memclob/memclob.go | 6 +- .../x/clob/memclob/memclob_grpc_streaming.go | 4 +- protocol/x/clob/types/clob_keeper.go | 1 + protocol/x/clob/types/mem_clob_keeper.go | 1 + protocol/x/clob/types/query.pb.go | 213 ++++++++++++------ 17 files changed, 237 insertions(+), 87 deletions(-) diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts index ddf8e9bd2d..aa31787820 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts @@ -233,6 +233,15 @@ export interface StreamOrderbookUpdatesResponse { */ snapshot: boolean; + /** + * ---Additional fields used to debug issues--- + * Block height of the updates. + */ + + blockHeight: number; + /** Exec mode of the updates. */ + + execMode: number; } /** * StreamOrderbookUpdatesResponse is a response message for the @@ -250,6 +259,15 @@ export interface StreamOrderbookUpdatesResponseSDKType { */ snapshot: boolean; + /** + * ---Additional fields used to debug issues--- + * Block height of the updates. + */ + + block_height: number; + /** Exec mode of the updates. */ + + exec_mode: number; } function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest { @@ -904,7 +922,9 @@ export const StreamOrderbookUpdatesRequest = { function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse { return { updates: [], - snapshot: false + snapshot: false, + blockHeight: 0, + execMode: 0 }; } @@ -918,6 +938,14 @@ export const StreamOrderbookUpdatesResponse = { writer.uint32(16).bool(message.snapshot); } + if (message.blockHeight !== 0) { + writer.uint32(24).uint32(message.blockHeight); + } + + if (message.execMode !== 0) { + writer.uint32(32).uint32(message.execMode); + } + return writer; }, @@ -938,6 +966,14 @@ export const StreamOrderbookUpdatesResponse = { message.snapshot = reader.bool(); break; + case 3: + message.blockHeight = reader.uint32(); + break; + + case 4: + message.execMode = reader.uint32(); + break; + default: reader.skipType(tag & 7); break; @@ -951,6 +987,8 @@ export const StreamOrderbookUpdatesResponse = { const message = createBaseStreamOrderbookUpdatesResponse(); message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || []; message.snapshot = object.snapshot ?? false; + message.blockHeight = object.blockHeight ?? 0; + message.execMode = object.execMode ?? 0; return message; } diff --git a/proto/dydxprotocol/clob/query.proto b/proto/dydxprotocol/clob/query.proto index b4a61e2062..3745756894 100644 --- a/proto/dydxprotocol/clob/query.proto +++ b/proto/dydxprotocol/clob/query.proto @@ -153,4 +153,11 @@ message StreamOrderbookUpdatesResponse { // Note that if the snapshot is true, then all previous entries should be // discarded and the orderbook should be resynced. bool snapshot = 2; + + // ---Additional fields used to debug issues--- + // Block height of the updates. + uint32 block_height = 3; + + // Exec mode of the updates. + uint32 exec_mode = 4; } diff --git a/protocol/lib/context.go b/protocol/lib/context.go index 4380ea6ce7..3a78471e23 100644 --- a/protocol/lib/context.go +++ b/protocol/lib/context.go @@ -5,6 +5,13 @@ import ( "github.com/cometbft/cometbft/crypto/tmhash" ) +// Custom exec modes +const ( + ExecModeBeginBlock = 100 + ExecModeEndBlock = 101 + ExecModePrepareCheckState = 102 +) + type TxHash string func GetTxHash(tx []byte) TxHash { diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index a906480372..df09418670 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -812,9 +812,9 @@ func (_m *ClobKeeper) RemoveOrderFillAmount(ctx types.Context, orderId clobtypes _m.Called(ctx, orderId) } -// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot -func (_m *ClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { - _m.Called(offchainUpdates, snapshot) +// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot +func (_m *ClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(ctx, offchainUpdates, snapshot) } // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight diff --git a/protocol/mocks/MemClobKeeper.go b/protocol/mocks/MemClobKeeper.go index 9dacd88c53..a7cfd906bd 100644 --- a/protocol/mocks/MemClobKeeper.go +++ b/protocol/mocks/MemClobKeeper.go @@ -355,9 +355,9 @@ func (_m *MemClobKeeper) ReplayPlaceOrder(ctx types.Context, msg *clobtypes.MsgP return r0, r1, r2, r3 } -// SendOrderbookUpdates provides a mock function with given fields: offchainUpdates, snapshot -func (_m *MemClobKeeper) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { - _m.Called(offchainUpdates, snapshot) +// SendOrderbookUpdates provides a mock function with given fields: ctx, offchainUpdates, snapshot +func (_m *MemClobKeeper) SendOrderbookUpdates(ctx types.Context, offchainUpdates *clobtypes.OffchainUpdates, snapshot bool) { + _m.Called(ctx, offchainUpdates, snapshot) } // SetLongTermOrderPlacement provides a mock function with given fields: ctx, order, blockHeight diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index b4cea80b57..501d7f96f5 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -76,6 +76,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, snapshot bool, + blockHeight uint32, + execMode uint32, ) { // Group updates by clob pair id. updates := make(map[uint32]*clobtypes.OffchainUpdates) @@ -113,8 +115,10 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( if len(updatesToSend) > 0 { if err := subscription.srv.Send( &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: updatesToSend, - Snapshot: snapshot, + Updates: updatesToSend, + Snapshot: snapshot, + BlockHeight: blockHeight, + ExecMode: execMode, }, ); err != nil { idsToRemove = append(idsToRemove, id) diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go index f8cc229772..0d74704ace 100644 --- a/protocol/streaming/grpc/noop_streaming_manager.go +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -29,6 +29,8 @@ func (sm *NoopGrpcStreamingManager) Subscribe( func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, snapshot bool, + blockHeight uint32, + execMode uint32, ) { } diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go index 0027358e79..a2b5b35501 100644 --- a/protocol/streaming/grpc/types/manager.go +++ b/protocol/streaming/grpc/types/manager.go @@ -18,5 +18,7 @@ type GrpcStreamingManager interface { SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, snapshot bool, + blockHeight uint32, + execMode uint32, ) } diff --git a/protocol/testutil/memclob/keeper.go b/protocol/testutil/memclob/keeper.go index 491e3165ca..849bdc3673 100644 --- a/protocol/testutil/memclob/keeper.go +++ b/protocol/testutil/memclob/keeper.go @@ -505,6 +505,7 @@ func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger { } func (f *FakeMemClobKeeper) SendOrderbookUpdates( + ctx sdk.Context, offchainUpdates *types.OffchainUpdates, snapshot bool, ) { diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index a1989b347a..546b4ec04e 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -20,6 +20,7 @@ func BeginBlocker( ctx sdk.Context, keeper types.ClobKeeper, ) { + ctx = ctx.WithValue("ExecMode", lib.ExecModeBeginBlock) // Initialize the set of process proposer match events for the next block effectively // removing any events that occurred in the last block. keeper.MustSetProcessProposerMatchesEvents( @@ -35,6 +36,8 @@ func EndBlocker( ctx sdk.Context, keeper keeper.Keeper, ) { + ctx = ctx.WithValue("ExecMode", lib.ExecModeEndBlock) + processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx) // Prune any fill amounts from state which are now past their `pruneableBlockHeight`. @@ -117,6 +120,8 @@ func PrepareCheckState( ctx sdk.Context, keeper *keeper.Keeper, ) { + ctx = ctx.WithValue("ExecMode", lib.ExecModePrepareCheckState) + // Get the events generated from processing the matches in the latest block. processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx) if ctx.BlockHeight() != int64(processProposerMatchesEvents.BlockHeight) { @@ -180,7 +185,7 @@ func PrepareCheckState( allUpdates.Append(orderbookUpdate) } } - keeper.SendOrderbookUpdates(allUpdates, false) + keeper.SendOrderbookUpdates(ctx, allUpdates, false) } // 3. Place all stateful order placements included in the last block on the memclob. diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 01098cd8af..1c09052f6f 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -235,11 +235,12 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) { allUpdates.Append(update) } - k.SendOrderbookUpdates(allUpdates, true) + k.SendOrderbookUpdates(ctx, allUpdates, true) } // SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager. func (k Keeper) SendOrderbookUpdates( + ctx sdk.Context, offchainUpdates *types.OffchainUpdates, snapshot bool, ) { @@ -247,5 +248,11 @@ func (k Keeper) SendOrderbookUpdates( return } - k.GetGrpcStreamingManager().SendOrderbookUpdates(offchainUpdates, snapshot) + execMode, _ := ctx.Value("ExecMode").(uint32) + k.GetGrpcStreamingManager().SendOrderbookUpdates( + offchainUpdates, + snapshot, + lib.MustConvertIntegerToUint32(ctx.BlockHeight()), + execMode, + ) } diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index fd9b499526..d48996dc5f 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -275,6 +275,6 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( allUpdates.Append(orderbookUpdate) } } - k.SendOrderbookUpdates(allUpdates, false) + k.SendOrderbookUpdates(ctx, allUpdates, false) } } diff --git a/protocol/x/clob/memclob/memclob.go b/protocol/x/clob/memclob/memclob.go index 5b9951569e..3ae395f23b 100644 --- a/protocol/x/clob/memclob/memclob.go +++ b/protocol/x/clob/memclob/memclob.go @@ -1495,7 +1495,7 @@ func (m *MemClobPriceTimePriority) mustAddOrderToOrderbook( if m.generateOrderbookUpdates { // Send an orderbook update to grpc streams. orderbookUpdate := m.GetOrderbookUpdatesForOrderPlacement(ctx, newOrder) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) } } @@ -1936,7 +1936,7 @@ func (m *MemClobPriceTimePriority) mustRemoveOrder( if m.generateOrderbookUpdates { // Send an orderbook update to grpc streams. orderbookUpdate := m.GetOrderbookUpdatesForOrderRemoval(ctx, order.OrderId) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) } } @@ -1958,7 +1958,7 @@ func (m *MemClobPriceTimePriority) mustUpdateOrderbookStateWithMatchedMakerOrder // Send an orderbook update for the order's new total filled amount. if m.generateOrderbookUpdates { orderbookUpdate := m.GetOrderbookUpdatesForOrderUpdate(ctx, makerOrder.OrderId) - m.clobKeeper.SendOrderbookUpdates(orderbookUpdate, false) + m.clobKeeper.SendOrderbookUpdates(ctx, orderbookUpdate, false) } // If the order is fully filled, remove it from the orderbook. diff --git a/protocol/x/clob/memclob/memclob_grpc_streaming.go b/protocol/x/clob/memclob/memclob_grpc_streaming.go index 16075a6a9e..1ab7c03ad3 100644 --- a/protocol/x/clob/memclob/memclob_grpc_streaming.go +++ b/protocol/x/clob/memclob/memclob_grpc_streaming.go @@ -97,7 +97,7 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderRemoval( ) (offchainUpdates *types.OffchainUpdates) { offchainUpdates = types.NewOffchainUpdates() if message, success := off_chain_updates.CreateOrderRemoveMessageWithReason( - ctx, + m.clobKeeper.Logger(ctx), orderId, indexersharedtypes.OrderRemovalReason_ORDER_REMOVAL_REASON_UNSPECIFIED, ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_BEST_EFFORT_CANCELED, @@ -120,7 +120,7 @@ func (m *MemClobPriceTimePriority) GetOrderbookUpdatesForOrderUpdate( // Generate an update message updating the total filled amount of order. if message, success := off_chain_updates.CreateOrderUpdateMessage( - ctx, + m.clobKeeper.Logger(ctx), orderId, fillAmount, ); success { diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index 900451d6c5..5d1ae91b6e 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -130,6 +130,7 @@ type ClobKeeper interface { // Gprc streaming InitializeNewGrpcStreams(ctx sdk.Context) SendOrderbookUpdates( + ctx sdk.Context, offchainUpdates *OffchainUpdates, snapshot bool, ) diff --git a/protocol/x/clob/types/mem_clob_keeper.go b/protocol/x/clob/types/mem_clob_keeper.go index 7b944ab12a..d959698d61 100644 --- a/protocol/x/clob/types/mem_clob_keeper.go +++ b/protocol/x/clob/types/mem_clob_keeper.go @@ -112,6 +112,7 @@ type MemClobKeeper interface { ctx sdk.Context, ) log.Logger SendOrderbookUpdates( + ctx sdk.Context, offchainUpdates *OffchainUpdates, snapshot bool, ) diff --git a/protocol/x/clob/types/query.pb.go b/protocol/x/clob/types/query.pb.go index 7f48a50310..ee500746b5 100644 --- a/protocol/x/clob/types/query.pb.go +++ b/protocol/x/clob/types/query.pb.go @@ -711,6 +711,11 @@ type StreamOrderbookUpdatesResponse struct { // Note that if the snapshot is true, then all previous entries should be // discarded and the orderbook should be resynced. Snapshot bool `protobuf:"varint,2,opt,name=snapshot,proto3" json:"snapshot,omitempty"` + // ---Additional fields used to debug issues--- + // Block height of the updates. + BlockHeight uint32 `protobuf:"varint,3,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` + // Exec mode of the updates. + ExecMode uint32 `protobuf:"varint,4,opt,name=exec_mode,json=execMode,proto3" json:"exec_mode,omitempty"` } func (m *StreamOrderbookUpdatesResponse) Reset() { *m = StreamOrderbookUpdatesResponse{} } @@ -760,6 +765,20 @@ func (m *StreamOrderbookUpdatesResponse) GetSnapshot() bool { return false } +func (m *StreamOrderbookUpdatesResponse) GetBlockHeight() uint32 { + if m != nil { + return m.BlockHeight + } + return 0 +} + +func (m *StreamOrderbookUpdatesResponse) GetExecMode() uint32 { + if m != nil { + return m.ExecMode + } + return 0 +} + func init() { proto.RegisterType((*QueryGetClobPairRequest)(nil), "dydxprotocol.clob.QueryGetClobPairRequest") proto.RegisterType((*QueryClobPairResponse)(nil), "dydxprotocol.clob.QueryClobPairResponse") @@ -781,75 +800,77 @@ func init() { func init() { proto.RegisterFile("dydxprotocol/clob/query.proto", fileDescriptor_3365c195b25c5bc0) } var fileDescriptor_3365c195b25c5bc0 = []byte{ - // 1073 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcf, 0x6f, 0xdc, 0x44, - 0x14, 0xce, 0x24, 0xa5, 0x4d, 0xa7, 0x80, 0x60, 0xd2, 0xa4, 0x8b, 0x93, 0x6c, 0xb6, 0x86, 0x24, - 0x9b, 0x54, 0xd8, 0x49, 0x5a, 0xa1, 0x92, 0xa2, 0x4a, 0x49, 0x04, 0x11, 0x52, 0x43, 0x17, 0x53, - 0x02, 0x82, 0x4a, 0xd6, 0xac, 0x3d, 0xbb, 0x19, 0xc5, 0xf6, 0x38, 0xf6, 0xd8, 0x4a, 0x84, 0x10, - 0x12, 0x07, 0x2e, 0x70, 0x40, 0x42, 0x82, 0x03, 0x47, 0xee, 0xfc, 0x07, 0x08, 0xb8, 0xf5, 0x58, - 0x89, 0x0b, 0x07, 0x84, 0x50, 0xc2, 0x99, 0xbf, 0x01, 0x79, 0x3c, 0xbb, 0x5d, 0xc7, 0x3f, 0x36, - 0xc9, 0x65, 0xd7, 0x9e, 0xf9, 0xde, 0xf3, 0xf7, 0xbd, 0xf7, 0xfc, 0x8d, 0xe1, 0xac, 0x7d, 0x64, - 0x1f, 0xfa, 0x01, 0xe3, 0xcc, 0x62, 0x8e, 0x6e, 0x39, 0xac, 0xad, 0x1f, 0x44, 0x24, 0x38, 0xd2, - 0xc4, 0x1a, 0x7a, 0x79, 0x70, 0x5b, 0x4b, 0xb6, 0x95, 0xeb, 0x5d, 0xd6, 0x65, 0x62, 0x49, 0x4f, - 0xae, 0x52, 0xa0, 0x32, 0xd3, 0x65, 0xac, 0xeb, 0x10, 0x1d, 0xfb, 0x54, 0xc7, 0x9e, 0xc7, 0x38, - 0xe6, 0x94, 0x79, 0xa1, 0xdc, 0x5d, 0xb6, 0x58, 0xe8, 0xb2, 0x50, 0x6f, 0xe3, 0x90, 0xa4, 0xf9, - 0xf5, 0x78, 0xb5, 0x4d, 0x38, 0x5e, 0xd5, 0x7d, 0xdc, 0xa5, 0x9e, 0x00, 0x4b, 0xac, 0x9e, 0x67, - 0xd4, 0x76, 0x98, 0xb5, 0x6f, 0x06, 0x98, 0x13, 0xd3, 0xa1, 0x2e, 0xe5, 0xa6, 0xc5, 0xbc, 0x0e, - 0xed, 0xca, 0x80, 0x9b, 0xf9, 0x80, 0xe4, 0xc7, 0xf4, 0x31, 0x0d, 0x24, 0x64, 0x25, 0x0f, 0x21, - 0x07, 0x11, 0xe5, 0x47, 0x26, 0xa7, 0x24, 0x28, 0x4a, 0x7a, 0x2b, 0x1f, 0xe1, 0xd0, 0x83, 0x88, - 0xda, 0xa9, 0xae, 0x2c, 0x78, 0x3a, 0x0f, 0x76, 0x49, 0x2c, 0x37, 0xef, 0x67, 0x36, 0xa9, 0x67, - 0x93, 0x43, 0x12, 0xe8, 0xac, 0xd3, 0x31, 0xad, 0x3d, 0x4c, 0x3d, 0x33, 0xf2, 0x6d, 0xcc, 0x49, - 0x98, 0x5f, 0x49, 0xe3, 0xd5, 0x25, 0x78, 0xe3, 0xfd, 0xa4, 0x62, 0xdb, 0x84, 0x6f, 0x39, 0xac, - 0xdd, 0xc2, 0x34, 0x30, 0xc8, 0x41, 0x44, 0x42, 0x8e, 0x5e, 0x84, 0xa3, 0xd4, 0xae, 0x81, 0x06, - 0x68, 0xbe, 0x60, 0x8c, 0x52, 0x5b, 0xfd, 0x08, 0x4e, 0x0a, 0xe8, 0x33, 0x5c, 0xe8, 0x33, 0x2f, - 0x24, 0xe8, 0x3e, 0xbc, 0xda, 0x2f, 0x89, 0xc0, 0x5f, 0x5b, 0x9b, 0xd6, 0x72, 0xad, 0xd5, 0x7a, - 0x71, 0x9b, 0x97, 0x9e, 0xfc, 0x3d, 0x37, 0x62, 0x8c, 0x5b, 0xf2, 0x5e, 0xc5, 0x92, 0xc3, 0x86, - 0xe3, 0x9c, 0xe6, 0xf0, 0x0e, 0x84, 0xcf, 0x5a, 0x28, 0x73, 0x2f, 0x68, 0x69, 0xbf, 0xb5, 0xa4, - 0xdf, 0x5a, 0x3a, 0x4f, 0xb2, 0xdf, 0x5a, 0x0b, 0x77, 0x89, 0x8c, 0x35, 0x06, 0x22, 0xd5, 0x9f, - 0x00, 0xac, 0x65, 0xc8, 0x6f, 0x38, 0x4e, 0x19, 0xff, 0xb1, 0x73, 0xf2, 0x47, 0xdb, 0x19, 0x92, - 0xa3, 0x82, 0xe4, 0xe2, 0x50, 0x92, 0xe9, 0xc3, 0x33, 0x2c, 0xff, 0x02, 0x70, 0x6e, 0x87, 0xc4, - 0xef, 0x31, 0x9b, 0x3c, 0x62, 0xc9, 0xef, 0x16, 0x76, 0xac, 0xc8, 0x11, 0x9b, 0xbd, 0x8a, 0x3c, - 0x86, 0x53, 0xe9, 0xc0, 0xfa, 0x01, 0xf3, 0x59, 0x48, 0x02, 0xd3, 0xc5, 0xdc, 0xda, 0x23, 0x61, - 0xbf, 0x3a, 0x79, 0xe6, 0xbb, 0xd8, 0x49, 0x46, 0x8b, 0x05, 0x3b, 0x24, 0xde, 0x49, 0xd1, 0xc6, - 0x75, 0x91, 0xa5, 0x25, 0x93, 0xc8, 0x55, 0xf4, 0x29, 0x9c, 0x8c, 0x7b, 0x60, 0xd3, 0x25, 0xb1, - 0xe9, 0x12, 0x1e, 0x50, 0x2b, 0xec, 0xab, 0xca, 0x27, 0xcf, 0x10, 0xde, 0x49, 0xe1, 0xc6, 0x44, - 0x3c, 0xf8, 0xc8, 0x74, 0x51, 0xfd, 0x0f, 0xc0, 0x46, 0xb9, 0x3c, 0xd9, 0x8c, 0x2e, 0xbc, 0x12, - 0x90, 0x30, 0x72, 0x78, 0x28, 0x5b, 0xb1, 0x3d, 0xec, 0x99, 0x05, 0x59, 0x12, 0xc0, 0x86, 0x67, - 0xef, 0x32, 0x27, 0x72, 0x49, 0x8b, 0x04, 0x49, 0xeb, 0x64, 0xdb, 0x7a, 0xd9, 0x15, 0x0c, 0x27, - 0x0a, 0x50, 0xa8, 0x01, 0x9f, 0xef, 0x0f, 0x83, 0xd9, 0x9f, 0x7f, 0xd8, 0x6b, 0xf6, 0xbb, 0x36, - 0x7a, 0x09, 0x8e, 0xb9, 0x24, 0x16, 0x15, 0x19, 0x35, 0x92, 0x4b, 0x34, 0x05, 0x2f, 0xc7, 0x22, - 0x49, 0x6d, 0xac, 0x01, 0x9a, 0x97, 0x0c, 0x79, 0xa7, 0x2e, 0xc3, 0xa6, 0x18, 0xba, 0xb7, 0x85, - 0x1b, 0x3c, 0xa2, 0x24, 0x78, 0x90, 0x78, 0xc1, 0x96, 0x78, 0xbb, 0xa3, 0x60, 0xb0, 0xaf, 0xea, - 0x8f, 0x00, 0x2e, 0x9d, 0x01, 0x2c, 0xab, 0xe4, 0xc1, 0x5a, 0x99, 0xc5, 0xc8, 0x39, 0xd0, 0x0b, - 0xca, 0x56, 0x95, 0x5a, 0x96, 0x67, 0x92, 0x14, 0x61, 0xd4, 0x25, 0xb8, 0x28, 0xc8, 0x6d, 0x26, - 0x43, 0x63, 0x60, 0x4e, 0xca, 0x85, 0xfc, 0x00, 0xa4, 0xea, 0x4a, 0xac, 0xd4, 0xb1, 0x0f, 0x6f, - 0x94, 0xd8, 0xaf, 0x94, 0xa1, 0x15, 0xc8, 0xa8, 0x48, 0x2c, 0x55, 0xa4, 0xc3, 0x7d, 0x0a, 0xa2, - 0x2e, 0xc2, 0x79, 0x41, 0xec, 0xc1, 0x80, 0xd5, 0x16, 0x4a, 0xf8, 0x0a, 0xc0, 0x85, 0x61, 0x48, - 0x29, 0xe0, 0x31, 0x9c, 0x28, 0x70, 0x6e, 0x49, 0x7e, 0xbe, 0x80, 0x7c, 0x3e, 0xa5, 0xe4, 0x8c, - 0x9c, 0xdc, 0x8e, 0xba, 0x01, 0x67, 0x3f, 0xe0, 0x01, 0xc1, 0xee, 0xc3, 0xc0, 0x26, 0x41, 0x9b, - 0xb1, 0xfd, 0x0f, 0x53, 0xf7, 0xee, 0xb9, 0x41, 0x7e, 0x5a, 0xc7, 0xb2, 0xd3, 0xaa, 0x7e, 0x0f, - 0x60, 0xbd, 0x2c, 0x87, 0xd4, 0xf0, 0x31, 0xbc, 0x22, 0x0f, 0x05, 0xf9, 0xca, 0xdd, 0xcd, 0xf2, - 0x96, 0xa7, 0x8a, 0x96, 0x3f, 0x43, 0x1e, 0x76, 0x3a, 0x5b, 0xc9, 0x42, 0x9a, 0x71, 0x77, 0xb5, - 0xf7, 0x8e, 0xc9, 0x7d, 0xa4, 0xc0, 0xf1, 0xd0, 0xc3, 0x7e, 0xb8, 0xc7, 0xb8, 0x78, 0x5f, 0xc6, - 0x8d, 0xfe, 0xfd, 0xda, 0xcf, 0x57, 0xe1, 0x73, 0xa2, 0xc8, 0xe8, 0x6b, 0x00, 0xc7, 0x7b, 0xe6, - 0x8a, 0x96, 0x0b, 0x6a, 0x56, 0x72, 0x42, 0x29, 0xcd, 0x32, 0xec, 0xe9, 0x23, 0x4a, 0x5d, 0xfa, - 0xf2, 0x8f, 0x7f, 0xbf, 0x1b, 0x7d, 0x15, 0xdd, 0xd4, 0x2b, 0x8e, 0x73, 0xfd, 0x33, 0x6a, 0x7f, - 0x8e, 0xbe, 0x01, 0xf0, 0xda, 0xc0, 0x29, 0x51, 0x4e, 0x28, 0x7f, 0x5c, 0x29, 0xb7, 0x86, 0x11, - 0x1a, 0x38, 0x76, 0xd4, 0xd7, 0x04, 0xa7, 0x3a, 0x9a, 0xa9, 0xe2, 0x84, 0x7e, 0x05, 0xb0, 0x56, - 0x66, 0x77, 0x68, 0xed, 0x5c, 0xde, 0x98, 0x72, 0xbc, 0x7d, 0x01, 0x3f, 0x55, 0xd7, 0x05, 0xd7, - 0x3b, 0xeb, 0x60, 0x59, 0xd5, 0xf5, 0xc2, 0xef, 0x11, 0xd3, 0x63, 0x36, 0x31, 0x39, 0x4b, 0xff, - 0xad, 0x01, 0x92, 0xbf, 0x03, 0x38, 0x53, 0xe5, 0x3c, 0xe8, 0x5e, 0x59, 0xd5, 0xce, 0xe0, 0x9b, - 0xca, 0x5b, 0x17, 0x0b, 0x96, 0xba, 0x16, 0x84, 0xae, 0x06, 0xaa, 0xeb, 0x95, 0xdf, 0x70, 0xe8, - 0x17, 0x00, 0xa7, 0x2b, 0x6c, 0x07, 0xad, 0x97, 0xb1, 0x18, 0x6e, 0x98, 0xca, 0xbd, 0x0b, 0xc5, - 0x4a, 0x01, 0xf3, 0x42, 0xc0, 0x1c, 0x9a, 0xad, 0xfc, 0xb0, 0x45, 0xbf, 0x01, 0xf8, 0x4a, 0xa9, - 0x99, 0xa1, 0xbb, 0x65, 0x0c, 0x86, 0x39, 0xa5, 0xf2, 0xe6, 0x05, 0x22, 0x25, 0x73, 0x4d, 0x30, - 0x6f, 0xa2, 0x05, 0xfd, 0x4c, 0x1f, 0xc3, 0xe8, 0x0b, 0x38, 0x55, 0xec, 0x63, 0x68, 0xa5, 0x80, - 0x44, 0xa5, 0x6d, 0x2a, 0xab, 0xe7, 0x88, 0x48, 0xe9, 0xae, 0x80, 0xcd, 0xd6, 0x93, 0xe3, 0x3a, - 0x78, 0x7a, 0x5c, 0x07, 0xff, 0x1c, 0xd7, 0xc1, 0xb7, 0x27, 0xf5, 0x91, 0xa7, 0x27, 0xf5, 0x91, - 0x3f, 0x4f, 0xea, 0x23, 0x9f, 0xbc, 0xd1, 0xa5, 0x7c, 0x2f, 0x6a, 0x6b, 0x16, 0x73, 0xb3, 0x62, - 0xe2, 0x3b, 0xaf, 0x0b, 0xc3, 0xd4, 0xfb, 0x2b, 0x87, 0xa9, 0x40, 0x7e, 0xe4, 0x93, 0xb0, 0x7d, - 0x59, 0x2c, 0xdf, 0xfe, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x3b, 0xb1, 0xe8, 0xed, 0x27, 0x0d, 0x00, - 0x00, + // 1112 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xcd, 0x4f, 0x24, 0x45, + 0x14, 0xa7, 0x00, 0x77, 0xe1, 0xb1, 0x6b, 0xb4, 0x58, 0xd8, 0x71, 0x80, 0x01, 0x5a, 0xf9, 0xdc, + 0xd8, 0x0d, 0xec, 0xc6, 0xac, 0xac, 0xd9, 0x04, 0x88, 0xa2, 0xc9, 0xe2, 0x62, 0xbb, 0xa2, 0xd1, + 0x4d, 0x3a, 0x35, 0xdd, 0xc5, 0x4c, 0x85, 0xee, 0xae, 0xa1, 0xbb, 0xa6, 0x03, 0x31, 0xc6, 0xc4, + 0x83, 0x17, 0x3d, 0x98, 0x78, 0xf0, 0xe0, 0xd1, 0xbb, 0xff, 0x81, 0x51, 0x6f, 0x7b, 0xdc, 0xc4, + 0xc4, 0x78, 0x30, 0xc6, 0x80, 0x67, 0xff, 0x86, 0x4d, 0x57, 0xd7, 0xcc, 0xce, 0xd0, 0x1f, 0x03, + 0x5c, 0x66, 0xba, 0x5f, 0xfd, 0xde, 0xeb, 0xdf, 0xfb, 0xa8, 0x5f, 0x15, 0x4c, 0x39, 0xc7, 0xce, + 0x51, 0x23, 0xe0, 0x82, 0xdb, 0xdc, 0x35, 0x6c, 0x97, 0x57, 0x8d, 0xc3, 0x26, 0x0d, 0x8e, 0x75, + 0x69, 0xc3, 0x2f, 0x77, 0x2e, 0xeb, 0xf1, 0x72, 0xf9, 0x46, 0x8d, 0xd7, 0xb8, 0x34, 0x19, 0xf1, + 0x53, 0x02, 0x2c, 0x4f, 0xd6, 0x38, 0xaf, 0xb9, 0xd4, 0x20, 0x0d, 0x66, 0x10, 0xdf, 0xe7, 0x82, + 0x08, 0xc6, 0xfd, 0x50, 0xad, 0x2e, 0xdb, 0x3c, 0xf4, 0x78, 0x68, 0x54, 0x49, 0x48, 0x93, 0xf8, + 0x46, 0xb4, 0x5a, 0xa5, 0x82, 0xac, 0x1a, 0x0d, 0x52, 0x63, 0xbe, 0x04, 0x2b, 0xac, 0x91, 0x66, + 0x54, 0x75, 0xb9, 0x7d, 0x60, 0x05, 0x44, 0x50, 0xcb, 0x65, 0x1e, 0x13, 0x96, 0xcd, 0xfd, 0x7d, + 0x56, 0x53, 0x0e, 0xb3, 0x69, 0x87, 0xf8, 0xc7, 0x6a, 0x10, 0x16, 0x28, 0xc8, 0x4a, 0x1a, 0x42, + 0x0f, 0x9b, 0x4c, 0x1c, 0x5b, 0x82, 0xd1, 0x20, 0x2b, 0xe8, 0xad, 0xb4, 0x87, 0xcb, 0x0e, 0x9b, + 0xcc, 0x49, 0xf2, 0xea, 0x06, 0x4f, 0xa4, 0xc1, 0x1e, 0x8d, 0xd4, 0xe2, 0xfd, 0xae, 0x45, 0xe6, + 0x3b, 0xf4, 0x88, 0x06, 0x06, 0xdf, 0xdf, 0xb7, 0xec, 0x3a, 0x61, 0xbe, 0xd5, 0x6c, 0x38, 0x44, + 0xd0, 0x30, 0x6d, 0x49, 0xfc, 0xb5, 0x25, 0xb8, 0xf9, 0x41, 0x5c, 0xb1, 0x6d, 0x2a, 0xb6, 0x5c, + 0x5e, 0xdd, 0x25, 0x2c, 0x30, 0xe9, 0x61, 0x93, 0x86, 0x02, 0xbf, 0x08, 0xfd, 0xcc, 0x29, 0xa1, + 0x19, 0xb4, 0x78, 0xdd, 0xec, 0x67, 0x8e, 0xf6, 0x31, 0x8c, 0x49, 0xe8, 0x73, 0x5c, 0xd8, 0xe0, + 0x7e, 0x48, 0xf1, 0x7d, 0x18, 0x6e, 0x97, 0x44, 0xe2, 0x47, 0xd6, 0x26, 0xf4, 0x54, 0x6b, 0xf5, + 0x96, 0xdf, 0xe6, 0xe0, 0x93, 0x7f, 0xa6, 0xfb, 0xcc, 0x21, 0x5b, 0xbd, 0x6b, 0x44, 0x71, 0xd8, + 0x70, 0xdd, 0xb3, 0x1c, 0xde, 0x01, 0x78, 0xde, 0x42, 0x15, 0x7b, 0x5e, 0x4f, 0xfa, 0xad, 0xc7, + 0xfd, 0xd6, 0x93, 0x79, 0x52, 0xfd, 0xd6, 0x77, 0x49, 0x8d, 0x2a, 0x5f, 0xb3, 0xc3, 0x53, 0xfb, + 0x09, 0x41, 0xa9, 0x8b, 0xfc, 0x86, 0xeb, 0xe6, 0xf1, 0x1f, 0xb8, 0x20, 0x7f, 0xbc, 0xdd, 0x45, + 0xb2, 0x5f, 0x92, 0x5c, 0xe8, 0x49, 0x32, 0xf9, 0x78, 0x17, 0xcb, 0xbf, 0x11, 0x4c, 0xef, 0xd0, + 0xe8, 0x7d, 0xee, 0xd0, 0x47, 0x3c, 0xfe, 0xdd, 0x22, 0xae, 0xdd, 0x74, 0xe5, 0x62, 0xab, 0x22, + 0x8f, 0x61, 0x3c, 0x19, 0xd8, 0x46, 0xc0, 0x1b, 0x3c, 0xa4, 0x81, 0xe5, 0x11, 0x61, 0xd7, 0x69, + 0xd8, 0xae, 0x4e, 0x9a, 0xf9, 0x1e, 0x71, 0xe3, 0xd1, 0xe2, 0xc1, 0x0e, 0x8d, 0x76, 0x12, 0xb4, + 0x79, 0x43, 0x46, 0xd9, 0x55, 0x41, 0x94, 0x15, 0x7f, 0x06, 0x63, 0x51, 0x0b, 0x6c, 0x79, 0x34, + 0xb2, 0x3c, 0x2a, 0x02, 0x66, 0x87, 0xed, 0xac, 0xd2, 0xc1, 0xbb, 0x08, 0xef, 0x24, 0x70, 0x73, + 0x34, 0xea, 0xfc, 0x64, 0x62, 0xd4, 0xfe, 0x47, 0x30, 0x93, 0x9f, 0x9e, 0x6a, 0x46, 0x0d, 0xae, + 0x06, 0x34, 0x6c, 0xba, 0x22, 0x54, 0xad, 0xd8, 0xee, 0xf5, 0xcd, 0x8c, 0x28, 0x31, 0x60, 0xc3, + 0x77, 0xf6, 0xb8, 0xdb, 0xf4, 0xe8, 0x2e, 0x0d, 0xe2, 0xd6, 0xa9, 0xb6, 0xb5, 0xa2, 0x97, 0x09, + 0x8c, 0x66, 0xa0, 0xf0, 0x0c, 0x5c, 0x6b, 0x0f, 0x83, 0xd5, 0x9e, 0x7f, 0x68, 0x35, 0xfb, 0x3d, + 0x07, 0xbf, 0x04, 0x03, 0x1e, 0x8d, 0x64, 0x45, 0xfa, 0xcd, 0xf8, 0x11, 0x8f, 0xc3, 0x95, 0x48, + 0x06, 0x29, 0x0d, 0xcc, 0xa0, 0xc5, 0x41, 0x53, 0xbd, 0x69, 0xcb, 0xb0, 0x28, 0x87, 0xee, 0x6d, + 0xa9, 0x06, 0x8f, 0x18, 0x0d, 0x1e, 0xc4, 0x5a, 0xb0, 0x25, 0x77, 0x77, 0x33, 0xe8, 0xec, 0xab, + 0xf6, 0x23, 0x82, 0xa5, 0x73, 0x80, 0x55, 0x95, 0x7c, 0x28, 0xe5, 0x49, 0x8c, 0x9a, 0x03, 0x23, + 0xa3, 0x6c, 0x45, 0xa1, 0x55, 0x79, 0xc6, 0x68, 0x16, 0x46, 0x5b, 0x82, 0x05, 0x49, 0x6e, 0x33, + 0x1e, 0x1a, 0x93, 0x08, 0x9a, 0x9f, 0xc8, 0x0f, 0x48, 0x65, 0x5d, 0x88, 0x55, 0x79, 0x1c, 0xc0, + 0xcd, 0x1c, 0xf9, 0x55, 0x69, 0xe8, 0x19, 0x69, 0x14, 0x04, 0x56, 0x59, 0x24, 0xc3, 0x7d, 0x06, + 0xa2, 0x2d, 0xc0, 0x9c, 0x24, 0xf6, 0xa0, 0x43, 0x6a, 0x33, 0x53, 0xf8, 0x1a, 0xc1, 0x7c, 0x2f, + 0xa4, 0x4a, 0xe0, 0x31, 0x8c, 0x66, 0x28, 0xb7, 0x22, 0x3f, 0x97, 0x41, 0x3e, 0x1d, 0x52, 0x71, + 0xc6, 0x6e, 0x6a, 0x45, 0xdb, 0x80, 0xa9, 0x0f, 0x45, 0x40, 0x89, 0xf7, 0x30, 0x70, 0x68, 0x50, + 0xe5, 0xfc, 0xe0, 0xa3, 0x44, 0xbd, 0x5b, 0x6a, 0x90, 0x9e, 0xd6, 0x81, 0xee, 0x69, 0xd5, 0xfe, + 0x44, 0x50, 0xc9, 0x8b, 0xa1, 0x72, 0xf8, 0x04, 0xae, 0xaa, 0x43, 0x41, 0x6d, 0xb9, 0xbb, 0xdd, + 0xbc, 0xd5, 0xa9, 0xa2, 0xa7, 0xcf, 0x90, 0x87, 0xfb, 0xfb, 0x5b, 0xb1, 0x21, 0x89, 0xb8, 0xb7, + 0xda, 0xda, 0x63, 0x6a, 0x1d, 0x97, 0x61, 0x28, 0xf4, 0x49, 0x23, 0xac, 0x73, 0x21, 0xf7, 0xcb, + 0x90, 0xd9, 0x7e, 0xc7, 0xb3, 0x70, 0x2d, 0x69, 0x7d, 0x9d, 0xb2, 0x5a, 0x5d, 0xc8, 0xad, 0x73, + 0xdd, 0x1c, 0x91, 0xb6, 0x77, 0xa5, 0x09, 0x4f, 0xc0, 0x30, 0x3d, 0xa2, 0xb6, 0xe5, 0x71, 0x87, + 0x96, 0x06, 0xe5, 0xfa, 0x50, 0x6c, 0xd8, 0xe1, 0x0e, 0x5d, 0xfb, 0x79, 0x18, 0x5e, 0x90, 0x4d, + 0xc2, 0xdf, 0x20, 0x18, 0x6a, 0x89, 0x33, 0x5e, 0xce, 0xa8, 0x79, 0xce, 0x09, 0x57, 0x5e, 0xcc, + 0xc3, 0x9e, 0x3d, 0xe2, 0xb4, 0xa5, 0xaf, 0xfe, 0xf8, 0xef, 0xfb, 0xfe, 0x57, 0xf1, 0xac, 0x51, + 0x70, 0x1d, 0x30, 0x3e, 0x67, 0xce, 0x17, 0xf8, 0x5b, 0x04, 0x23, 0x1d, 0xa7, 0x4c, 0x3e, 0xa1, + 0xf4, 0x71, 0x57, 0xbe, 0xd5, 0x8b, 0x50, 0xc7, 0xb1, 0xa5, 0xbd, 0x26, 0x39, 0x55, 0xf0, 0x64, + 0x11, 0x27, 0xfc, 0x2b, 0x82, 0x52, 0x9e, 0x5c, 0xe2, 0xb5, 0x0b, 0x69, 0x6b, 0xc2, 0xf1, 0xf6, + 0x25, 0xf4, 0x58, 0x5b, 0x97, 0x5c, 0xef, 0xac, 0xa3, 0x65, 0xcd, 0x30, 0x32, 0xef, 0x33, 0x96, + 0xcf, 0x1d, 0x6a, 0x09, 0x9e, 0xfc, 0xdb, 0x1d, 0x24, 0x7f, 0x47, 0x30, 0x59, 0xa4, 0x5c, 0xf8, + 0x5e, 0x5e, 0xd5, 0xce, 0xa1, 0xbb, 0xe5, 0xb7, 0x2e, 0xe7, 0xac, 0xf2, 0x9a, 0x97, 0x79, 0xcd, + 0xe0, 0x8a, 0x51, 0x78, 0x07, 0xc4, 0xbf, 0x20, 0x98, 0x28, 0x90, 0x2d, 0xbc, 0x9e, 0xc7, 0xa2, + 0xb7, 0xe0, 0x96, 0xef, 0x5d, 0xca, 0x57, 0x25, 0x30, 0x27, 0x13, 0x98, 0xc6, 0x53, 0x85, 0x17, + 0x63, 0xfc, 0x1b, 0x82, 0x57, 0x72, 0xc5, 0x10, 0xdf, 0xcd, 0x63, 0xd0, 0x4b, 0x69, 0xcb, 0x6f, + 0x5e, 0xc2, 0x53, 0x31, 0xd7, 0x25, 0xf3, 0x45, 0x3c, 0x6f, 0x9c, 0xeb, 0x32, 0x8d, 0xbf, 0x84, + 0xf1, 0x6c, 0x1d, 0xc4, 0x2b, 0x19, 0x24, 0x0a, 0x65, 0xb7, 0xbc, 0x7a, 0x01, 0x8f, 0x84, 0xee, + 0x0a, 0xda, 0xdc, 0x7d, 0x72, 0x52, 0x41, 0x4f, 0x4f, 0x2a, 0xe8, 0xdf, 0x93, 0x0a, 0xfa, 0xee, + 0xb4, 0xd2, 0xf7, 0xf4, 0xb4, 0xd2, 0xf7, 0xd7, 0x69, 0xa5, 0xef, 0xd3, 0x37, 0x6a, 0x4c, 0xd4, + 0x9b, 0x55, 0xdd, 0xe6, 0x5e, 0x77, 0x32, 0xd1, 0x9d, 0xd7, 0xa5, 0xe0, 0x1a, 0x6d, 0xcb, 0x51, + 0x92, 0xa0, 0x38, 0x6e, 0xd0, 0xb0, 0x7a, 0x45, 0x9a, 0x6f, 0x3f, 0x0b, 0x00, 0x00, 0xff, 0xff, + 0xf1, 0xed, 0x1e, 0x02, 0x67, 0x0d, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1687,6 +1708,16 @@ func (m *StreamOrderbookUpdatesResponse) MarshalToSizedBuffer(dAtA []byte) (int, _ = i var l int _ = l + if m.ExecMode != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.ExecMode)) + i-- + dAtA[i] = 0x20 + } + if m.BlockHeight != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.BlockHeight)) + i-- + dAtA[i] = 0x18 + } if m.Snapshot { i-- if m.Snapshot { @@ -1921,6 +1952,12 @@ func (m *StreamOrderbookUpdatesResponse) Size() (n int) { if m.Snapshot { n += 2 } + if m.BlockHeight != 0 { + n += 1 + sovQuery(uint64(m.BlockHeight)) + } + if m.ExecMode != 0 { + n += 1 + sovQuery(uint64(m.ExecMode)) + } return n } @@ -3201,6 +3238,44 @@ func (m *StreamOrderbookUpdatesResponse) Unmarshal(dAtA []byte) error { } } m.Snapshot = bool(v != 0) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field BlockHeight", wireType) + } + m.BlockHeight = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.BlockHeight |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ExecMode", wireType) + } + m.ExecMode = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ExecMode |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipQuery(dAtA[iNdEx:]) From 54c2f013dbb5b93fb4d2e141df8cde2b2d9ab3d5 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Wed, 27 Mar 2024 16:23:05 -0400 Subject: [PATCH 06/11] [CT-727] avoid state reads when sending updates (#1261) --- protocol/x/clob/keeper/order_state.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index d48996dc5f..63419eddfc 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -5,6 +5,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" @@ -271,8 +272,13 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( allUpdates := types.NewOffchainUpdates() for _, orderId := range prunedOrderIds { if _, exists := k.MemClob.GetOrder(ctx, orderId); exists { - orderbookUpdate := k.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId) - allUpdates.Append(orderbookUpdate) + if message, success := off_chain_updates.CreateOrderUpdateMessage( + ctx, + orderId, + 0, // Total filled quantums is zero because it's been pruned from state. + ); success { + allUpdates.AddUpdateMessage(orderId, message) + } } } k.SendOrderbookUpdates(ctx, allUpdates, false) From a2467a345c63d2c646f9d58578f512e7cb4e054f Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Thu, 28 Mar 2024 13:10:48 -0400 Subject: [PATCH 07/11] fix lint| --- protocol/x/clob/keeper/order_state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 63419eddfc..e75a2692bb 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -273,7 +273,7 @@ func (k Keeper) PruneStateFillAmountsForShortTermOrders( for _, orderId := range prunedOrderIds { if _, exists := k.MemClob.GetOrder(ctx, orderId); exists { if message, success := off_chain_updates.CreateOrderUpdateMessage( - ctx, + k.Logger(ctx), orderId, 0, // Total filled quantums is zero because it's been pruned from state. ); success { From e9c99d34a82ad09a0eafa590a7ce164e6b5a8845 Mon Sep 17 00:00:00 2001 From: jayy04 <103467857+jayy04@users.noreply.github.com> Date: Thu, 28 Mar 2024 20:24:50 -0400 Subject: [PATCH 08/11] [CT-712] send updates for both normal order matches and liquidation (#1280) --- protocol/x/clob/abci.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 546b4ec04e..f378c1feb4 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -165,10 +165,18 @@ func PrepareCheckState( // Send an update for reverted local operations. for _, operation := range localValidatorOperationsQueue { if match := operation.GetMatch(); match != nil { - orderIdsToSend[match.GetMatchOrders().TakerOrderId] = true - - for _, fill := range match.GetMatchOrders().Fills { - orderIdsToSend[fill.MakerOrderId] = true + // For normal order matches, we send an update for the taker and maker orders. + if matchedOrders := match.GetMatchOrders(); matchedOrders != nil { + orderIdsToSend[matchedOrders.TakerOrderId] = true + for _, fill := range matchedOrders.Fills { + orderIdsToSend[fill.MakerOrderId] = true + } + } + // For liquidation matches, we send an update for the maker orders. + if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil { + for _, fill := range matchedLiquidation.Fills { + orderIdsToSend[fill.MakerOrderId] = true + } } } } From 4d12438be81c97f9055c3595ee10ae05e7e00b2f Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Fri, 29 Mar 2024 10:23:25 -0400 Subject: [PATCH 09/11] fix test --- protocol/x/clob/abci_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/x/clob/abci_test.go b/protocol/x/clob/abci_test.go index e05ccd7076..0e37fec3d4 100644 --- a/protocol/x/clob/abci_test.go +++ b/protocol/x/clob/abci_test.go @@ -143,7 +143,7 @@ func TestEndBlocker_Failure(t *testing.T) { for _, orderId := range tc.expiredStatefulOrderIds { mockIndexerEventManager.On("AddTxnEvent", - ctx, + mock.Anything, indexerevents.SubtypeStatefulOrder, indexerevents.StatefulOrderEventVersion, indexer_manager.GetBytes( From 0252de45a44b3eae903dda1f3642758a0449bfea Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Fri, 29 Mar 2024 10:29:15 -0400 Subject: [PATCH 10/11] fix test --- protocol/x/clob/abci_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/x/clob/abci_test.go b/protocol/x/clob/abci_test.go index 0e37fec3d4..d7a9fc636d 100644 --- a/protocol/x/clob/abci_test.go +++ b/protocol/x/clob/abci_test.go @@ -764,7 +764,7 @@ func TestEndBlocker_Success(t *testing.T) { // Assert that the indexer events for Expired Stateful Orders were emitted. for _, orderId := range tc.expectedProcessProposerMatchesEvents.ExpiredStatefulOrderIds { mockIndexerEventManager.On("AddTxnEvent", - ctx, + mock.Anything, indexerevents.SubtypeStatefulOrder, indexerevents.StatefulOrderEventVersion, indexer_manager.GetBytes( @@ -779,7 +779,7 @@ func TestEndBlocker_Success(t *testing.T) { // Assert that the indexer events for triggered conditional orders were emitted. for _, orderId := range tc.expectedTriggeredConditionalOrderIds { mockIndexerEventManager.On("AddTxnEvent", - ctx, + mock.Anything, indexerevents.SubtypeStatefulOrder, indexerevents.StatefulOrderEventVersion, indexer_manager.GetBytes( From b7441a99038de42dab87fdc5065833d206aba1ad Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Mon, 1 Apr 2024 09:42:31 -0400 Subject: [PATCH 11/11] update type --- protocol/lib/context.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/protocol/lib/context.go b/protocol/lib/context.go index 3a78471e23..9bca90064d 100644 --- a/protocol/lib/context.go +++ b/protocol/lib/context.go @@ -2,14 +2,15 @@ package lib import ( "fmt" + "github.com/cometbft/cometbft/crypto/tmhash" ) // Custom exec modes const ( - ExecModeBeginBlock = 100 - ExecModeEndBlock = 101 - ExecModePrepareCheckState = 102 + ExecModeBeginBlock = uint32(100) + ExecModeEndBlock = uint32(101) + ExecModePrepareCheckState = uint32(102) ) type TxHash string