Skip to content

Commit

Permalink
Merge pull request #6 from dataswap/craw-message-task
Browse files Browse the repository at this point in the history
feat: 🎸 sync chain info
  • Loading branch information
waynewyang authored Dec 27, 2023
2 parents 0868032 + bee54d8 commit a2a339a
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 79 deletions.
24 changes: 7 additions & 17 deletions src/chainsync/chainsync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
********************************************************************************/

import { Injectable, OnModuleInit } from '@nestjs/common';
import { chainNetwork, chainNetworkCalibration } from '../config/network';

/**
* Service responsible for synchronizing with the blockchain.
Expand All @@ -36,23 +37,12 @@ export class ChainsyncService implements OnModuleInit {
* Start the background task for continuous synchronization.
*/
private async startBackgroundTask() {
while (true) {
try {
console.log(
'Always crawl the latest tipset, blockMessages, and messages.',
);
await this.delay(1000);
} catch (error) {
console.error('Error in the background task:', error);
}
try {
const mainnetSync = chainNetwork.startSyncBackgroundTask();
const calibrationSync = chainNetworkCalibration.startSyncBackgroundTask();
await Promise.all([mainnetSync, calibrationSync]);
} catch (error) {
console.error('Error in the background task:', error);
}
}

/**
* Utility function to introduce a delay.
* @param ms - The delay time in milliseconds.
*/
private async delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
67 changes: 5 additions & 62 deletions src/config/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,81 +18,24 @@
* limitations under the respective licenses.
********************************************************************************/

import {
TipsetMongoDatastore,
BlockMongoDatastore,
MessageMongoDatastore,
ChainService,
ChainFilecoinRPC,
AddressesFilterReplayStrategy,
} from '@unipackage/filecoin';

/**
* Configuration for a Filecoin network.
*/
export interface NetworkConfig {
apiAddress: string;
token: string;
mongoUrl: string;
dataswapStartHeight: number;
contractAddress?: {
dataset: string;
};
}

/**
* Represents a connection to a Filecoin network.
*/
export class ChainNetwork {
rpc: ChainFilecoinRPC;
messageDatastore: MessageMongoDatastore;
blockDatastore: BlockMongoDatastore;
tipsetDatastore: TipsetMongoDatastore;
dataswapStartHeight: number;
chainService: ChainService;

/**
* Creates an instance of ChainNetwork.
* @param config - The network configuration.
*/
constructor(config: NetworkConfig) {
this.dataswapStartHeight = config.dataswapStartHeight;
this.rpc = new ChainFilecoinRPC({
apiAddress: config.apiAddress,
token: config.token,
});
this.messageDatastore = new MessageMongoDatastore(config.mongoUrl);
this.blockDatastore = new BlockMongoDatastore(config.mongoUrl);
this.tipsetDatastore = new TipsetMongoDatastore(config.mongoUrl);
this.chainService = new ChainService({
rpc: this.rpc,
messageDs: this.messageDatastore,
blockMessagesDs: this.blockDatastore,
tipsetDs: this.tipsetDatastore,
replayStrategyOptions: {
replay: false,
replayStrategy: new AddressesFilterReplayStrategy([]),
},
});
}
}
import { Chain } from '../lib/chain';

/**
* Calibration network configuration
*/
export const chainNetworkCalibration = new ChainNetwork({
export const chainNetworkCalibration = new Chain({
apiAddress: process.env.CALIBRATION_LOTUS_API_ENDPOINT as string,
token: process.env.CALIBRATION_LOTUS_TOKEN as string,
mongoUrl: process.env.CALIBRATION_MONGO_URL as string,
dataswapStartHeight: 1,
dataswapStartHeight: 1210900,
});

/**
* Main network configuration
*/
export const chainNetwork = new ChainNetwork({
export const chainNetwork = new Chain({
apiAddress: process.env.MAIN_LOTUS_API_ENDPOINT as string,
token: process.env.MAIN_LOTUS_TOKEN as string,
mongoUrl: process.env.MAIN_MONGO_URL as string,
dataswapStartHeight: 1,
dataswapStartHeight: 3511591,
});
140 changes: 140 additions & 0 deletions src/lib/chain/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*******************************************************************************
* (c) 2023 dataswap
*
* Licensed under either the MIT License (the "MIT License") or the Apache License, Version 2.0
* (the "Apache License"). You may not use this file except in compliance with one of these
* licenses. You may obtain a copy of the MIT License at
*
* https://opensource.org/licenses/MIT
*
* Or the Apache License, Version 2.0 at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the MIT License or the Apache License for the specific language governing permissions and
* limitations under the respective licenses.
********************************************************************************/

import {
TipsetMongoDatastore,
BlockMongoDatastore,
MessageMongoDatastore,
ChainService,
ChainFilecoinRPC,
AddressesFilterReplayStrategy,
} from '@unipackage/filecoin';

/**
* Configuration for a Filecoin network.
*/
export interface NetworkConfig {
apiAddress: string;
token: string;
mongoUrl: string;
dataswapStartHeight: number;
contractAddress?: {
dataset: string;
};
}

/**
* Represents a connection to a Filecoin network.
*/
export class Chain {
rpc: ChainFilecoinRPC;
messageDatastore: MessageMongoDatastore;
blockDatastore: BlockMongoDatastore;
tipsetDatastore: TipsetMongoDatastore;
dataswapStartHeight: number;
chainService: ChainService;

/**
* Creates an instance of ChainNetwork.
* @param config - The network configuration.
*/
constructor(config: NetworkConfig) {
this.dataswapStartHeight = config.dataswapStartHeight;
this.rpc = new ChainFilecoinRPC({
apiAddress: config.apiAddress,
token: config.token,
});
this.messageDatastore = new MessageMongoDatastore(config.mongoUrl);
this.blockDatastore = new BlockMongoDatastore(config.mongoUrl);
this.tipsetDatastore = new TipsetMongoDatastore(config.mongoUrl);
this.chainService = new ChainService({
rpc: this.rpc,
messageDs: this.messageDatastore,
blockMessagesDs: this.blockDatastore,
tipsetDs: this.tipsetDatastore,
replayStrategyOptions: {
replay: false,
replayStrategy: new AddressesFilterReplayStrategy([]),
},
});
}

/**
* Utility function to introduce a delay.
* @param ms - The delay time in milliseconds.
*/
private async delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Retrieves the current height of the Filecoin chain from the network.
* @returns {Promise<number>} The height of the chain.
* @throws {Error} If there is an error fetching the chain head height.
*/
async getChainHeadHeight(): Promise<number> {
const res = await this.rpc.ChainHead();
if (!res.ok) {
throw new Error(`getChainHeadHeight error:${res.error}`);
}
return res.data.Height;
}

/**
* Checks if a given height has already been synchronized.
* @param {number} height - The height to check for synchronization.
* @returns {Promise<boolean>} True if the height is already synchronized, false otherwise.
* @throws {Error} If there is an error checking the synchronization status.
*/
async isHeightAlreadySynced(height: number): Promise<boolean> {
const syncedTipsetsRes = await this.tipsetDatastore.find({
conditions: [{ Height: height }],
});
if (!syncedTipsetsRes.ok) {
throw new Error(`isHeightAlreadySynced error:${syncedTipsetsRes.error}`);
}
if (syncedTipsetsRes.data.length === 0) {
return false;
} else {
return true;
}
}

/**
* Initiates a background task to synchronize the chain starting from the configured height.
* This task runs indefinitely, periodically fetching and saving chain information.
* @throws {Error} If there is an error during the synchronization process.
*/
async startSyncBackgroundTask() {
let syncHeight = this.dataswapStartHeight;

while (true) {
const chainHeadHeight = await this.getChainHeadHeight();
const isSynced = await this.isHeightAlreadySynced(syncHeight);
if (!isSynced) {
await this.chainService.GetAndSaveChainInfoByHeight(syncHeight);
} else if (syncHeight < chainHeadHeight) {
syncHeight++;
} else {
await this.delay(3000);
}
}
}
}

0 comments on commit a2a339a

Please sign in to comment.