Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport v3.x] backport full node streaming - stateful order updates #1269

Merged
merged 11 commits into from
Apr 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ export interface StreamOrderbookUpdatesResponse {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand All @@ -250,6 +259,15 @@ export interface StreamOrderbookUpdatesResponseSDKType {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
Expand Down Expand Up @@ -904,7 +922,9 @@ export const StreamOrderbookUpdatesRequest = {
function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false
snapshot: false,
blockHeight: 0,
execMode: 0
};
}

Expand All @@ -918,6 +938,14 @@ export const StreamOrderbookUpdatesResponse = {
writer.uint32(16).bool(message.snapshot);
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

Expand All @@ -938,6 +966,14 @@ export const StreamOrderbookUpdatesResponse = {
message.snapshot = reader.bool();
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -951,6 +987,8 @@ export const StreamOrderbookUpdatesResponse = {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand Down
7 changes: 7 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ message StreamOrderbookUpdatesResponse {
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 3;

// Exec mode of the updates.
uint32 exec_mode = 4;
}
5 changes: 2 additions & 3 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,8 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)
memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled())

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand Down
7 changes: 7 additions & 0 deletions protocol/lib/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import (
"github.com/cometbft/cometbft/crypto/tmhash"
)

// Custom exec modes
const (
ExecModeBeginBlock = 100
ExecModeEndBlock = 101
ExecModePrepareCheckState = 102
)

type TxHash string

func GetTxHash(tx []byte) TxHash {
Expand Down
5 changes: 5 additions & 0 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 17 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode uint32,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -103,17 +105,23 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
updatesToSend = append(updatesToSend, updates...)
}
}

if len(updatesToSend) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
BlockHeight: blockHeight,
ExecMode: execMode,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode uint32,
) {
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ type GrpcStreamingManager interface {
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode uint32,
)
}
7 changes: 7 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,10 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForNewOrder(ctx sdk
func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger()
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
}
36 changes: 36 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func BeginBlocker(
ctx sdk.Context,
keeper types.ClobKeeper,
) {
ctx = ctx.WithValue("ExecMode", lib.ExecModeBeginBlock)
// Initialize the set of process proposer match events for the next block effectively
// removing any events that occurred in the last block.
keeper.MustSetProcessProposerMatchesEvents(
Expand All @@ -35,6 +36,8 @@ func EndBlocker(
ctx sdk.Context,
keeper keeper.Keeper,
) {
ctx = ctx.WithValue("ExecMode", lib.ExecModeEndBlock)

processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx)

// Prune any fill amounts from state which are now past their `pruneableBlockHeight`.
Expand Down Expand Up @@ -117,6 +120,8 @@ func PrepareCheckState(
ctx sdk.Context,
keeper *keeper.Keeper,
) {
ctx = ctx.WithValue("ExecMode", lib.ExecModePrepareCheckState)

// Get the events generated from processing the matches in the latest block.
processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx)
if ctx.BlockHeight() != int64(processProposerMatchesEvents.BlockHeight) {
Expand Down Expand Up @@ -152,6 +157,37 @@ func PrepareCheckState(
offchainUpdates,
)

// For orders that are filled in the last block, send an orderbook update to the grpc streams.
if keeper.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
orderIdsToSend := make(map[types.OrderId]bool)

// Send an update for reverted local operations.
for _, operation := range localValidatorOperationsQueue {
if match := operation.GetMatch(); match != nil {
orderIdsToSend[match.GetMatchOrders().TakerOrderId] = true

for _, fill := range match.GetMatchOrders().Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
}

// Send an update for orders that were proposed.
for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock {
orderIdsToSend[orderId] = true
}

// Send update.
for orderId := range orderIdsToSend {
if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists {
orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(ctx, allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
// Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock`
// is called within `PlaceConditionalOrdersTriggeredInLastBlock`.
Expand Down
21 changes: 20 additions & 1 deletion protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,24 @@ func (k Keeper) InitializeNewGrpcStreams(ctx sdk.Context) {
allUpdates.Append(update)
}

streamingManager.SendOrderbookUpdates(allUpdates, true)
k.SendOrderbookUpdates(ctx, allUpdates, true)
}

// SendOrderbookUpdates sends the offchain updates to the gRPC streaming manager.
func (k Keeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
if len(offchainUpdates.Messages) == 0 {
return
}

execMode, _ := ctx.Value("ExecMode").(uint32)
k.GetGrpcStreamingManager().SendOrderbookUpdates(
offchainUpdates,
snapshot,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
execMode,
)
}
Loading
Loading