Skip to content

Commit

Permalink
Merge pull request #31 from shruggr/mnee
Browse files Browse the repository at this point in the history
Mnee
  • Loading branch information
shruggr authored Mar 1, 2025
2 parents 6c973d1 + e137330 commit 0706c0b
Show file tree
Hide file tree
Showing 42 changed files with 874 additions and 990 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ app.yaml
.gcloudignore

*.run
config/config.go
config/config.go
redis.sh
settings.json
4 changes: 1 addition & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [


{
"name": "Launch file",
"type": "go",
Expand All @@ -18,7 +16,7 @@
// "-t=b1d6decfecfebadec22c9e98109aad3e58f7e4ac799024d1e5bf3afe3b95d0f2",
// "-s=783968",
// "-m=1"
// "-p=8083"
"-p=8088"
// "-tag=locks",
// "-t=9c554bf80570ba371a558127612960e885de846a607329277452e668a4be6c76",
// "-s=801000",
Expand Down
98 changes: 51 additions & 47 deletions audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,12 @@ var arc *broadcaster.Arc
func StartTxAudit(ctx context.Context, ingestCtx *idx.IngestCtx, bcast *broadcaster.Arc, rollback bool) {
ingest = ingestCtx
arc = bcast
if tip, chaintips, err := blk.StartChaintipSub(ctx); err != nil {
log.Panic(err)
} else {
chaintip := tip
blk.StartChaintipSub(ctx)

for chaintip := range blk.C {
log.Println("Chaintip", chaintip.Height, chaintip.Hash)
immutableScore = idx.HeightScore(chaintip.Height-10, 0)
AuditTransactions(ctx, rollback)
for chaintip = range chaintips {
log.Println("Chaintip", chaintip.Height, chaintip.Hash)
immutableScore = idx.HeightScore(chaintip.Height-10, 0)
AuditTransactions(ctx, rollback)
}
}
}

Expand All @@ -51,7 +46,7 @@ func AuditTransactions(ctx context.Context, rollback bool) {
limiter := make(chan struct{}, 20)
var wg sync.WaitGroup
if items, err := ingest.Store.Search(ctx, cfg); err != nil {
log.Panic(err)
log.Println(err)
} else {
log.Println("Audit pending txs", len(items))
for _, item := range items {
Expand All @@ -63,7 +58,7 @@ func AuditTransactions(ctx context.Context, rollback bool) {
wg.Done()
}()
if err := AuditTransaction(ctx, item.Member, item.Score, rollback); err != nil {
log.Panic(err)
log.Println("AuditTransaction:", err)
}
}(item)
}
Expand All @@ -74,7 +69,7 @@ func AuditTransactions(ctx context.Context, rollback bool) {
cfg.From = &from
cfg.To = &immutableScore
if items, err := ingest.Store.Search(ctx, cfg); err != nil {
log.Panic(err)
log.Println("Search:", err)
} else {
log.Println("Audit mined txs", len(items))
for _, item := range items {
Expand All @@ -86,7 +81,7 @@ func AuditTransactions(ctx context.Context, rollback bool) {
wg.Done()
}()
if err := AuditTransaction(ctx, item.Member, item.Score, rollback); err != nil {
log.Panic(err)
log.Println("AuditTransaction:", err)
}
}(item)
}
Expand All @@ -98,7 +93,7 @@ func AuditTransactions(ctx context.Context, rollback bool) {
to = float64(until.UnixNano())
cfg.To = &to
if items, err := ingest.Store.Search(ctx, cfg); err != nil {
log.Panic(err)
log.Println("Search:", err)
} else {
log.Println("Audit mempool txs", len(items))
for _, item := range items {
Expand All @@ -110,7 +105,7 @@ func AuditTransactions(ctx context.Context, rollback bool) {
wg.Done()
}()
if err := AuditTransaction(ctx, item.Member, item.Score, rollback); err != nil {
log.Panic(err)
log.Println("AuditTransaction:", err)
}
}(item)
}
Expand All @@ -120,52 +115,57 @@ func AuditTransactions(ctx context.Context, rollback bool) {

func AuditTransaction(ctx context.Context, hexid string, score float64, rollback bool) error {
// log.Println("Auditing", hexid)
tx, err := jb.LoadTx(ctx, hexid, true)
if err == jb.ErrMissingTxn {
log.Println("Missing tx", hexid)
return nil
tx, err := jb.LoadTx(ctx, hexid, false)
if err == jb.ErrNotFound {
log.Println("Archive Missing", hexid)
if err = ingest.Store.Rollback(ctx, hexid); err != nil {
log.Println("Rollback error", hexid, err)
return err
} else if err := ingest.Store.Log(ctx, idx.RollbackTxLog, hexid, score); err != nil {
log.Println("Delog error", hexid, err)
return err
}
} else if err != nil {
log.Panicln("LoadTx error", hexid, err)
} else if tx == nil {
// TODO: Handle missing tx
// Something bad has heppened if we get here
log.Println("Missing tx", hexid)
return nil
}
if tx.MerklePath == nil {
log.Println("LoadTx error", hexid, err)
return err
} else if tx.MerklePath == nil {
log.Println("Fetching status for", hexid)
if status, err := arc.Status(hexid); err != nil {
return err
} else if status.Status == 404 {
log.Println("Status not found", hexid)
// TODO: Handle missing tx
} else if status.Status == 200 && status.MerklePath != "" {
log.Println("MerklePath found", hexid)
if tx.MerklePath, err = jb.LoadProof(ctx, hexid); err != nil {
log.Println("LoadProof error", hexid, err)
}
} else if status.Status == 200 || status.MerklePath != "" {
// log.Println("MerklePath found", hexid)
if tx.MerklePath, err = transaction.NewMerklePathFromHex(status.MerklePath); err != nil {
log.Println("NewMerklePathFromHex error", hexid, err)
}
} else {
log.Println("No proof", hexid)
// TODO: Handle no proof

}
}
if rollback && score < 0 || (score > mempoolScore && score < float64(time.Now().Add(-2*time.Hour).UnixNano())) {
log.Println("Rollback", hexid)
if err = ingest.Rollback(ctx, hexid); err != nil {
log.Panicln("Rollback error", hexid, err)
}
}

if tx.MerklePath == nil {
if rollback && (score < 0 || (score > mempoolScore && score < float64(time.Now().Add(-2*time.Hour).UnixNano()))) {
log.Println("Rollback", hexid)
if err = ingest.Store.Rollback(ctx, hexid); err != nil {
log.Println("Rollback error", hexid, err)
return err
} else if err := ingest.Store.Log(ctx, idx.RollbackTxLog, hexid, score); err != nil {
log.Println("Delog error", hexid, err)
return err
}
}
return nil
}

txid := tx.TxID()
var newScore float64
if root, err := tx.MerklePath.ComputeRoot(txid); err != nil {
log.Panicln("ComputeRoot error", hexid, err)
log.Println("ComputeRoot error", hexid, err)
return err
} else if valid, err := headers.IsValidRootForHeight(root, tx.MerklePath.BlockHeight); err != nil {
log.Panicln("IsValidRootForHeight error", hexid, err)
log.Println("IsValidRootForHeight error", hexid, err)
return err
} else if !valid {
// TODO: Reload proof and revalidate
log.Println("Invalid proof for", hexid)
Expand All @@ -181,7 +181,8 @@ func AuditTransaction(ctx context.Context, hexid string, score float64, rollback

if newScore == 0 {
// This should never happen
log.Panicln("Transaction not in proof", hexid)
log.Println("Transaction not in proof", hexid)
return nil
}

if newScore != score {
Expand All @@ -190,16 +191,19 @@ func AuditTransaction(ctx context.Context, hexid string, score float64, rollback
Load: true,
Parse: true,
}); err != nil {
log.Panicln("IngestTx error", hexid, err)
log.Println("IngestTx error", hexid, err)
return err
}
}

if newScore < immutableScore {
log.Println("Archive Immutable", hexid, newScore)
if err := ingest.Store.Log(ctx, idx.ImmutableTxLog, hexid, newScore); err != nil {
log.Panicln("Log error", hexid, err)
log.Println("Log error", hexid, err)
return err
} else if err := ingest.Store.Delog(ctx, idx.PendingTxLog, hexid); err != nil {
log.Panicln("Delog error", hexid, err)
log.Println("Delog error", hexid, err)
return err
}
}
return nil
Expand Down
55 changes: 18 additions & 37 deletions blk/block-header.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,28 @@
package blk

import (
"encoding/json"
"fmt"
"log"
"net/http"

"github.com/bitcoin-sv/go-sdk/chainhash"
)

// BlockHeader defines a single block header, used in SPV validations.
type BlockHeader struct {
Hash *chainhash.Hash `json:"hash"`
Coin uint32 `json:"coin"`
Height uint32 `json:"height"`
Time uint32 `json:"time"`
Nonce uint32 `json:"nonce"`
Version uint32 `json:"version"`
MerkleRoot *chainhash.Hash `json:"merkleroot"`
Bits string `json:"bits"`
Synced uint64 `json:"synced"`
}

func (b BlockHeader) MarshalBinary() (data []byte, err error) {
return json.Marshal(b)
}

func (b *BlockHeader) UnmarshalBinary(data []byte) (err error) {
return json.Unmarshal(data, b)
Height uint32 `json:"height"`
Hash chainhash.Hash `json:"hash"`
Version uint32 `json:"version"`
MerkleRoot chainhash.Hash `json:"merkleRoot"`
Timestamp uint32 `json:"creationTimestamp"`
Bits uint32 `json:"-"`
Nonce uint32 `json:"nonce"`
// ChainWork *big.Int `json:"chainWork"`
// CumulatedWork *big.Int `json:"work"`
PreviousBlock chainhash.Hash `json:"prevBlockHash"`
}

func FetchBlockHeaders(fromBlock uint64, pageSize uint) (blocks []*BlockHeader, err error) {
url := fmt.Sprintf("%s/v1/block_header/list/%d?limit=%d", JUNGLEBUS, fromBlock, pageSize)
log.Printf("Requesting %d blocks from height %d\n", pageSize, fromBlock)
if resp, err := http.Get(url); err != nil {
log.Panicln("Failed to get blocks from junglebus", err)
} else if resp.StatusCode != http.StatusOK {
log.Panicln("Failed to get blocks from junglebus", resp.StatusCode)
} else {
err := json.NewDecoder(resp.Body).Decode(&blocks)
resp.Body.Close()
if err != nil {
log.Panic(err)
}
}
return
// BlockHeaderState is an extended version of the BlockHeader
// that has more important informations. Mostly used in http server endpoints.
type BlockHeaderState struct {
Header BlockHeader `json:"header"`
State string `json:"state"`
// ChainWork *big.Int `json:"chainWork" swaggertype:"string"`
Height uint32 `json:"height"`
}
Loading

0 comments on commit 0706c0b

Please sign in to comment.