Skip to content

Commit

Permalink
Fix to allow multiple blockchains to be indexed at once
Browse files Browse the repository at this point in the history
  • Loading branch information
yangseongja committed Nov 8, 2022
1 parent 109a828 commit 9688f30
Show file tree
Hide file tree
Showing 28 changed files with 220 additions and 460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,24 @@ namespace SapientFi.Infrastructure.Cosmos.Indexers.Delegations.Storage;
/// </summary>
public interface ICosmosValidatorDelegationLedgerEntity
{
public long Id { get; set; }
public long Id { get; init; }

public DateTimeOffset At { get; set; }
public DateTimeOffset At { get; init; }

/// <summary>
/// The TxHash of the transaction that this delegation came from
/// </summary>
public string TxHash { get; set; }
public string TxHash { get; init; }

public string ValidatorAddress { get; set; }
public string ValidatorAddress { get; init; }

public string DelegatorAddress { get; set; }
public string DelegatorAddress { get; init; }

/// <summary>
/// Positive = delegation to validator
/// Negative = delegation away from validator
/// </summary>
public long Amount { get; set; }
public long Amount { get; init; }

public string Denominator { get; set; }
public string Denominator { get; init; }
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
Expand All @@ -7,6 +8,8 @@
using SapientFi.Infrastructure.Cosmos.BusMessages;
using SapientFi.Infrastructure.Cosmos.Storage;
using TerraDotnet;
using TerraDotnet.TerraLcd.Messages;
// ReSharper disable MemberCanBePrivate.Global

namespace SapientFi.Infrastructure.Cosmos.TransactionListener;

Expand All @@ -31,7 +34,7 @@ protected readonly
protected abstract string NameOfBlockChain { get; }


public CosmosTransactionListenerHostedService(
protected CosmosTransactionListenerHostedService(
ILogger<CosmosTransactionListenerHostedService<
TMarker,
TRawTransactionEntity,
Expand All @@ -54,33 +57,41 @@ public virtual async Task StartAsync(CancellationToken cancellationToken)
Logger.LogInformation("Starting {BlockChain} transaction listener", NameOfBlockChain);
TokenSource = new CancellationTokenSource();

var latestSeenBlock = await RawRepository.GetLatestSeenBlockHeightAsync(cancellationToken);
var enumeration = TransactionEnumerator.EnumerateTransactionsAsync(latestSeenBlock, cancellationToken);
int latestSeenBlock = await RawRepository.GetLatestSeenBlockHeightAsync(cancellationToken);
IAsyncEnumerable<LcdTxResponse> enumeration =
TransactionEnumerator.EnumerateTransactionsAsync(latestSeenBlock, cancellationToken);

await foreach (var lcdTransaction in enumeration.WithCancellation(cancellationToken))
{
Logger.LogDebug("Got new raw transaction {TxHash} with height={TxHeight}",
lcdTransaction.TransactionHash,
lcdTransaction.HeightAsInt
);
TRawTransactionEntity entity = Factory.NewRawEntity(lcdTransaction);

try
var thread = new Thread(async () =>
{
await RawRepository.SaveRawTransactionAsync(entity, cancellationToken);
await foreach (LcdTxResponse lcdTransaction in enumeration.WithCancellation(cancellationToken))
{
Logger.LogDebug("Got new raw transaction {TxHash} with height={TxHeight}",
lcdTransaction.TransactionHash,
lcdTransaction.HeightAsInt
);
TRawTransactionEntity entity = Factory.NewRawEntity(lcdTransaction);

await MassTransitBus.Publish(
new TRawTransactionAvailableAnnouncement
try
{
TransactionHash = entity.TxHash,
RawEntityId = entity.Id
}, cancellationToken);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UniqueViolation)
{
// we already have that transaction, so do nothing else
await RawRepository.SaveRawTransactionAsync(entity, cancellationToken);

await MassTransitBus.Publish(
new TRawTransactionAvailableAnnouncement
{
TransactionHash = entity.TxHash,
RawEntityId = entity.Id
}, cancellationToken);
}
catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.UniqueViolation)
{
// we already have that transaction, so do nothing else
}
}
}
}
);

Logger.LogInformation("Starting worker thread for {BlockChain}'s transaction listener", NameOfBlockChain);
thread.Start();
}

public virtual Task StopAsync(CancellationToken cancellationToken)
Expand Down
7 changes: 0 additions & 7 deletions src/Infrastructure/CosmosChain.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;
using SapientFi.Infrastructure.Cosmos.Indexers.Delegations;
using SapientFi.Infrastructure.Indexing;
Expand All @@ -12,9 +10,9 @@ namespace SapientFi.Infrastructure.Kujira.Indexers.Delegations;

public class KujiraDelegationIndexer
: CosmosDelegationIndexer<
RawKujiraTransactionAvailableAnnouncement,
KujiraValidatorDelegationLedgerEntity,
KujiraRawTransactionEntity>,
RawKujiraTransactionAvailableAnnouncement,
KujiraValidatorDelegationLedgerEntity,
KujiraRawTransactionEntity>,
IIndexer<RawKujiraTransactionAvailableAnnouncement>
{
public KujiraDelegationIndexer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,28 @@ namespace SapientFi.Infrastructure.Kujira.Indexers.Delegations.Storage;
/// </summary>
public record KujiraValidatorDelegationLedgerEntity : ICosmosValidatorDelegationLedgerEntity
{
public long Id { get; set; }
public long Id { get; init; }

public DateTimeOffset At { get; set; }
public DateTimeOffset At { get; init; }

/// <summary>
/// The TxHash of the transaction that this delegation
/// came from
/// </summary>
[Index]
public string TxHash { get; set; } = string.Empty;
public string TxHash { get; init; } = string.Empty;

[Index]
public string ValidatorAddress { get; set; } = string.Empty;
public string ValidatorAddress { get; init; } = string.Empty;

[Index]
public string DelegatorAddress { get; set; } = string.Empty;
public string DelegatorAddress { get; init; } = string.Empty;

/// <summary>
/// Positive = delegation to validator
/// Negative = delegation away from validator
/// </summary>
public long Amount { get; set; }
public long Amount { get; init; }

public string Denominator { get; set; } = string.Empty;
public string Denominator { get; init; } = string.Empty;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public static IServiceCollection AddKujiraStack(this IServiceCollection services
services.AddKujiraDelegationsIndexer();
services.AddTransient<KujiraTransactionListenerHostedService>();
services.AddTransient<CosmosFactory<KujiraRawTransactionEntity>>();
services.AddTransient<KujiraTransactionEnumerator>();
services.AddCosmosDotnet<KujiraMarker>("https://lcd.kaiyo.kujira.setten.io");

if (config.DoEnable())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Logging;
using TerraDotnet;
using TerraDotnet.TerraLcd;

namespace SapientFi.Infrastructure.Kujira.TransactionListener;

public class KujiraTransactionEnumerator : CosmosTransactionEnumerator<KujiraMarker>
{
public KujiraTransactionEnumerator(
ILogger<CosmosTransactionEnumerator<KujiraMarker>> logger,
ICosmosLcdApiClient<KujiraMarker> cosmosClient
) : base(logger,
cosmosClient,
new()
{
SecondsPerBlock = 6,
WindowBlockWidth = 2000,
PaginationLimit = 200
}
)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class KujiraTransactionListenerHostedService
{
public KujiraTransactionListenerHostedService(
ILogger<KujiraTransactionListenerHostedService> logger,
CosmosTransactionEnumerator<KujiraMarker> transactionEnumerator,
KujiraTransactionEnumerator transactionEnumerator,
KujiraRawRepository rawRepository,
CosmosFactory<KujiraRawTransactionEntity> factory,
IBus massTransitBus
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using SapientFi.Infrastructure.Cosmos.BusMessages;

namespace SapientFi.Infrastructure.Terra2.BusMessages;

public class RawTerra2TransactionAvailableAnnouncement
public class RawTerra2TransactionAvailableAnnouncement : IRawCosmosTransactionAvailableAnnouncement
{
public string TransactionHash { get; set; } = string.Empty;

public long RawEntityId { get; set; }
public string TransactionHash { get; init; } = string.Empty;
public long RawEntityId { get; init; }
}
Original file line number Diff line number Diff line change
@@ -1,66 +1,11 @@
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using SapientFi.Infrastructure.Cosmos.Indexers.Delegations.Storage;
using ServiceStack.Data;
using ServiceStack.OrmLite;

namespace SapientFi.Infrastructure.Terra2.Indexers.Delegations.Storage;

public class Terra2DelegationsRepository
public class Terra2DelegationsRepository : CosmosDelegationsRepository<Terra2ValidatorDelegationLedgerEntity>
{
private readonly IDbConnectionFactory _dbFactory;

public Terra2DelegationsRepository(IDbConnectionFactory dbFactory)
{
_dbFactory = dbFactory;
}

public virtual async Task SaveAsync(Terra2ValidatorDelegationLedgerEntity entity, CancellationToken cancellationToken = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(cancellationToken);
await db.SaveAsync(entity, token: cancellationToken);
}

public virtual async Task SaveAllAsync(IEnumerable<Terra2ValidatorDelegationLedgerEntity> entities, CancellationToken cancellationToken = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(cancellationToken);
await db.SaveAllAsync(entities, token: cancellationToken);
}

public virtual async Task<T1> SingleAsync<T1, T2>(SqlExpression<T2> sql, CancellationToken ct = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(ct);
return await db.SingleAsync<T1>(sql, ct);
}

public virtual async Task<IEnumerable<T1>> SelectAsync<T1, T2>(SqlExpression<T2> sql, CancellationToken ct = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(ct);
return await db.SelectAsync<T1>(sql, ct);
}

public virtual async Task<T1> SingleByIdAsync<T1, T2>(T2 id, CancellationToken ct = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(ct);
return await db.SingleByIdAsync<T1>(id, ct);
}

public virtual async Task<List<T>> SelectByIds<T>(IEnumerable ids, CancellationToken ct = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(ct);
return await db.SelectByIdsAsync<T>(ids, ct);
}

public virtual async Task<T1> ScalarAsync<T1, T2>(SqlExpression<T2> sql, CancellationToken ct = default)
{
using var db = await _dbFactory.OpenDbConnectionAsync(ct);
return await db.ScalarAsync<T1>(sql, ct);
}

public virtual async Task<IDbConnection> GetDbConnectionAsync(CancellationToken cancellationToken = new())
public Terra2DelegationsRepository(IDbConnectionFactory dbFactory) : base(dbFactory)
{
return await _dbFactory.OpenDbConnectionAsync(cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using SapientFi.Infrastructure.Cosmos.Indexers.Delegations.Storage;
using ServiceStack.DataAnnotations;

namespace SapientFi.Infrastructure.Terra2.Indexers.Delegations.Storage;
Expand All @@ -13,30 +14,30 @@ namespace SapientFi.Infrastructure.Terra2.Indexers.Delegations.Storage;
/// inverted signs), as the tokens are leaving A
/// and going into B.
/// </summary>
public record Terra2ValidatorDelegationLedgerEntity
public record Terra2ValidatorDelegationLedgerEntity : ICosmosValidatorDelegationLedgerEntity
{
public long Id { get; set; }
public long Id { get; init; }

public DateTimeOffset At { get; set; }
public DateTimeOffset At { get; init; }

/// <summary>
/// The TxHash of the transaction that this delegation
/// came from
/// </summary>
[Index]
public string TxHash { get; set; } = string.Empty;
public string TxHash { get; init; } = string.Empty;

[Index]
public string ValidatorAddress { get; set; } = string.Empty;
public string ValidatorAddress { get; init; } = string.Empty;

[Index]
public string DelegatorAddress { get; set; } = string.Empty;
public string DelegatorAddress { get; init; } = string.Empty;

/// <summary>
/// Positive = delegation to validator
/// Negative = delegation away from validator
/// </summary>
public long Amount { get; set; }
public long Amount { get; init; }

public string Denominator { get; set; } = string.Empty;
public string Denominator { get; init; } = string.Empty;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using Microsoft.Extensions.Logging;
using SapientFi.Infrastructure.Cosmos.Indexers.Delegations;
using SapientFi.Infrastructure.Indexing;
using SapientFi.Infrastructure.Terra2.BusMessages;
using SapientFi.Infrastructure.Terra2.Indexers.Delegations.Storage;
using SapientFi.Infrastructure.Terra2.Storage;
using SapientFi.Kernel.IdGeneration;

namespace SapientFi.Infrastructure.Terra2.Indexers.Delegations;

public class Terra2DelegationIndexer
: CosmosDelegationIndexer<
RawTerra2TransactionAvailableAnnouncement,
Terra2ValidatorDelegationLedgerEntity,
Terra2RawTransactionEntity>,
IIndexer<RawTerra2TransactionAvailableAnnouncement>
{
public Terra2DelegationIndexer(
ILogger<Terra2DelegationIndexer> logger,
Terra2DelegationsRepository repository,
Terra2RawRepository rawRepository,
IdProvider idProvider
) : base(logger, repository, rawRepository, idProvider)
{
}
public override string NameOfBlockChain => "Terra2";
public override string Id => "terra2_delegations";
public override string DisplayName => "Terra2 Delegations";
}
Loading

0 comments on commit 9688f30

Please sign in to comment.