Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adam/ct 948 add functionality to auxo to create new kafka topics #1756

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b9f6cca
Add block height message to BlockProposer
adamfraser Jun 13, 2024
f7481b2
Add ability for socks to forward block height messages
adamfraser Jun 17, 2024
fd27c82
Add block-height topic to docker config
adamfraser Jun 18, 2024
ce5dfcf
Allow subscribe and unsubscribe to block height without id
adamfraser Jun 18, 2024
f6c19e3
Fix incorrect block height url
adamfraser Jun 18, 2024
62acfd9
Use BlockHeightMessage type
adamfraser Jun 18, 2024
a7b2538
Correctly format block height message in kafka helper
adamfraser Jun 19, 2024
2251142
Remove unused import
adamfraser Jun 19, 2024
914c51d
Modify workflow to push to dev4 from this branch
adamfraser Jun 19, 2024
4fe5a8d
Add kafka publisher tests
adamfraser Jun 19, 2024
a3a840d
Handle block height id is same manner as markets
adamfraser Jun 19, 2024
eb64d70
Revert "Modify workflow to push to dev4 from this branch"
adamfraser Jun 19, 2024
1bcc69d
Rename V4_BLOCK_HEIGHT constant for consistency
adamfraser Jun 20, 2024
f224ef1
Add example to documentation
adamfraser Jun 20, 2024
ec3c1c0
Add block height websocket topic to docker compose local
adamfraser Jun 20, 2024
23a5e41
Rename & add some more tags to stats (#1715)
dydxwill Jun 18, 2024
c1d4f76
chore: `x/subaccounts` move helper functions to to `lib` package (#1711)
BrendanChou Jun 18, 2024
7f9dec0
[OTE-419] Add GB_GEO to compliance reasons (#1697)
Christopher-Li Jun 18, 2024
1e2a44a
[OTE-379] Move Indexer SendOnchainData from Endblocker to Precommitte…
teddyding Jun 18, 2024
34f6c3d
Optimize GetSettlementPpmWithPerpetual, Remove GetSettlementPpm (#1722)
BrendanChou Jun 18, 2024
41d32e3
chore: remove `BigInt0()` & `BigFloat0()` (#1724)
BrendanChou Jun 18, 2024
7eb0f44
[OTE-438] Allow users to read data if they are in FIRST_STRIKE_CLOSE_…
Christopher-Li Jun 18, 2024
473e615
[TRA-354] Add a hard cap to the number of markets listed for PML (#1644)
shrenujb Jun 18, 2024
707cd90
[TRA-414] Add x/revshare module skeleton (#1719)
shrenujb Jun 18, 2024
edc419c
optimize perpInfos (#1723)
BrendanChou Jun 19, 2024
1aaef3a
[OTE-420]: Deprecate ONBOARD from Indexer (#1728)
Christopher-Li Jun 19, 2024
dfbc8a8
[CT-856] Add order replacement to fix vault causing orderbook flicker…
chenyaoy Jun 19, 2024
def4635
Send order remove subaccount message on order replace (#1735)
chenyaoy Jun 20, 2024
8d78baa
chore: add `margin.Risk` type, make margining functions stateless (#1…
BrendanChou Jun 20, 2024
a4cf38d
[TRA-415] implement market mapper rev share gov msg (#1733)
shrenujb Jun 20, 2024
8b6c3d8
[CT-946] only send snapshots to uninitialized streams (#1738)
jayy04 Jun 20, 2024
a2777f3
Optimize `ContainsDuplicates` (#1637)
BrendanChou Jun 21, 2024
a3dea6f
Cherry-pick "Handle invalid price level updates gracefully. (#1747)" …
vincentwschau Jun 21, 2024
59cb777
[TRA-433] Add MarketMapperRevShareDetails state and associated functi…
shrenujb Jun 21, 2024
b49bf48
[TRA-442] Add query for market mapper revenue share params (#1746)
shrenujb Jun 21, 2024
d357ac7
[CT-941] introduce state methods for managing the safety heap (#1731)
jayy04 Jun 22, 2024
b97b568
add MapToSortedSlice function (#1753)
BrendanChou Jun 22, 2024
0228ca8
Perf: add an software override (1.5sec) for `TimeoutPropose` (#1751)
teddyding Jun 24, 2024
a59b27e
Fix logger using incorrect function
adamfraser Jun 24, 2024
c100f2d
Add functionality to create new kafka topics
adamfraser Jun 24, 2024
09a18d4
Deploy to dev5
adamfraser Jun 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 2 additions & 42 deletions .github/workflows/indexer-build-and-push-dev-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,16 @@ name: Indexer Build & Push Images to AWS ECR for Dev / Staging branches
on: # yamllint disable-line rule:truthy
push:
branches:
- adam/ct-948-add-functionality-to-auxo-to-create-new-kafka-topics
- main
- 'release/indexer/v[0-9]+.[0-9]+.x' # e.g. release/indexer/v0.1.x
- 'release/indexer/v[0-9]+.x' # e.g. release/indexer/v1.x
# TODO(DEC-837): Customize github build and push to ECR by service with paths

jobs:
# Build and push to dev
call-build-and-push-ecs-services-dev:
name: (Dev) Build and Push ECS Services
uses: ./.github/workflows/indexer-build-and-push-all-ecr-images.yml
with:
ENVIRONMENT: dev
secrets: inherit

# Build and push to dev2
call-build-and-push-ecs-services-dev2:
name: (Dev2) Build and Push ECS Services
uses: ./.github/workflows/indexer-build-and-push-all-ecr-images.yml
with:
ENVIRONMENT: dev2
secrets: inherit

# Build and push to dev3
call-build-and-push-ecs-services-dev3:
name: (Dev3) Build and Push ECS Services
uses: ./.github/workflows/indexer-build-and-push-all-ecr-images.yml
with:
ENVIRONMENT: dev3
secrets: inherit

# Build and push to dev4
call-build-and-push-ecs-services-dev4:
name: (Dev4) Build and Push ECS Services
uses: ./.github/workflows/indexer-build-and-push-all-ecr-images.yml
with:
ENVIRONMENT: dev4
secrets: inherit

# Build and push to dev5
call-build-and-push-ecs-services-dev5:
name: (Dev5) Build and Push ECS Services
uses: ./.github/workflows/indexer-build-and-push-all-ecr-images.yml
with:
ENVIRONMENT: dev5
secrets: inherit

# Build and push to staging
call-build-and-push-ecs-services-staging:
name: (Staging) Build and Push ECS Services
uses: ./.github/workflows/indexer-build-and-push-all-ecr-images.yml
with:
ENVIRONMENT: staging
secrets: inherit
secrets: inherit
1 change: 1 addition & 0 deletions indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,5 @@ Other example subscription events:
{ "type": "subscribe", "channel": "v4_markets" }
{ "type": "subscribe", "channel": "v4_orderbook", "id": "BTC-USD" }
{ "type": "subscribe", "channel": "v4_subaccounts", "id": "address/0" }
{ "type": "subscribe", "channel": "v4_block_height" }
```
3 changes: 2 additions & 1 deletion indexer/docker-compose-local-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ services:
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL_SAME_HOST://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
Expand Down
5 changes: 3 additions & 2 deletions indexer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ services:
environment:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS:
KAFKA_CREATE_TOPICS:
"to-ender:1:1,\
to-vulcan:1:1,\
to-websockets-orderbooks:1:1,\
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
postgres-test:
build:
context: .
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/kafka/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0';
export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0';
export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
2 changes: 2 additions & 0 deletions indexer/packages/kafka/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum WebsocketTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}

export enum KafkaTopics {
Expand All @@ -14,4 +15,5 @@ export enum KafkaTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import * as Knex from 'knex';

import { formatAlterTableEnumSql } from '../helpers';

export async function up(knex: Knex): Promise<void> {
return knex.raw(formatAlterTableEnumSql(
'compliance_status',
'reason',
['MANUAL', 'US_GEO', 'CA_GEO', 'GB_GEO', 'SANCTIONED_GEO', 'COMPLIANCE_PROVIDER'],
));
}

export async function down(knex: Knex): Promise<void> {
return knex.raw(formatAlterTableEnumSql(
'compliance_status',
'reason',
['MANUAL', 'US_GEO', 'CA_GEO', 'SANCTIONED_GEO', 'COMPLIANCE_PROVIDER'],
));
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export enum ComplianceReason {
MANUAL = 'MANUAL',
US_GEO = 'US_GEO',
CA_GEO = 'CA_GEO',
GB_GEO = 'GB_GEO',
SANCTIONED_GEO = 'SANCTIONED_GEO',
COMPLIANCE_PROVIDER = 'COMPLIANCE_PROVIDER',
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from '../../src/caches/orderbook-levels-cache';
import { OrderSide } from '@dydxprotocol-indexer/postgres';
import { OrderbookLevels, PriceLevel } from '../../src/types';
import { InvalidOptionsError, InvalidPriceLevelUpdateError } from '../../src/errors';
import { InvalidOptionsError } from '../../src/errors';
import { logger } from '@dydxprotocol-indexer/base';

describe('orderbookLevelsCache', () => {
Expand Down Expand Up @@ -176,11 +176,10 @@ describe('orderbookLevelsCache', () => {
expect(orderbookLevels.bids).toEqual([]);
});

it('throws error if update will cause quantums to be negative', async () => {
it('sets price level to 0 if update will cause quantums to be negative', async () => {
const humanPrice: string = '50000';
const quantums: string = '1000';
const invalidDelta: string = '-2000';
const resultingQuantums: string = '-1000';
// Set existing quantums for the level
await updatePriceLevel({
ticker,
Expand All @@ -190,31 +189,26 @@ describe('orderbookLevelsCache', () => {
client,
});

// Test that an error is thrown if the update results in a negative quantums for the price
// level
await expect(updatePriceLevel({
await updatePriceLevel({
ticker,
side: OrderSide.BUY,
humanPrice,
sizeDeltaInQuantums: invalidDelta,
client,
})).rejects.toBeInstanceOf(InvalidPriceLevelUpdateError);
});
expect(logger.crit).toHaveBeenCalledTimes(1);
await expect(updatePriceLevel({

// Expect that the value in the orderbook is set to 0
const orderbookLevels: OrderbookLevels = await getOrderBookLevels(
ticker,
side: OrderSide.BUY,
humanPrice,
sizeDeltaInQuantums: invalidDelta,
client,
})).rejects.toEqual(expect.objectContaining({
message: expect.stringContaining(resultingQuantums),
}));

// Expect that the value in the orderbook is unchanged
const orderbookLevels: OrderbookLevels = await getOrderBookLevels(ticker, client);
{
removeZeros: false,
},
);
expect(orderbookLevels.bids).toMatchObject([{
humanPrice,
quantums,
quantums: '0',
}]);
});
});
Expand Down
21 changes: 9 additions & 12 deletions indexer/packages/redis/src/caches/orderbook-levels-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Big from 'big.js';
import _ from 'lodash';
import { Callback, RedisClient } from 'redis';

import { InvalidOptionsError, InvalidPriceLevelUpdateError } from '../errors';
import { InvalidOptionsError } from '../errors';
import { hGetAsync } from '../helpers/redis';
import { OrderbookLevels, PriceLevel } from '../types';
import {
Expand Down Expand Up @@ -58,32 +58,29 @@ export async function updatePriceLevel({
// NOTE: If this happens from a single price level update, it's possible for multiple subsequent
// price level updates to fail with the same error due to interleaved price level updates.
if (updatedQuantums < 0) {
// Undo the update. This can't be done in a Lua script as Redis runs Lua 5.1, which only
// uses doubles which support up to 53-bit integers. Race-condition where it's possible for a
// price-level to have negative quantums handled in `getOrderbookLevels` where price-levels with
// negative quantums are filtered out. Note: even though we are reverting this information, each
// call to incrementOrderbookLevel updates the lastUpdated key in the cache.
// Set the price level to 0.
// Race-condition where it's possible for a price-level to have negative quantums handled in
// `getOrderbookLevels` where price-levels with negative quantums are filtered out. Note: even
// though we are reverting this information, each call to incrementOrderbookLevel updates the
// lastUpdated key in the cache.
await incrementOrderbookLevel(
ticker,
side,
humanPrice,
// Needs to be an integer
Big(sizeDeltaInQuantums).mul(-1).toFixed(0),
Big(updatedQuantums).mul(-1).toFixed(0),
client,
);
logger.crit({
at: 'orderbookLevelsCache#updatePriceLevel',
message: 'Price level updated to negative quantums',
message: 'Price level updated to negative quantums, set to zero',
ticker,
side,
humanPrice,
updatedQuantums,
sizeDeltaInQuantums,
});
throw new InvalidPriceLevelUpdateError(
'#updatePriceLevel: Resulting price level has negative quantums, quantums = ' +
`${updatedQuantums}`,
);
return 0;
}

return updatedQuantums;
Expand Down
8 changes: 0 additions & 8 deletions indexer/packages/redis/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,3 @@ export class InvalidOptionsError extends Error {
Error.captureStackTrace(this, this.constructor);
}
}

export class InvalidPriceLevelUpdateError extends Error {
constructor(message: string) {
super(`Invalid price level update: ${message}`);
this.name = this.constructor.name;
Error.captureStackTrace(this, this.constructor);
}
}
Loading
Loading