From 1a8c451b7fa448a88617db0abbf2b40e0eb6aa81 Mon Sep 17 00:00:00 2001 From: Artem Poltorzhitskiy Date: Fri, 6 Sep 2024 12:01:22 +0200 Subject: [PATCH] Fix: receiver and optimization db request (#6) * Fix: receiver and optimization db request * Remove artifacts --- cmd/metadata/receiver.go | 7 +++- internal/storage/postgres/core.go | 42 +++++++++++++++++++++ internal/storage/postgres/token_metadata.go | 14 +++++-- internal/storage/token_metadata.go | 2 +- 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/cmd/metadata/receiver.go b/cmd/metadata/receiver.go index 6162894..a327893 100644 --- a/cmd/metadata/receiver.go +++ b/cmd/metadata/receiver.go @@ -119,12 +119,18 @@ func (r *Receiver) work(ctx context.Context) { } func (r *Receiver) worker(ctx context.Context, task storage.TokenMetadata) { + defer r.queue.Delete(task.Id) + if task.Uri == nil { task.Status = storage.StatusFailed task.Attempts = uint(r.maxAttempts) err := ErrInvalidUri.Error() task.Error = &err log.Err(ErrInvalidUri).Str("uri", "nil").Msg("fail to receive metadata") + + if err := r.storage.Update(ctx, &task); err != nil { + log.Err(err).Msg("saving token metadata in receiver") + } return } @@ -161,7 +167,6 @@ func (r *Receiver) worker(ctx context.Context, task storage.TokenMetadata) { if err := r.storage.Update(ctx, &task); err != nil { log.Err(err).Msg("saving token metadata in receiver") } - r.queue.Delete(task.Id) } func (r *Receiver) httpRequest(ctx context.Context, task *storage.TokenMetadata) error { diff --git a/internal/storage/postgres/core.go b/internal/storage/postgres/core.go index f636677..33e6817 100644 --- a/internal/storage/postgres/core.go +++ b/internal/storage/postgres/core.go @@ -3,6 +3,7 @@ package postgres import ( "context" "database/sql" + "github.com/dipdup-io/starknet-metadata/internal/storage/postgres/migrations" "github.com/uptrace/bun/migrate" @@ -74,6 +75,47 @@ func initDatabase(ctx context.Context, conn *database.Bun) error { func createIndices(ctx context.Context, conn *database.Bun) error { log.Info().Msg("creating indexes...") return conn.DB().RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error { + if _, err := tx.NewCreateIndex(). + IfNotExists(). + Model((*models.TokenMetadata)(nil)). + Index("tm_created_at_idx"). + Column("created_at"). + Exec(ctx); err != nil { + return err + } + if _, err := tx.NewCreateIndex(). + IfNotExists(). + Model((*models.TokenMetadata)(nil)). + Index("tm_updated_at_idx"). + Column("updated_at"). + Exec(ctx); err != nil { + return err + } + if _, err := tx.NewCreateIndex(). + IfNotExists(). + Model((*models.TokenMetadata)(nil)). + Index("tm_attempts_idx"). + Column("attempts"). + Exec(ctx); err != nil { + return err + } + if _, err := tx.NewCreateIndex(). + IfNotExists(). + Model((*models.TokenMetadata)(nil)). + Index("tm_status_idx"). + Column("status"). + Exec(ctx); err != nil { + return err + } + if _, err := tx.NewCreateIndex(). + IfNotExists(). + Model((*models.TokenMetadata)(nil)). + Index("tm_contract_id_idx"). + Column("contract_id"). + Exec(ctx); err != nil { + return err + } + return nil }) } diff --git a/internal/storage/postgres/token_metadata.go b/internal/storage/postgres/token_metadata.go index 2607432..57cbe41 100644 --- a/internal/storage/postgres/token_metadata.go +++ b/internal/storage/postgres/token_metadata.go @@ -22,9 +22,9 @@ func NewTokenMetadata(db *database.Bun) *TokenMetadata { // GetByStatus - func (tm *TokenMetadata) GetByStatus(ctx context.Context, status storage.Status, limit, offset, attempts, delay int) (response []storage.TokenMetadata, err error) { - query := tm.DB().NewSelect().Model(&response). + query := tm.DB().NewSelect(). + Model((*storage.TokenMetadata)(nil)). Where("status = ?", status). - Relation("Contract"). Order("attempts asc", "updated_at desc") if delay > 0 { @@ -41,6 +41,14 @@ func (tm *TokenMetadata) GetByStatus(ctx context.Context, status storage.Status, if attempts > 0 { query.Where("attempts < ?", attempts) } - err = query.Limit(limit).Offset(offset).Scan(ctx) + query = query.Limit(limit).Offset(offset) + + err = tm.DB().NewSelect(). + TableExpr("(?) as token_metadata", query). + ColumnExpr("token_metadata.*"). + ColumnExpr("address.hash as contract__hash"). + Join("left join address on contract_id = address.id"). + Scan(ctx, &response) + return } diff --git a/internal/storage/token_metadata.go b/internal/storage/token_metadata.go index 55e195e..82432e5 100644 --- a/internal/storage/token_metadata.go +++ b/internal/storage/token_metadata.go @@ -27,7 +27,7 @@ type TokenMetadata struct { Id uint64 `bun:"id,notnull,type:bigint,pk" comment:"Unique internal identity"` CreatedAt int64 `comment:"Time when row was created"` UpdatedAt int64 `comment:"Time when row was last updated"` - UpdateID int64 `json:"-" pg:",notnull" comment:"Update counter, increments on each and any token metadata update"` + UpdateID int64 `bun:",notnull" comment:"Update counter, increments on each and any token metadata update"` ContractID uint64 `comment:"Token contract id"` TokenId decimal.Decimal `bun:",type:numeric" comment:"Token id"` Type TokenType `bun:",type:token_type" comment:"Token type"`