diff --git a/package.json b/package.json index 4b35375..5721799 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "pyxis-safe-sync", - "version": "0.5.2", + "version": "0.6.0", "description": "", "author": { "name": "Aura Network", @@ -99,4 +99,4 @@ "coverageDirectory": "../coverage", "testEnvironment": "node" } -} +} \ No newline at end of file diff --git a/src/app.module.ts b/src/app.module.ts index cce3c63..8c0e842 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,11 +1,10 @@ /* eslint-disable prettier/prettier */ import { Module } from '@nestjs/common'; import { - ENTITIES_CONFIG, - REPOSITORY_INTERFACE, - SERVICE_INTERFACE, + ENTITIES_CONFIG, + REPOSITORY_INTERFACE, + SERVICE_INTERFACE, } from './module.config'; -import { SyncWebsocketService } from './services/impls/sync-websocket.service'; import { SharedModule } from './shared/shared.module'; import { TypeOrmModule } from '@nestjs/typeorm'; import { ConfigService } from './shared/services/config.service'; @@ -15,7 +14,6 @@ import { ChainRepository } from './repositories/impls/chain.repository'; import { ScheduleModule } from '@nestjs/schedule'; import { SyncRestService } from './services/impls/sync-rest.service'; import { HttpModule } from '@nestjs/axios'; -import { AppController } from './controllers/websocket.controller'; import { MultisigTransactionRepository } from './repositories/impls/multisig-transaction.repository'; import { MessageRepository } from './repositories/impls/message.repository'; import { BullModule } from '@nestjs/bull'; @@ -25,90 +23,83 @@ import { RedisService } from './shared/services/redis.service'; import { CommonService } from './shared/services/common.service'; import { TransactionHistoryRepository } from './repositories/impls/tx-history.repository'; const entities = [ - ENTITIES_CONFIG.AURA_TX, - ENTITIES_CONFIG.SAFE, - ENTITIES_CONFIG.CHAIN, - ENTITIES_CONFIG.MULTISIG_TRANSACTION, - ENTITIES_CONFIG.MESSAGE, - ENTITIES_CONFIG.TX_HISTORY, + ENTITIES_CONFIG.AURA_TX, + ENTITIES_CONFIG.SAFE, + ENTITIES_CONFIG.CHAIN, + ENTITIES_CONFIG.MULTISIG_TRANSACTION, + ENTITIES_CONFIG.MESSAGE, + ENTITIES_CONFIG.TX_HISTORY, ]; -const controllers = [AppController]; +const controllers = []; const processors = [SyncRestProcessor]; @Module({ - imports: [ - ConfigModule.forRoot(), - SharedModule, - TypeOrmModule.forRootAsync({ - imports: [SharedModule, AppModule], - useFactory: (configService: ConfigService) => - configService.typeOrmConfig, - inject: [ConfigService], - }), - TypeOrmModule.forFeature([...entities]), - ScheduleModule.forRoot(), - HttpModule, - BullModule.forRoot({ - redis: { - host: process.env.REDIS_HOST, - port: Number(process.env.REDIS_PORT), - username: process.env.REDIS_USERNAME, - db: parseInt(process.env.REDIS_DB, 10), - }, - prefix: `pyxis-safe-sync-${ - JSON.parse(process.env.CHAIN_SUBCRIBE)[0] - }`, - defaultJobOptions: { - removeOnComplete: true, - attempts: 3, - }, - }), - BullModule.registerQueue({ - name: 'sync-rest', - }), - RedisService, - ], - exports: [BullModule, ...processors], - controllers: [...controllers], - providers: [ - // services - { - provide: SERVICE_INTERFACE.ISYNC_WEBSOCKET_SERVICE, - useClass: SyncWebsocketService, - }, - { - provide: SERVICE_INTERFACE.ISYNC_REST_SERVICE, - useClass: SyncRestService, - }, - RedisService, - CommonService, - // repositories - { - provide: REPOSITORY_INTERFACE.IAURA_TX_REPOSITORY, - useClass: AuraTxRepository, - }, - { - provide: REPOSITORY_INTERFACE.ISAFE_REPOSITORY, - useClass: SafeRepository, - }, - { - provide: REPOSITORY_INTERFACE.ICHAIN_REPOSITORY, - useClass: ChainRepository, - }, - { - provide: REPOSITORY_INTERFACE.IMULTISIG_TRANSACTION_REPOSITORY, - useClass: MultisigTransactionRepository, - }, - { - provide: REPOSITORY_INTERFACE.IMESSAGE_REPOSITORY, - useClass: MessageRepository, - }, - { - provide: REPOSITORY_INTERFACE.ITX_HISTORY_REPOSITORY, - useClass: TransactionHistoryRepository, - }, - // processors - ...processors, - ], + imports: [ + ConfigModule.forRoot(), + SharedModule, + TypeOrmModule.forRootAsync({ + imports: [SharedModule, AppModule], + useFactory: (configService: ConfigService) => configService.typeOrmConfig, + inject: [ConfigService], + }), + TypeOrmModule.forFeature([...entities]), + ScheduleModule.forRoot(), + HttpModule, + BullModule.forRoot({ + redis: { + host: process.env.REDIS_HOST, + port: Number(process.env.REDIS_PORT), + username: process.env.REDIS_USERNAME, + db: parseInt(process.env.REDIS_DB, 10), + }, + prefix: `pyxis-safe-sync-${JSON.parse(process.env.CHAIN_SUBCRIBE)[0]}`, + defaultJobOptions: { + removeOnComplete: true, + attempts: 3, + }, + }), + BullModule.registerQueue({ + name: 'sync-rest', + }), + RedisService, + ], + exports: [BullModule, ...processors], + controllers: [...controllers], + providers: [ + // services + { + provide: SERVICE_INTERFACE.ISYNC_REST_SERVICE, + useClass: SyncRestService, + }, + RedisService, + CommonService, + // repositories + { + provide: REPOSITORY_INTERFACE.IAURA_TX_REPOSITORY, + useClass: AuraTxRepository, + }, + { + provide: REPOSITORY_INTERFACE.ISAFE_REPOSITORY, + useClass: SafeRepository, + }, + { + provide: REPOSITORY_INTERFACE.ICHAIN_REPOSITORY, + useClass: ChainRepository, + }, + { + provide: REPOSITORY_INTERFACE.IMULTISIG_TRANSACTION_REPOSITORY, + useClass: MultisigTransactionRepository, + }, + { + provide: REPOSITORY_INTERFACE.IMESSAGE_REPOSITORY, + useClass: MessageRepository, + }, + { + provide: REPOSITORY_INTERFACE.ITX_HISTORY_REPOSITORY, + useClass: TransactionHistoryRepository, + }, + // processors + ...processors, + ], }) export class AppModule {} diff --git a/src/controllers/websocket.controller.ts b/src/controllers/websocket.controller.ts deleted file mode 100644 index 038c479..0000000 --- a/src/controllers/websocket.controller.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Controller, Inject } from '@nestjs/common'; -import { SERVICE_INTERFACE } from '../module.config'; -import { ISyncWebsocketService } from '../services/isync-websocket.service'; -import { ApiTags } from '@nestjs/swagger'; - -@Controller('websocket') -@ApiTags('websocket') -export class AppController { - constructor( - @Inject(SERVICE_INTERFACE.ISYNC_WEBSOCKET_SERVICE) - private readonly syncWebsocketService: ISyncWebsocketService, - ) {} -} diff --git a/src/module.config.ts b/src/module.config.ts index 3891449..7d746fe 100644 --- a/src/module.config.ts +++ b/src/module.config.ts @@ -15,7 +15,6 @@ export const ENTITIES_CONFIG = { }; export const SERVICE_INTERFACE = { - ISYNC_WEBSOCKET_SERVICE: 'ISyncWebsocketService', ISYNC_REST_SERVICE: 'ISyncRestService', }; diff --git a/src/processors/sync-rest.processor.ts b/src/processors/sync-rest.processor.ts index 8700ba1..c6175a9 100644 --- a/src/processors/sync-rest.processor.ts +++ b/src/processors/sync-rest.processor.ts @@ -1,14 +1,13 @@ /* eslint-disable prettier/prettier */ import { - OnQueueActive, - OnQueueCompleted, - OnQueueError, - OnQueueFailed, - Process, - Processor, + OnQueueActive, + OnQueueCompleted, + OnQueueError, + OnQueueFailed, + Process, + Processor, } from '@nestjs/bull'; import { Logger, Inject } from '@nestjs/common'; -import * as axios from 'axios'; import { Job } from 'bull'; import { CommonService } from '../shared/services/common.service'; import { REPOSITORY_INTERFACE } from '../module.config'; @@ -17,104 +16,115 @@ import { ConfigService } from '../shared/services/config.service'; @Processor('sync-rest') export class SyncRestProcessor { - private readonly logger = new Logger(SyncRestProcessor.name); - private horoscopeApi; + private readonly logger = new Logger(SyncRestProcessor.name); + private horoscopeApi; + private graphqlPrefix; - constructor( - private configService: ConfigService, - private commonService: CommonService, - @Inject(REPOSITORY_INTERFACE.IMULTISIG_TRANSACTION_REPOSITORY) - private multisigTransactionRepository: IMultisigTransactionRepository, - ) { - this.logger.log( - '============== Constructor Sync Rest Processor Service ==============', - ); - - this.horoscopeApi = this.configService.get('HOROSCOPE_API'); - } + constructor( + private configService: ConfigService, + private commonService: CommonService, + @Inject(REPOSITORY_INTERFACE.IMULTISIG_TRANSACTION_REPOSITORY) + private multisigTransactionRepository: IMultisigTransactionRepository, + ) { + this.logger.log( + '============== Constructor Sync Rest Processor Service ==============', + ); - @Process({ - name: 'sync-tx-by-height', - concurrency: 10, - }) - async handleQueryTxByHeight(job: Job) { - // this.logger.log(`Handle Job: ${JSON.stringify(job.data)}`); - const result = []; - const height = job.data.height; - const safes = job.data.safeAddresses; - const network = job.data.network; - const param = `transaction?chainid=${network.chainId}&blockHeight=${height}&needFullLog=true&pageLimit=100`; - let urlToCall = param; - let done = false; - let resultCallApi; - while (!done) { - try { - resultCallApi = await axios.default.get( - this.horoscopeApi + urlToCall, - ); - if (resultCallApi.data.data.transactions.length > 0) - resultCallApi.data.data.transactions.map((res) => { - result.push(res); - }); - if (resultCallApi.data.data.nextKey === null) { - done = true; - } else { - urlToCall = `${param}&nextKey=${encodeURIComponent(resultCallApi.data.data.nextKey)}`; - } - } catch (error) { - this.logger.error(error); - done = true; - } - } - this.logger.log(`Txs of block ${height}: ${JSON.stringify(result)}`); + this.horoscopeApi = this.configService.get('HOROSCOPE_API'); + this.graphqlPrefix = this.configService.get('GRAPHQL_PREFIX'); + } - try { - if (result.length > 0) { - await this.commonService.handleTransactions( - result, - safes, - network, - ); - } - } catch (error) { - this.logger.error(error); + @Process({ + name: 'sync-tx-by-height', + concurrency: 10, + }) + async handleQueryTxByHeight(job: Job) { + // this.logger.log(`Handle Job: ${JSON.stringify(job.data)}`); + const result = []; + const height = job.data.height; + const safes = job.data.safeAddresses; + const network = job.data.network; + const variables = { + height, + id: null, + }; + let done = false; + let resultCallApi; + while (!done) { + try { + resultCallApi = await this.commonService.queryGraphql( + this.horoscopeApi, + `query getTxsByHeight($height: Int = null, $id: Int = null) { + ${this.graphqlPrefix} { + transaction(where: {height: {_eq: $height}, id: {_gt: $id}}) { + id + data + } + } + }`, + 'getTxsByHeight', + variables, + ); + if (resultCallApi.data[this.graphqlPrefix].transaction.length > 0) { + resultCallApi.data[this.graphqlPrefix].transaction.map((res) => { + result.push(res.data); + }); + variables.id = resultCallApi.data[this.graphqlPrefix].transaction + .map((tx) => tx.id) + .sort((a, b) => b - a)[0]; + } else { + done = true; } + } catch (error) { + this.logger.error(error); + done = true; + } } + this.logger.log(`Txs of block ${height}: ${JSON.stringify(result)}`); - @OnQueueActive() - onActive(job: Job) { - this.logger.log(`Processing job ${job.id} of type ${job.name}...`); + try { + if (result.length > 0) { + await this.commonService.handleTransactions(result, safes, network); + } + } catch (error) { + this.logger.error(error); } + } - @OnQueueCompleted() - onComplete(job: Job, result: any) { - this.logger.log(`Completed job ${job.id} of type ${job.name}`); - this.logger.log(`Result: ${result}`); - } + @OnQueueActive() + onActive(job: Job) { + this.logger.log(`Processing job ${job.id} of type ${job.name}...`); + } - @OnQueueError() - onError(job: Job, error: Error) { - this.logger.error(`Job: ${job}`); - this.logger.error(`Error job ${job.id} of type ${job.name}`); - this.logger.error(`Error: ${error}`); - } + @OnQueueCompleted() + onComplete(job: Job, result: any) { + this.logger.log(`Completed job ${job.id} of type ${job.name}`); + this.logger.log(`Result: ${result}`); + } - @OnQueueFailed() - onFailed(job: Job, error: Error) { - this.logger.error(`Failed job ${job.id} of type ${job.name}`); - this.logger.error(`Error: ${error}`); - } + @OnQueueError() + onError(job: Job, error: Error) { + this.logger.error(`Job: ${job}`); + this.logger.error(`Error job ${job.id} of type ${job.name}`); + this.logger.error(`Error: ${error}`); + } - async checkTxFail(listData, network) { - const queries = []; - listData.map((data) => - queries.push( - this.multisigTransactionRepository.updateMultisigTransactionsByHashes( - data, - network.id, - ), - ), - ); - await Promise.all(queries); - } + @OnQueueFailed() + onFailed(job: Job, error: Error) { + this.logger.error(`Failed job ${job.id} of type ${job.name}`); + this.logger.error(`Error: ${error}`); + } + + async checkTxFail(listData, network) { + const queries = []; + listData.map((data) => + queries.push( + this.multisigTransactionRepository.updateMultisigTransactionsByHashes( + data, + network.id, + ), + ), + ); + await Promise.all(queries); + } } diff --git a/src/repositories/impls/multisig-transaction.repository.ts b/src/repositories/impls/multisig-transaction.repository.ts index 98472a1..f9fa103 100644 --- a/src/repositories/impls/multisig-transaction.repository.ts +++ b/src/repositories/impls/multisig-transaction.repository.ts @@ -54,7 +54,7 @@ export class MultisigTransactionRepository const successTxAddrs = []; const failTxAddrs = []; auraTxs.forEach((auraTx) => { - auraTx.code === '0' + String(auraTx.code) === '0' ? successTxAddrs.push(auraTx.txHash) : failTxAddrs.push(auraTx.txHash); }); diff --git a/src/services/impls/index.ts b/src/services/impls/index.ts index 0ff14e3..dbfe329 100644 --- a/src/services/impls/index.ts +++ b/src/services/impls/index.ts @@ -1,2 +1 @@ export * from './sync-rest.service'; -export * from './sync-websocket.service'; diff --git a/src/services/impls/sync-rest.service.ts b/src/services/impls/sync-rest.service.ts index bd4f22a..5dc0e26 100644 --- a/src/services/impls/sync-rest.service.ts +++ b/src/services/impls/sync-rest.service.ts @@ -1,6 +1,5 @@ /* eslint-disable prettier/prettier */ /* eslint-disable @typescript-eslint/no-var-requires */ -import * as axios from 'axios'; import { Inject, Injectable, Logger } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { ISyncRestService } from '../ISyncRestService'; @@ -29,6 +28,7 @@ export class SyncRestService implements ISyncRestService { private cacheKey; private horoscopeApi; private redisClient; + private graphqlPrefix; constructor( private configService: ConfigService, @@ -54,6 +54,7 @@ export class SyncRestService implements ISyncRestService { ); this.horoscopeApi = this.configService.get('HOROSCOPE_API'); this.cacheKey = this.configService.get('LAST_BLOCK_HEIGHT'); + this.graphqlPrefix = this.configService.get('GRAPHQL_PREFIX'); this.syncRest(); this.findTxByHash(); } @@ -65,25 +66,30 @@ export class SyncRestService implements ISyncRestService { await this.multisigTransactionRepository.findPendingMultisigTransaction( this.chain.id, ); - const result = await Promise.all( - listPendingTx.map((tx) => - axios.default.get( - this.horoscopeApi + - `transaction?chainid=${this.chain.chainId}&txHash=${tx.txHash}&pageLimit=100`, - ), - ), + if (listPendingTx.length === 0) return; + const result = await this.commonService.queryGraphql( + this.horoscopeApi, + `query pendingTxs($hashes: [String!] = "") { + ${this.graphqlPrefix} { + transaction(where: {hash: {_in: $hashes}}) { + data + } + } + }`, + 'pendingTxs', + { + hashes: listPendingTx.map((tx) => tx.txHash), + }, ); const listTx = []; - const txs = result - .filter((res) => res.data.data.transactions.length > 0) - .map((tx) => { - listTx.push(...tx.data.data.transactions); - return { - code: parseInt(tx.data.data.transactions[0].tx_response.code, 10), - txHash: tx.data.data.transactions[0].tx_response.txhash, - }; - }); + const txs = result.data[this.graphqlPrefix].transaction.map((tx) => { + listTx.push(tx.data); + return { + code: parseInt(tx.data.tx_response.code, 10), + txHash: tx.data.tx_response.txhash, + }; + }); if (txs.length > 0) { const safes = _.keyBy(this.listSafe, 'safeAddress'); await this.commonService.handleTransactions(listTx, safes, this.chain); @@ -117,11 +123,28 @@ export class SyncRestService implements ISyncRestService { async syncFromNetwork(network, listSafes: SafeInfo[]) { try { const safeAddresses = _.keyBy(listSafes, 'safeAddress'); - // Get the current latest block height on network - const requestResult = await axios.default.get( - this.horoscopeApi + `block?chainid=${network.chainId}&pageLimit=1`, + // // Get the current latest block height on network + // const requestResult = await axios.default.get( + // this.horoscopeApi + `block?chainid=${network.chainId}&pageLimit=1`, + // ); + // const height = requestResult.data.data.blocks[0].block.header.height; + + const queryResult = await this.commonService.queryGraphql( + this.horoscopeApi, + `query latestBlock { + ${this.graphqlPrefix} { + block_checkpoint(where: {job_name: {_eq: "crawl:block"}}) { + job_name + height + } + } + }`, + 'latestBlock', + {}, ); - const height = requestResult.data.data.blocks[0].block.header.height; + + const height = + queryResult.data[this.graphqlPrefix].block_checkpoint[0].height; // Get the last block height from cache (if exists) minus 2 blocks let cacheLastHeight = await this.redisClient.get(this.cacheKey); diff --git a/src/services/impls/sync-websocket.service.ts b/src/services/impls/sync-websocket.service.ts deleted file mode 100644 index d69ea28..0000000 --- a/src/services/impls/sync-websocket.service.ts +++ /dev/null @@ -1,86 +0,0 @@ -/* eslint-disable prettier/prettier */ -/* eslint-disable @typescript-eslint/no-this-alias */ -/* eslint-disable @typescript-eslint/no-var-requires */ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { REPOSITORY_INTERFACE } from '../../module.config'; -import * as WebSocket from 'socket.io-client'; -import { ISyncWebsocketService } from '../isync-websocket.service'; -import { ConfigService } from '../../shared/services/config.service'; -import { IChainRepository, ISafeRepository } from '../../repositories'; -import { CommonService } from '../../shared/services/common.service'; -const _ = require('lodash'); - -@Injectable() -export class SyncWebsocketService implements ISyncWebsocketService { - private readonly _logger = new Logger(SyncWebsocketService.name); - private chain: any = {}; - private chainIdSubscriber = ''; - private websocketSubscriber; - - constructor( - private configService: ConfigService, - private commonService: CommonService, - @Inject(REPOSITORY_INTERFACE.ISAFE_REPOSITORY) - private safeRepository: ISafeRepository, - @Inject(REPOSITORY_INTERFACE.ICHAIN_REPOSITORY) - private chainRepository: IChainRepository, - ) { - this._logger.log( - '============== Constructor Sync Websocket Service ==============', - ); - this.chainIdSubscriber = JSON.parse( - this.configService.get('CHAIN_SUBCRIBE'), - ); - this.websocketSubscriber = this.configService.get('WEBSOCKET_URL'); - this.startSyncWebsocket(); - } - - async startSyncWebsocket() { - this._logger.log('syncFromNetwork'); - const websocketUrl = this.websocketSubscriber; - const self = this; - this.chain = await this.chainRepository.findChainByChainId( - this.chainIdSubscriber, - ); - if (this.chain.rest.slice(-1) !== '/') - this.chain.rest = this.chain.rest + '/'; - const websocket = WebSocket.io(websocketUrl); - websocket.on('connect', () => { - console.log('Connected to websocket'); - }); - websocket.on('broadcast-safe-message', (data) => { - self.handleMessage(data); - }); - websocket.on('error', (error) => { - self._logger.error(error); - websocket.close(); - process.exit(1); - }); - websocket.on('close', () => { - self._logger.log('closed'); - websocket.close(); - process.exit(1); - }); - - return websocket; - } - - async handleMessage(listTx) { - this._logger.log(listTx); - try { - const existSafes = - await this.safeRepository.findSafeByInternalChainId( - this.chain.id, - ); - const safes = _.keyBy(existSafes, 'safeAddress'); - - await this.commonService.handleTransactions( - listTx, - safes, - this.chain, - ); - } catch (error) { - this._logger.error(error); - } - } -} diff --git a/src/services/index.ts b/src/services/index.ts index 6154212..acae7c5 100644 --- a/src/services/index.ts +++ b/src/services/index.ts @@ -1,2 +1 @@ export * from './isync-rest.service'; -export * from './isync-websocket.service'; diff --git a/src/services/isync-rest.service.ts b/src/services/isync-rest.service.ts index f42d65e..315b888 100644 --- a/src/services/isync-rest.service.ts +++ b/src/services/isync-rest.service.ts @@ -1,2 +1,13 @@ -// eslint-disable-next-line @typescript-eslint/no-empty-interface -export interface ISyncRestService {} +import { SafeInfo } from 'src/dtos/responses/get-safe-by-chain.response'; + +export interface ISyncRestService { + findTxByHash(); + + syncRest(); + + syncFromNetwork(network: any, listSafes: SafeInfo[]); + + getLatestBlockHeight(chainId: number); + + updateMultisigTxStatus(listData: any); +} diff --git a/src/services/isync-websocket.service.ts b/src/services/isync-websocket.service.ts deleted file mode 100644 index 3208849..0000000 --- a/src/services/isync-websocket.service.ts +++ /dev/null @@ -1,2 +0,0 @@ -// eslint-disable-next-line @typescript-eslint/no-empty-interface -export interface ISyncWebsocketService {} diff --git a/src/shared/services/common.service.ts b/src/shared/services/common.service.ts index 8b98a6f..86c6e5a 100644 --- a/src/shared/services/common.service.ts +++ b/src/shared/services/common.service.ts @@ -2,6 +2,7 @@ import { Inject, Logger } from '@nestjs/common'; import { uniq } from 'lodash'; import { Input, Output } from 'cosmjs-types/cosmos/bank/v1beta1/bank'; +import axios from 'axios'; import { ITransactionHistoryRepository } from 'src/repositories/itx-history.repository'; import { CONST_CHAR, MESSAGE_ACTION } from '../../common'; @@ -43,6 +44,20 @@ export class CommonService { private txHistoryRepository: ITransactionHistoryRepository, ) {} + async queryGraphql( + url: string, + query: string, + operationName: string, + variables: any, + ) { + const response = await axios.post(url, { + query, + variables, + operationName, + }); + return response.data; + } + async handleTransactions(listTx, safes, chain) { // const safeAddresses = Object.keys(safes); const syncTxs: any[] = [],