Skip to content

Commit

Permalink
feat(telemetry): add swap transaction handling and migration (#13)
Browse files Browse the repository at this point in the history
* feat(telemetry): add swap transaction handling and migration

* feat(server): enable telemetry transaction indexing

* docs(deploy): update deployment instructions and add migrations

* chore: update app name in fly.toml configuration
  • Loading branch information
lmquang authored Feb 13, 2025
1 parent eda4f1f commit 059b734
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 202 deletions.
21 changes: 17 additions & 4 deletions docs/deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@

### 1. Initialize Fly.io Configuration
```bash
flyctl launch
flyctl launch --ha=false
```
- This command will detect your Dockerfile and create a `fly.toml` configuration file
- Choose a name for your application
- Select the region closest to your primary users

### 2. Configure Secrets
Create a `.env` file with your application's environment variables. Then import them:
Create a `.env.prod` file with your application's environment variables. Then import them:
```bash
flyctl secrets import < .env
flyctl secrets import < .env.prod
```

### 3. Set Required Environment Variables
Expand All @@ -44,7 +44,7 @@ flyctl secrets set \

### 4. Deploy the Application
```bash
flyctl deploy
flyctl deploy --ha=false
```

### 5. Verify Deployment
Expand All @@ -53,6 +53,19 @@ flyctl status
flyctl open # Opens the deployed application in your browser
```

### 6. Run Database Migrations
To run database migrations, use the following command:
```bash
export $(grep -v '^#' .env.prod | xargs) \
&& migrate -path ./migrations/schema \
-database "postgres://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}?sslmode=${DB_SSL_MODE}" \
up
```
Notes:
- Ensure `.env.prod` contains all necessary database connection variables
- The `migrate` command applies all pending schema migrations
- Run this command after initial deployment or when new migrations are added

## Additional Fly.io Commands

- Scale your app:
Expand Down
13 changes: 5 additions & 8 deletions fly.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
# fly.toml app configuration file generated for icy-backend-dev on 2025-02-11T11:30:22+07:00
# fly.toml app configuration file generated for icy-backend-dev on 2025-02-13T10:52:05+07:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#

app = 'icy-backend-dev'
app = 'icy-backend'
primary_region = 'sin'

[build]
[build.args]
GO_VERSION = '1.23.2'

[env]
PORT = '8080'

[http_service]
internal_port = 8080
Expand All @@ -22,4 +17,6 @@ primary_region = 'sin'
processes = ['app']

[[vm]]
size = 'shared-cpu-1x'
memory = '1gb'
cpu_kind = 'shared'
cpus = 1
19 changes: 9 additions & 10 deletions internal/handler/swap/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/big"
"net/http"
"strconv"
"time"

"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
Expand Down Expand Up @@ -141,10 +140,10 @@ func (h *handler) TriggerSwap(c *gin.Context) {
priceAmountBig.SetString(latestPrice.Value, 10)

// Perform division with high precision
btcAmountBig := new(big.Int).Div(icyAmountBig, priceAmountBig)
satAmountBig := new(big.Int).Div(icyAmountBig, priceAmountBig)

btcAmount := &model.Web3BigInt{
Value: btcAmountBig.String(),
satAmount := &model.Web3BigInt{
Value: satAmountBig.String(),
Decimal: consts.BTC_DECIMALS,
}

Expand All @@ -157,7 +156,7 @@ func (h *handler) TriggerSwap(c *gin.Context) {
}()

// trigger swap if ICY burn is successful
btcTxHash, err := h.controller.TriggerSwap(icyAmount, btcAmount, req.BTCAddress)
swapTxHash, err := h.controller.TriggerSwap(icyAmount, satAmount, req.BTCAddress)
if err != nil {
tx.Rollback()
h.logger.Error("[TriggerSwap][TriggerSwap]", map[string]string{
Expand All @@ -169,11 +168,11 @@ func (h *handler) TriggerSwap(c *gin.Context) {

// Record BTC transaction processing
_, err = h.btcProcessedTxStore.Create(&model.OnchainBtcProcessedTransaction{
IcyTransactionHash: req.IcyTx,
BtcTransactionHash: btcTxHash,
ProcessedAt: time.Now(),
Amount: btcAmount.Value,
Status: model.BtcProcessingStatusPending,
IcyTransactionHash: req.IcyTx,
SwapTransactionHash: swapTxHash,
BTCAddress: req.BTCAddress,
Amount: satAmount.Value,
Status: model.BtcProcessingStatusPending,
})
if err != nil {
tx.Rollback()
Expand Down
18 changes: 10 additions & 8 deletions internal/model/onchain_btc_processed_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ const (
)

type OnchainBtcProcessedTransaction struct {
ID int `json:"id"`
IcyTransactionHash string `json:"icy_transaction_hash"`
BtcTransactionHash string `json:"btc_transaction_hash"`
ProcessedAt time.Time `json:"processed_at"`
Amount string `json:"amount"`
Status BtcProcessingStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID int `json:"id"`
IcyTransactionHash string `json:"icy_transaction_hash"`
BtcTransactionHash string `json:"btc_transaction_hash"`
SwapTransactionHash string `json:"swap_transaction_hash"`
BTCAddress string `json:"btc_address"`
ProcessedAt time.Time `json:"processed_at"`
Amount string `json:"amount"`
Status BtcProcessingStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
5 changes: 3 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func Init() {
}

c.AddFunc("@every "+indexInterval, func() {
telemetry.IndexBtcTransaction()
telemetry.IndexIcyTransaction()
go telemetry.IndexBtcTransaction()
go telemetry.IndexIcyTransaction()
go telemetry.ProcessPendingBtcTransactions()
})

c.Start()
Expand Down
6 changes: 6 additions & 0 deletions internal/store/onchainbtcprocessedtransaction/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ type IStore interface {

// Update the status of a BTC processed transaction
UpdateStatus(id int, status model.BtcProcessingStatus) error

// UpdateToCompleted updates the status of a BTC processed transaction to processed
UpdateToCompleted(id int, btcTxHash string) error

// Get all pending BTC processed transactions
GetPendingTransactions() ([]model.OnchainBtcProcessedTransaction, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,17 @@ func (s *store) UpdateStatus(id int, status model.BtcProcessingStatus) error {
"updated_at": time.Now(),
}).Error
}

func (s *store) UpdateToCompleted(id int, btcTxHash string) error {
return s.db.Model(&model.OnchainBtcProcessedTransaction{}).Where("id = ?", id).Updates(map[string]interface{}{
"status": model.BtcProcessingStatusCompleted,
"btc_transaction_hash": btcTxHash,
"updated_at": time.Now(),
}).Error
}

func (s *store) GetPendingTransactions() ([]model.OnchainBtcProcessedTransaction, error) {
var pendingTxs []model.OnchainBtcProcessedTransaction
err := s.db.Where("status = ?", model.BtcProcessingStatusPending).Find(&pendingTxs).Error
return pendingTxs, err
}
110 changes: 110 additions & 0 deletions internal/telemetry/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package telemetry

import (
"context"
"errors"
"fmt"
"slices"

"github.com/dwarvesf/icy-backend/internal/model"
"github.com/dwarvesf/icy-backend/internal/store"
"github.com/ethereum/go-ethereum/common"
"gorm.io/gorm"
)

func (t *Telemetry) IndexIcyTransaction() error {
t.logger.Info("[IndexIcyTransaction] Start indexing ICY transactions...")

var latestTx *model.OnchainIcyTransaction
latestTx, err := t.store.OnchainIcyTransaction.GetLatestTransaction(t.db)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
t.logger.Error("[IndexIcyTransaction][GetLatestTransaction]", map[string]string{
"error": err.Error(),
})
return err
}
t.logger.Info("[IndexIcyTransaction] No previous transactions found. Starting from the beginning.")
}

// Determine the starting block
startBlock := uint64(0)
if latestTx != nil && latestTx.TransactionHash != "" {
t.logger.Info(fmt.Sprintf("[IndexIcyTransaction] Latest ICY transaction: %s", latestTx.TransactionHash))
receipt, err := t.baseRpc.Client().TransactionReceipt(context.Background(), common.HexToHash(latestTx.TransactionHash))
if err != nil {
t.logger.Error("[IndexIcyTransaction][LastTransactionReceipt]", map[string]string{
"txHash": latestTx.TransactionHash,
"error": err.Error(),
})
} else {
startBlock = receipt.BlockNumber.Uint64() + 1
}
}

fromTxId := t.appConfig.Blockchain.InitialICYTransactionHash
if latestTx != nil {
fromTxId = latestTx.TransactionHash
}

// Fetch all transactions for the ICY contract
allTxs, err := t.baseRpc.GetTransactionsByAddress(t.appConfig.Blockchain.ICYContractAddr, fromTxId)
if err != nil {
t.logger.Error("[IndexIcyTransaction][GetTransactionsByAddress]", map[string]string{
"error": err.Error(),
})
return err
}

// Filter and prepare transactions to store
var txsToStore []model.OnchainIcyTransaction
for _, tx := range allTxs {
receipt, err := t.baseRpc.Client().TransactionReceipt(context.Background(), common.HexToHash(tx.TransactionHash))
if err != nil {
t.logger.Error("[IndexIcyTransaction][TransactionReceipt]", map[string]string{
"txHash": tx.TransactionHash,
"error": err.Error(),
})
continue
}

// Only add transactions after the last known transaction
if receipt.BlockNumber.Uint64() >= startBlock {
txsToStore = append(txsToStore, tx)
}
}

// Sort transactions by block number to maintain order
slices.SortFunc(txsToStore, func(a, b model.OnchainIcyTransaction) int {
receiptA, _ := t.baseRpc.Client().TransactionReceipt(context.Background(), common.HexToHash(a.TransactionHash))
receiptB, _ := t.baseRpc.Client().TransactionReceipt(context.Background(), common.HexToHash(b.TransactionHash))
return int(receiptA.BlockNumber.Int64() - receiptB.BlockNumber.Int64())
})

// Store transactions
if len(txsToStore) > 0 {
err = store.DoInTx(t.db, func(tx *gorm.DB) error {
for _, onchainTx := range txsToStore {
_, err := t.store.OnchainIcyTransaction.Create(tx, &onchainTx)
if err != nil {
return err
}
t.logger.Info(fmt.Sprintf("Tx Hash: %s - Amount: %s [%s]", onchainTx.TransactionHash, onchainTx.Amount, onchainTx.Type))
}
return nil
})
if err != nil {
t.logger.Error("[IndexIcyTransaction][CreateTransactions]", map[string]string{
"error": err.Error(),
})
return err
}
}

t.logger.Info(fmt.Sprintf("[IndexIcyTransaction] Processed %d new transactions", len(txsToStore)))
return nil
}

func (t *Telemetry) GetIcyTransactionByHash(txHash string) (*model.OnchainIcyTransaction, error) {
return t.store.OnchainIcyTransaction.GetByTransactionHash(t.db, txHash)
}
Loading

0 comments on commit 059b734

Please sign in to comment.