diff --git a/console-jest.config.js b/console-jest.config.js index 2f3268a45..c933afc6e 100644 --- a/console-jest.config.js +++ b/console-jest.config.js @@ -1,3 +1,3 @@ -import console from "console"; +import "reflect-metadata"; -global.console = console; +globalThis.console = console; diff --git a/package-lock.json b/package-lock.json index 761e3033b..72747ccfb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43,7 +43,7 @@ "ts-jest": "^29.0.5", "typedoc": "^0.26.11", "typedoc-plugin-frontmatter": "1.0.0", - "typedoc-plugin-inline-sources": "1.2.0", + "typedoc-plugin-inline-sources": "^1.2.0", "typedoc-plugin-markdown": "4.2.10", "typescript": "5.1" } diff --git a/package.json b/package.json index 7ffba608c..b2e673a26 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "ts-jest": "^29.0.5", "typedoc": "^0.26.11", "typedoc-plugin-frontmatter": "1.0.0", - "typedoc-plugin-inline-sources": "1.2.0", + "typedoc-plugin-inline-sources": "^1.2.0", "typedoc-plugin-markdown": "4.2.10", "typescript": "5.1", "istanbul-merge": "^2.0.0" diff --git a/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts b/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts index 4f991d012..9aded3518 100644 --- a/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts +++ b/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts @@ -90,6 +90,7 @@ export class GeneratedResolverFactoryGraphqlModule extends ResolverFactoryGraphq public async initializePrismaClient() { // setup the prisma client and feed it to the server, // since this is necessary for the returned resolvers to work + const prismaClient = new PrismaClient({ // datasourceUrl: 'postgresql://admin:password@localhost:5433/protokit-indexer?schema=public' }); diff --git a/packages/persistance/prisma/migrations/20251222105633_pending_l1_transactions/migration.sql b/packages/persistance/prisma/migrations/20251222105633_pending_l1_transactions/migration.sql new file mode 100644 index 000000000..b98317704 --- /dev/null +++ b/packages/persistance/prisma/migrations/20251222105633_pending_l1_transactions/migration.sql @@ -0,0 +1,44 @@ +/* + Warnings: + + - You are about to drop the column `settlementTransactionHash` on the `Batch` table. All the data in the column will be lost. + - The primary key for the `Settlement` table will be changed. If it partially fails, the table could be left without primary key constraint. + - You are about to drop the column `transactionHash` on the `Settlement` table. All the data in the column will be lost. + - Added the required column `transactionId` to the `Settlement` table without a default value. This is not possible if the table is not empty. + +*/ +-- DropForeignKey +ALTER TABLE "Batch" DROP CONSTRAINT "Batch_settlementTransactionHash_fkey"; + +-- AlterTable +ALTER TABLE "Batch" DROP COLUMN "settlementTransactionHash", +ADD COLUMN "settlementTransactionId" TEXT; + +-- AlterTable +ALTER TABLE "Settlement" DROP CONSTRAINT "Settlement_pkey", +DROP COLUMN "transactionHash", +ADD COLUMN "transactionId" TEXT NOT NULL, +ADD CONSTRAINT "Settlement_pkey" PRIMARY KEY ("transactionId"); + +-- CreateTable +CREATE TABLE "PendingL1Transaction" ( + "id" TEXT NOT NULL, + "sender" TEXT NOT NULL, + "nonce" INTEGER NOT NULL, + "attempts" INTEGER NOT NULL, + "status" VARCHAR(32) NOT NULL, + "transaction" JSON NOT NULL, + "lastError" TEXT, + "sentAt" TIMESTAMP(3), + + CONSTRAINT "PendingL1Transaction_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "PendingL1Transaction_sender_nonce_key" ON "PendingL1Transaction"("sender", "nonce"); + +-- AddForeignKey +ALTER TABLE "Batch" ADD CONSTRAINT "Batch_settlementTransactionId_fkey" FOREIGN KEY ("settlementTransactionId") REFERENCES "Settlement"("transactionId") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Settlement" ADD CONSTRAINT "Settlement_transactionId_fkey" FOREIGN KEY ("transactionId") REFERENCES "PendingL1Transaction"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/packages/persistance/prisma/schema.prisma b/packages/persistance/prisma/schema.prisma index d0ed1472e..ab4920bc7 100644 --- a/packages/persistance/prisma/schema.prisma +++ b/packages/persistance/prisma/schema.prisma @@ -105,8 +105,8 @@ model Batch { blocks Block[] - settlementTransactionHash String? - settlement Settlement? @relation(fields: [settlementTransactionHash], references: [transactionHash]) + settlementTransactionId String? + settlement Settlement? @relation(fields: [settlementTransactionId], references: [transactionId]) } model BlockResult { @@ -123,11 +123,11 @@ model BlockResult { } model Settlement { - // transaction String - transactionHash String @id + transactionId String @id promisedMessagesHash String - batches Batch[] + batches Batch[] + pendingL1Transaction PendingL1Transaction? @relation(fields: [transactionId], references: [id]) } model IncomingMessageBatchTransaction { @@ -148,3 +148,19 @@ model IncomingMessageBatch { messages IncomingMessageBatchTransaction[] } + +model PendingL1Transaction { + id String @id @default(cuid()) + sender String + nonce Int + attempts Int + status String @db.VarChar(32) + transaction Json @db.Json + hash String? + lastError String? + sentAt DateTime? + + settlement Settlement? + + @@unique([sender, nonce]) +} diff --git a/packages/persistance/src/PrismaDatabaseConnection.ts b/packages/persistance/src/PrismaDatabaseConnection.ts index 84440ca1c..5e0453962 100644 --- a/packages/persistance/src/PrismaDatabaseConnection.ts +++ b/packages/persistance/src/PrismaDatabaseConnection.ts @@ -13,6 +13,7 @@ import { PrismaBlockStorage } from "./services/prisma/PrismaBlockStorage"; import { PrismaSettlementStorage } from "./services/prisma/PrismaSettlementStorage"; import { PrismaMessageStorage } from "./services/prisma/PrismaMessageStorage"; import { PrismaTransactionStorage } from "./services/prisma/PrismaTransactionStorage"; +import { PrismaPendingL1TransactionStorage } from "./services/prisma/PrismaPendingL1TransactionStorage"; export interface PrismaDatabaseConfig { // Either object-based config or connection string @@ -82,6 +83,9 @@ export class PrismaDatabaseConnection transactionStorage: { useClass: PrismaTransactionStorage, }, + pendingL1TransactionStorage: { + useClass: PrismaPendingL1TransactionStorage, + }, }; } @@ -97,6 +101,7 @@ export class PrismaDatabaseConnection "IncomingMessageBatch", "IncomingMessageBatchTransaction", "LinkedLeaf", + "PendingL1Transaction", ]; await this.prismaClient.$transaction( diff --git a/packages/persistance/src/index.ts b/packages/persistance/src/index.ts index 788de3d57..88b5e7df1 100644 --- a/packages/persistance/src/index.ts +++ b/packages/persistance/src/index.ts @@ -7,6 +7,7 @@ export * from "./services/prisma/PrismaBatchStore"; export * from "./services/prisma/PrismaSettlementStorage"; export * from "./services/prisma/PrismaMessageStorage"; export * from "./services/prisma/PrismaTransactionStorage"; +export * from "./services/prisma/PrismaPendingL1TransactionStorage"; export * from "./services/prisma/mappers/BatchMapper"; export * from "./services/prisma/mappers/BlockMapper"; export * from "./services/prisma/mappers/FieldMapper"; diff --git a/packages/persistance/src/services/prisma/PrismaPendingL1TransactionStorage.ts b/packages/persistance/src/services/prisma/PrismaPendingL1TransactionStorage.ts new file mode 100644 index 000000000..29af4479c --- /dev/null +++ b/packages/persistance/src/services/prisma/PrismaPendingL1TransactionStorage.ts @@ -0,0 +1,92 @@ +import { inject, injectable } from "tsyringe"; +import { + PendingL1TransactionRecord, + PendingL1TransactionStatus, + PendingL1TransactionStorage, +} from "@proto-kit/sequencer"; + +import type { PrismaConnection } from "../../PrismaDatabaseConnection"; + +import { PendingL1TransactionMapper } from "./mappers/PendingL1TransactionMapper"; + +@injectable() +export class PrismaPendingL1TransactionStorage + implements PendingL1TransactionStorage +{ + private readonly mapper = new PendingL1TransactionMapper(); + + public constructor( + @inject("Database") private readonly connection: PrismaConnection + ) {} + + public async queue( + record: Omit + ): Promise { + const { prismaClient } = this.connection; + const status: PendingL1TransactionStatus = "queued"; + const txnRecord = await prismaClient.pendingL1Transaction.create({ + data: this.mapper.mapOut({ ...record, status }), + }); + return txnRecord.id; + } + + public async update( + id: string, + updates: Partial> + ): Promise { + const { prismaClient } = this.connection; + await prismaClient.pendingL1Transaction.update({ + where: { id }, + data: { + ...(updates.attempts !== undefined && { attempts: updates.attempts }), + ...(updates.status !== undefined && { status: updates.status }), + ...(updates.transaction !== undefined && { + transaction: updates.transaction.toJSON(), + }), + ...(updates.hash !== undefined && { hash: updates.hash }), + ...(updates.lastError !== undefined && { + lastError: updates.lastError, + }), + ...(updates.sentAt !== undefined && { sentAt: updates.sentAt }), + }, + }); + } + + public async delete(id: string): Promise { + const { prismaClient } = this.connection; + await prismaClient.pendingL1Transaction.delete({ + where: { + id, + }, + }); + } + + public async findById( + id: string + ): Promise { + const { prismaClient } = this.connection; + const record = await prismaClient.pendingL1Transaction.findUnique({ + where: { + id, + }, + }); + if (!record) { + return undefined; + } + return this.mapper.mapIn(record); + } + + public async findByStatuses( + statuses: PendingL1TransactionStatus[] + ): Promise { + const { prismaClient } = this.connection; + const rows = await prismaClient.pendingL1Transaction.findMany({ + where: { + status: { + in: statuses, + }, + }, + }); + return rows.map((record) => this.mapper.mapIn(record)); + } +} diff --git a/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts b/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts index e6b5addcb..47c5f3c31 100644 --- a/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts @@ -18,7 +18,7 @@ export class PrismaSettlementStorage implements SettlementStorage { const batch = await prismaClient.batch.findFirst({ where: { - settlementTransactionHash: { + settlementTransactionId: { not: null, }, }, diff --git a/packages/persistance/src/services/prisma/mappers/BatchMapper.ts b/packages/persistance/src/services/prisma/mappers/BatchMapper.ts index 490c6cda4..e69f40fcb 100644 --- a/packages/persistance/src/services/prisma/mappers/BatchMapper.ts +++ b/packages/persistance/src/services/prisma/mappers/BatchMapper.ts @@ -21,7 +21,7 @@ export class BatchMapper const batch: PrismaBatch = { proof: input.proof, height: input.height, - settlementTransactionHash: null, + settlementTransactionId: null, }; return [batch, []]; } diff --git a/packages/persistance/src/services/prisma/mappers/PendingL1TransactionMapper.ts b/packages/persistance/src/services/prisma/mappers/PendingL1TransactionMapper.ts new file mode 100644 index 000000000..e672db0b4 --- /dev/null +++ b/packages/persistance/src/services/prisma/mappers/PendingL1TransactionMapper.ts @@ -0,0 +1,39 @@ +import { PendingL1Transaction, Prisma } from "@prisma/client"; +import { + PendingL1TransactionRecord, + PendingL1TransactionStatus, +} from "@proto-kit/sequencer"; +import { Mina } from "o1js"; + +export class PendingL1TransactionMapper { + public mapIn(input: PendingL1Transaction): PendingL1TransactionRecord { + return { + id: input.id, + sender: input.sender, + nonce: input.nonce, + attempts: input.attempts, + status: input.status as PendingL1TransactionStatus, + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + transaction: Mina.Transaction.fromJSON(input.transaction as any), + hash: input.hash ?? undefined, + lastError: input.lastError ?? undefined, + sentAt: input.sentAt ? new Date(input.sentAt) : undefined, + }; + } + + public mapOut( + input: PendingL1TransactionRecord + ): Prisma.PendingL1TransactionCreateInput { + return { + id: input.id, + sender: input.sender, + nonce: input.nonce, + attempts: input.attempts, + status: input.status, + transaction: input.transaction.toJSON(), + hash: input.hash ?? null, + lastError: input.lastError ?? null, + sentAt: input.sentAt ? input.sentAt.toJSON() : null, + }; + } +} diff --git a/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts b/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts index 5f923e34f..33979555c 100644 --- a/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts +++ b/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts @@ -12,7 +12,7 @@ export class SettlementMapper const [settlement, batches] = input; return { batches, - transactionHash: settlement.transactionHash, + transactionId: settlement.transactionId, promisedMessagesHash: settlement.promisedMessagesHash, }; } @@ -21,7 +21,7 @@ export class SettlementMapper return [ { promisedMessagesHash: input.promisedMessagesHash, - transactionHash: input.transactionHash, + transactionId: input.transactionId, }, input.batches, ]; diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index 15b423ea7..c8b7ce8b6 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -11,7 +11,6 @@ export * from "./sequencer/builder/Closeable"; export * from "./worker/flow/Flow"; export * from "./worker/flow/Task"; export * from "./worker/flow/JSONTaskSerializer"; -// export * from "./worker/queue/BullQueue"; export * from "./worker/queue/TaskQueue"; export * from "./worker/queue/LocalTaskQueue"; export * from "./worker/queue/ListenerList"; @@ -71,6 +70,7 @@ export * from "./storage/repositories/BlockStorage"; export * from "./storage/repositories/SettlementStorage"; export * from "./storage/repositories/MessageStorage"; export * from "./storage/repositories/TransactionStorage"; +export * from "./storage/repositories/PendingL1TransactionStorage"; export * from "./storage/inmemory/InMemoryDatabase"; export * from "./storage/inmemory/InMemoryAsyncMerkleTreeStore"; export * from "./storage/inmemory/InMemoryBlockStorage"; @@ -78,6 +78,7 @@ export * from "./storage/inmemory/InMemoryBatchStorage"; export * from "./storage/inmemory/InMemorySettlementStorage"; export * from "./storage/inmemory/InMemoryMessageStorage"; export * from "./storage/inmemory/InMemoryTransactionStorage"; +export * from "./storage/inmemory/InMemoryPendingL1TransactionStorage"; export * from "./storage/StorageDependencyFactory"; export * from "./storage/Database"; export * from "./storage/DatabasePruneModule"; @@ -107,6 +108,8 @@ export * from "./settlement/permissions/BaseLayerContractPermissions"; export * from "./settlement/permissions/ProvenSettlementPermissions"; export * from "./settlement/permissions/SignedSettlementPermissions"; export * from "./settlement/tasks/SettlementProvingTask"; +export * from "./settlement/transactions/L1TransactionRetryStrategy"; +export * from "./settlement/transactions/DefaultL1TransactionRetryStrategy"; export * from "./settlement/transactions/MinaTransactionSender"; export * from "./settlement/transactions/MinaTransactionSimulator"; export * from "./settlement/transactions/MinaSimulationService"; diff --git a/packages/sequencer/src/settlement/BridgingModule.ts b/packages/sequencer/src/settlement/BridgingModule.ts index 90bf94282..32d35b25a 100644 --- a/packages/sequencer/src/settlement/BridgingModule.ts +++ b/packages/sequencer/src/settlement/BridgingModule.ts @@ -42,7 +42,6 @@ import { FungibleToken } from "mina-fungible-token"; // eslint-disable-next-line import/no-extraneous-dependencies import groupBy from "lodash/groupBy"; -import { FeeStrategy } from "../protocol/baselayer/fees/FeeStrategy"; import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer"; import { AsyncLinkedLeafStore } from "../state/async/AsyncLinkedLeafStore"; import { CachedLinkedLeafStore } from "../state/lmt/CachedLinkedLeafStore"; @@ -94,8 +93,6 @@ export class BridgingModule { private readonly outgoingMessageCollector: OutgoingMessageCollector, @inject("AsyncLinkedLeafStore") private readonly linkedLeafStore: AsyncLinkedLeafStore, - @inject("FeeStrategy") - private readonly feeStrategy: FeeStrategy, @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer, @inject("SettlementSigner") private readonly signer: MinaSigner, @inject("TransactionSender") @@ -386,7 +383,6 @@ export class BridgingModule { sender: feepayer, // eslint-disable-next-line no-plusplus nonce: nonce++, - fee: this.feeStrategy.getFee(), memo: "pull state root", }, async () => { @@ -504,7 +500,6 @@ export class BridgingModule { sender: feepayer, // eslint-disable-next-line no-plusplus nonce: nonce++, - fee: this.feeStrategy.getFee(), memo: "roll up actions", }, async () => { diff --git a/packages/sequencer/src/settlement/SettlementModule.ts b/packages/sequencer/src/settlement/SettlementModule.ts index 597da53b3..2d2211f68 100644 --- a/packages/sequencer/src/settlement/SettlementModule.ts +++ b/packages/sequencer/src/settlement/SettlementModule.ts @@ -36,7 +36,6 @@ import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer"; import { Batch, SettleableBatch } from "../storage/model/Batch"; import { BlockProofSerializer } from "../protocol/production/tasks/serializers/BlockProofSerializer"; import { Settlement } from "../storage/model/Settlement"; -import { FeeStrategy } from "../protocol/baselayer/fees/FeeStrategy"; import { SettlementStartupModule } from "../sequencer/SettlementStartupModule"; import { SettlementStorage } from "../storage/repositories/SettlementStorage"; @@ -46,6 +45,7 @@ import { SignedSettlementPermissions } from "./permissions/SignedSettlementPermi import { SettlementUtils } from "./utils/SettlementUtils"; import { BridgingModule } from "./BridgingModule"; import { MinaSigner } from "./MinaSigner"; +import { DefaultL1TransactionRetryStrategy } from "./transactions/DefaultL1TransactionRetryStrategy"; export type SettlementModuleEvents = { "settlement-submitted": [Batch]; @@ -75,8 +75,6 @@ export class SettlementModule @inject("TransactionSender") private readonly transactionSender: MinaTransactionSender, @inject("SettlementSigner") private readonly signer: MinaSigner, - @inject("FeeStrategy") - private readonly feeStrategy: FeeStrategy, private readonly settlementStartupModule: SettlementStartupModule ) { super(); @@ -88,6 +86,9 @@ export class SettlementModule BridgingModule: { useClass: BridgingModule, }, + L1TransactionRetryStrategy: { + useClass: DefaultL1TransactionRetryStrategy, + }, }; } @@ -166,7 +167,6 @@ export class SettlementModule { sender: feepayer, nonce: options?.nonce, - fee: this.feeStrategy.getFee(), memo: "Protokit settle", }, async () => { @@ -186,7 +186,7 @@ export class SettlementModule signingWithSignatureCheck: [...this.signer.getContractAddresses()], }); - const { hash: transactionHash } = + const { transactionId: txId } = await this.transactionSender.proveAndSendTransaction(tx, "included"); log.info("Settlement transaction sent and included"); @@ -194,7 +194,7 @@ export class SettlementModule const settlement = { batches: [batch.height], promisedMessagesHash: latestSequenceStateHash.toString(), - transactionHash, + transactionId: txId, }; await this.settlementStorage.pushSettlement(settlement); @@ -236,7 +236,6 @@ export class SettlementModule { sender: feepayer, nonce, - fee: this.feeStrategy.getFee(), memo: "Protokit settlement deploy", }, async () => { @@ -278,8 +277,7 @@ export class SettlementModule { sender: feepayer, nonce: nonce + 1, - fee: this.feeStrategy.getFee(), - memo: "Deploy MINA bridge", + memo: "Protokit settlement init", }, async () => { AccountUpdate.fundNewAccount(feepayer, 1); @@ -324,7 +322,6 @@ export class SettlementModule sender: feepayer, nonce: nonce, memo: `Deploy token bridge for ${truncate(tokenId.toString(), { length: 6 })}`, - fee: this.feeStrategy.getFee(), }, async () => { AccountUpdate.fundNewAccount(feepayer, 1); diff --git a/packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts b/packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts new file mode 100644 index 000000000..0a42bad9f --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts @@ -0,0 +1,85 @@ +import { noop, sleep } from "@proto-kit/common"; +import { Transaction, UInt64 } from "o1js"; +import { inject } from "tsyringe"; + +import { + sequencerModule, + SequencerModule, +} from "../../sequencer/builder/SequencerModule"; +import { PendingL1TransactionRecord } from "../../storage/repositories/PendingL1TransactionStorage"; +import { FeeStrategy } from "../../protocol/baselayer/fees/FeeStrategy"; + +import { L1TransactionRetryStrategy } from "./L1TransactionRetryStrategy"; + +export type TransactionRetryConfig = { + maxAttempts?: number; + feeMultiplier?: number; + maxFee?: number; + retryDelayMs?: number; +}; + +const DEFAULT_RETRY_CONFIG: Required = { + maxAttempts: 3, + feeMultiplier: 1.1, + maxFee: 10 * 1e9, + retryDelayMs: 60 * 1000, // 1 minute +}; + +@sequencerModule() +export class DefaultL1TransactionRetryStrategy + extends SequencerModule + implements L1TransactionRetryStrategy +{ + public constructor( + @inject("FeeStrategy") private readonly feeStrategy: FeeStrategy + ) { + super(); + } + + private get retryConfig(): Required { + return { + maxAttempts: this.config.maxAttempts ?? DEFAULT_RETRY_CONFIG.maxAttempts, + feeMultiplier: + this.config.feeMultiplier ?? DEFAULT_RETRY_CONFIG.feeMultiplier, + maxFee: this.config.maxFee ?? DEFAULT_RETRY_CONFIG.maxFee, + retryDelayMs: + this.config.retryDelayMs ?? DEFAULT_RETRY_CONFIG.retryDelayMs, + }; + } + + public async start(): Promise { + noop(); + } + + public async shouldRetry( + record: PendingL1TransactionRecord + ): Promise { + return record.attempts < this.retryConfig.maxAttempts; + } + + public async prepareRetryTransaction( + record: PendingL1TransactionRecord + ): Promise> { + const tx = record.transaction; + const currentFee = tx.transaction.feePayer.body.fee; + const newFee = UInt64.from(this.bumpFee(Number(currentFee.toBigInt()))); + await tx.setFee(newFee); + // Delay if needed + await sleep( + Math.max( + 0, + this.retryConfig.retryDelayMs - + (Date.now() - (record.sentAt?.getTime() ?? 0)) + ) + ); + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return tx as Transaction; + } + + private bumpFee(currentFee: number): number { + const { feeMultiplier, maxFee } = this.retryConfig; + const baseFee = this.feeStrategy.getFee(); + const bumped = Number(currentFee) * feeMultiplier; + return Math.floor(Math.min(Math.max(bumped, baseFee), maxFee)); + } +} diff --git a/packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts b/packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts new file mode 100644 index 000000000..0f950276a --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts @@ -0,0 +1,11 @@ +import { Transaction } from "o1js"; + +import { PendingL1TransactionRecord } from "../../storage/repositories/PendingL1TransactionStorage"; + +export interface L1TransactionRetryStrategy { + shouldRetry(record: PendingL1TransactionRecord): Promise; + + prepareRetryTransaction( + record: PendingL1TransactionRecord + ): Promise>; +} diff --git a/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts b/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts index a594d85c0..aaee57e09 100644 --- a/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts +++ b/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts @@ -1,157 +1,117 @@ -import { fetchAccount, Mina, PublicKey, Transaction } from "o1js"; +import { fetchAccount, PublicKey, Transaction, UInt64 } from "o1js"; import { inject, injectable } from "tsyringe"; import { - EventEmitter, EventsRecord, - EventListenable, log, ReplayingSingleUseEventEmitter, filterNonUndefined, } from "@proto-kit/common"; import type { MinaBaseLayer } from "../../protocol/baselayer/MinaBaseLayer"; +import { + PendingL1TransactionRecord, + PendingL1TransactionStorage, +} from "../../storage/repositories/PendingL1TransactionStorage"; import { FlowCreator } from "../../worker/flow/Flow"; import { SettlementProvingTask, TransactionTaskResult, } from "../tasks/SettlementProvingTask"; +import { FeeStrategy } from "../../protocol/baselayer/fees/FeeStrategy"; +import { MinaSigner } from "../MinaSigner"; +import { closeable, Closeable } from "../../sequencer/builder/Closeable"; +import { pollTransactionStatus } from "../utils/MinaTransactionUtils"; +import { L1TransactionRetryStrategy } from "./L1TransactionRetryStrategy"; import { MinaTransactionSimulator } from "./MinaTransactionSimulator"; -type SenderKey = string; - export interface TxEvents extends EventsRecord { sent: [{ hash: string }]; included: [{ hash: string }]; rejected: [any]; } -export type TxSendResult = - Input extends "none" ? void : { hash: string }; +export type TxSendResult< + Input extends "sent" | "included" | "queued" | "none", +> = Input extends "none" ? void : { transactionId: string }; @injectable() -export class MinaTransactionSender { - private txStatusEmitters: Record> = {}; - - // TODO Persist all of that - private txQueue: Record = {}; +@closeable() +export class MinaTransactionSender implements Closeable { + private activeEmitters = new Map< + string, + ReplayingSingleUseEventEmitter + >(); - private txIdCursor: number = 0; - - private cache: { tx: Transaction; id: number }[] = []; + private pollingTimeout?: NodeJS.Timeout; public constructor( private readonly creator: FlowCreator, private readonly provingTask: SettlementProvingTask, private readonly simulator: MinaTransactionSimulator, - @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer - ) {} - - public async getNextNonce(sender: PublicKey): Promise { - const account = await this.simulator.getAccount(sender); - return parseInt(account.nonce.toString(), 10); + @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer, + @inject("PendingL1TransactionStorage") + private readonly pendingStorage: PendingL1TransactionStorage, + @inject("L1TransactionRetryStrategy") + private readonly retryStrategy: L1TransactionRetryStrategy, + @inject("SettlementSigner") private readonly signer: MinaSigner, + @inject("FeeStrategy") private readonly feeStrategy: FeeStrategy + ) { + this.startPolling(); } - private async trySendCached({ - tx, - id, - }: { - tx: Transaction; - id: number; - }): Promise { - const feePayer = tx.transaction.feePayer.body; - const sender = feePayer.publicKey.toBase58(); - const senderQueue = this.txQueue[sender]; - - const sendable = senderQueue.at(0) === Number(feePayer.nonce.toString()); - if (sendable) { - const txId = await tx.send(); - - const statusEmitter = this.txStatusEmitters[id]; - log.info(`Sent L1 transaction ${txId.hash}`); - statusEmitter.emit("sent", { hash: txId.hash }); - - txId.wait().then( - (included) => { - log.info(`L1 transaction ${included.hash} has been included`); - statusEmitter.emit("included", { hash: included.hash }); - }, - (error) => { - log.info("Waiting on L1 transaction threw and error", error); - statusEmitter.emit("rejected", error); - } - ); - - senderQueue.pop(); - return txId; - } - return undefined; - } - - private async resolveCached(): Promise { - const indizesToRemove: number[] = []; - for (let i = 0; i < this.cache.length; i++) { - // eslint-disable-next-line no-await-in-loop - const result = await this.trySendCached(this.cache[i]); - if (result !== undefined) { - indizesToRemove.push(i); - } - } - this.cache = this.cache.filter( - (ignored, index) => !indizesToRemove.includes(index) - ); - return indizesToRemove.length; + private getEmitterKey(sender: string, nonce: number): string { + return `${sender}:${nonce}`; } - private async sendOrQueue( - tx: Transaction - ): Promise> { - // eslint-disable-next-line no-plusplus - const id = this.txIdCursor++; - this.cache.push({ tx, id }); - const eventEmitter = new ReplayingSingleUseEventEmitter(); - this.txStatusEmitters[id] = eventEmitter; - - let removedLastIteration = 0; - do { - // eslint-disable-next-line no-await-in-loop - removedLastIteration = await this.resolveCached(); - } while (removedLastIteration > 0); - - // This altered return type only exposes listening-related functions and erases the rest - return eventEmitter; + public async getNextNonce(sender: PublicKey): Promise { + const account = await this.simulator.getAccount(sender); + return parseInt(account.nonce.toString(), 10); } + /** + * If there is a transaction with a lower nonce thats not included yet, + * this transaction will be queued instead. + * @param transaction - The transaction to prove and send. + * @param waitOnStatus + * @returns + */ public async proveAndSendTransaction< - Wait extends "sent" | "included" | "none", + Wait extends "sent" | "included" | "queued" | "none", >( transaction: Transaction, waitOnStatus: Wait ): Promise> { const { publicKey, nonce } = transaction.transaction.feePayer.body; - - log.debug( - `Proving tx from sender ${publicKey.toBase58()} nonce ${nonce.toString()}` + const sender = publicKey.toBase58(); + const nonceNum = Number(nonce.toString()); + const unsignedTx = await transaction.setFee( + UInt64.from(this.feeStrategy.getFee()) ); + const signedTx = this.signer.signTx(unsignedTx); - // Add Transaction to sender's queue - (this.txQueue[publicKey.toBase58()] ??= []).push(Number(nonce.toString())); + // Setup emitter before queueing + const emitterKey = this.getEmitterKey(sender, nonceNum); + const emitter = new ReplayingSingleUseEventEmitter(); + this.activeEmitters.set(emitterKey, emitter); + + log.debug(`Proving tx from sender ${sender} nonce ${nonce.toString()}`); const flow = this.creator.createFlow( - `tx-${publicKey.toBase58()}-${nonce.toString()}`, + `tx-${sender}-${nonce.toString()}`, {} ); const accounts = await Promise.all( - transaction.transaction.accountUpdates.map( + signedTx.transaction.accountUpdates.map( async (au) => await fetchAccount({ publicKey: au.publicKey, tokenId: au.tokenId }) ) ); // Load accounts - await this.simulator.getAccounts(transaction); - await this.simulator.applyTransaction(transaction); + await this.simulator.getAccounts(signedTx); + await this.simulator.applyTransaction(signedTx); log.trace("Applied transaction to local simulated ledger"); @@ -163,7 +123,7 @@ export class MinaTransactionSender { await flow.pushTask( this.provingTask, { - transaction, + transaction: signedTx, chainState: { graphql, accounts: accounts @@ -184,27 +144,242 @@ export class MinaTransactionSender { log.trace(result.transaction.toPretty()); - const txStatus = await this.sendOrQueue(result.transaction); - + // Queue the transaction + const txnId = await this.pendingStorage.queue({ + sender, + nonce: nonceNum, + attempts: 0, + transaction: result.transaction, + sentAt: new Date(), + }); + if (waitOnStatus === "queued") { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return { transactionId: txnId } as TxSendResult; + } + let waitPromise: Promise> | undefined = + undefined; if (waitOnStatus !== "none") { const waitInstruction: "sent" | "included" = waitOnStatus; - const hash = await new Promise>( + waitPromise = new Promise>( (resolve, reject) => { - txStatus.on(waitInstruction, (txSendResult) => { - log.info(`Tx ${txSendResult.hash} included`); - resolve(txSendResult); + emitter.on(waitInstruction, (txSendResult) => { + log.info(`Tx ${txnId} ${waitInstruction}`); + resolve({ transactionId: txnId }); + if (waitInstruction === "included") { + this.activeEmitters.delete(emitterKey); + } }); - txStatus.on("rejected", (error) => { + emitter.on("rejected", (error) => { reject(error); + this.activeEmitters.delete(emitterKey); }); } ); + } + + // Send immediately only if there is no pending tx with a lower nonce for this sender. + const pendingForSender = ( + await this.pendingStorage.findByStatuses(["queued", "sent"]) + ).filter((r) => r.sender === sender); + const hasLowerNoncePending = pendingForSender.some( + (r) => r.id !== txnId && r.nonce < nonceNum + ); + + if (!hasLowerNoncePending) { + await this.sendTransaction({ + id: txnId, + status: "queued", + sender, + nonce: nonceNum, + attempts: 0, + transaction: result.transaction, + sentAt: new Date(), + }); + } - // Yeah that's not super clean, but couldn't figure out a better way tbh + if (waitPromise) { + const { transactionId: txId } = await waitPromise; // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - return hash as TxSendResult; + return { transactionId: txId } as TxSendResult; } + + // If waitOnStatus is none, delete the emitter. + this.activeEmitters.delete(emitterKey); + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions return undefined as TxSendResult; } + + private startPolling() { + const intervalMs = 5000; + const poll = async () => { + try { + await this.processPendingTransactions(); + } catch (e) { + log.error("Error in MinaTransactionSender polling loop", e); + } finally { + this.pollingTimeout = setTimeout(poll, intervalMs); + this.pollingTimeout.unref?.(); // important for unit tests. + } + }; + this.pollingTimeout = setTimeout(poll, intervalMs); + this.pollingTimeout.unref?.(); + } + + public async close(): Promise { + if (this.pollingTimeout !== undefined) { + clearTimeout(this.pollingTimeout); + this.pollingTimeout = undefined; + } + } + + private async processPendingTransactions() { + // Find all pending transactions, state: queued | sent + const pendingTransactions = await this.pendingStorage.findByStatuses([ + "queued", + "sent", + ]); + + const bySender: Record = {}; + for (const tx of pendingTransactions) { + (bySender[tx.sender] ??= []).push(tx); + } + + /* eslint-disable no-await-in-loop */ + for (const sender of Object.keys(bySender)) { + // Sort in ascending order of nonce + const txs = bySender[sender].sort((a, b) => a.nonce - b.nonce); + // eslint-disable-next-line no-continue + if (txs.length === 0) continue; + + // Send the first queued transaction, + // transactions stays in queued state until the previous transaction is included or rejected + const txToSend = txs[0]; + if (txToSend.status === "queued") { + await this.sendTransaction(txToSend); + } + // If the transaction is sent and the emitter is not active, + else if ( + txToSend.status === "sent" && + !this.activeEmitters.has( + this.getEmitterKey(txToSend.sender, txToSend.nonce) + ) + ) { + await this.checkSentTransactionStatusAndRetry(txToSend); + } + } + /* eslint-enable no-await-in-loop */ + } + + private async checkSentTransactionStatusAndRetry( + txToSend: PendingL1TransactionRecord + ) { + // Check L1 for inclusion and retry if needed + if (txToSend.hash === undefined) { + // Transaction not found, send again + await this.retryTransaction(txToSend); + return; + } + const inclusionStatus = await pollTransactionStatus(txToSend.hash); + if (inclusionStatus === "included") { + await this.pendingStorage.update(txToSend.id, { + status: "included", + }); + } else { + // Transaction not included, retry + await this.retryTransaction(txToSend); + } + } + + private async sendTransaction(record: PendingL1TransactionRecord) { + const tx = record.transaction; + const emitterKey = this.getEmitterKey(record.sender, record.nonce); + const emitter = this.activeEmitters.get(emitterKey); + + try { + const pendingTx = await tx.send(); + // Update DB + await this.pendingStorage.update(record.id, { + status: "sent", + attempts: record.attempts + 1, + sentAt: new Date(), + transaction: tx, + hash: pendingTx.hash, + }); + + log.info( + `Sent L1 transaction ${pendingTx.hash} for nonce ${record.nonce} (Attempt ${record.attempts + 1})` + ); + emitter?.emit("sent", { hash: pendingTx.hash }); + + // Wait for inclusion + pendingTx.wait().then( + async (included) => { + log.info(`Transaction ${included.hash} included`); + emitter?.emit("included", { hash: included.hash }); + await this.pendingStorage.update(record.id, { status: "included" }); + this.activeEmitters.delete(emitterKey); + }, + async (error) => { + log.info(`Transaction ${pendingTx.hash} failed/rejected`, error); + // retry the transaction + await this.retryTransaction(record); + } + ); + } catch (error) { + log.error( + `Failed to send transaction ${record.sender}:${record.nonce}`, + error + ); + await this.pendingStorage.update(record.id, { + status: "failed", + lastError: error instanceof Error ? error.message : String(error), + }); + } + } + + private async retryTransaction(record: PendingL1TransactionRecord) { + const shouldRetry = await this.retryStrategy.shouldRetry(record); + if (!shouldRetry) { + const emitterKey = this.getEmitterKey(record.sender, record.nonce); + const emitter = this.activeEmitters.get(emitterKey); + if (emitter) { + emitter.emit( + "rejected", + new Error(`Max attempts reached for ${record.sender}:${record.nonce}`) + ); + this.activeEmitters.delete(emitterKey); + } + return; + } + // Prepare retry + try { + const retryTx = await this.retryStrategy.prepareRetryTransaction(record); + const signedRetryTx = this.signer.signTx( + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + retryTx as Transaction + ); + // Send the retry transaction + await this.sendTransaction({ + ...record, + transaction: signedRetryTx, + attempts: record.attempts + 1, + }); + } catch (error) { + log.error( + `Failed to prepare retry for ${record.sender}:${record.nonce}`, + error + ); + await this.pendingStorage.update(record.id, { + status: "failed", + lastError: error instanceof Error ? error.message : String(error), + }); + const emitterKey = this.getEmitterKey(record.sender, record.nonce); + const emitter = this.activeEmitters.get(emitterKey); + if (emitter) { + emitter.emit("rejected", error); + } + this.activeEmitters.delete(emitterKey); + } + } } diff --git a/packages/sequencer/src/settlement/utils/MinaTransactionUtils.ts b/packages/sequencer/src/settlement/utils/MinaTransactionUtils.ts new file mode 100644 index 000000000..56d799715 --- /dev/null +++ b/packages/sequencer/src/settlement/utils/MinaTransactionUtils.ts @@ -0,0 +1,31 @@ +import { checkZkappTransaction } from "o1js"; +import { log, sleep } from "@proto-kit/common"; + +/** + * Polls checkZkappTransaction until the transaction is successful or times out + * @param txnHash - The transaction hash to check + * @param pollIntervalMs - Interval between polls in milliseconds (default: 10000) + * @param maxPolls - Maximum number of polls before giving up (default: 60) + * @returns + */ +export async function pollTransactionStatus( + txnHash: string, + pollIntervalMs: number = 10000, + maxPolls: number = 60 +): Promise<"included" | "not-found"> { + /* eslint-disable no-await-in-loop */ + for (let pollCount = 0; pollCount < maxPolls; pollCount++) { + const result = await checkZkappTransaction(txnHash); + + if (result.success) { + log.info(`Transaction ${txnHash} confirmed as successful`); + return "included"; + } + // Wait before next poll + if (pollCount < maxPolls - 1) { + await sleep(pollIntervalMs); + } + } + /* eslint-enable no-await-in-loop */ + return "not-found"; +} diff --git a/packages/sequencer/src/storage/StorageDependencyFactory.ts b/packages/sequencer/src/storage/StorageDependencyFactory.ts index e53d3b0c9..c7c13319a 100644 --- a/packages/sequencer/src/storage/StorageDependencyFactory.ts +++ b/packages/sequencer/src/storage/StorageDependencyFactory.ts @@ -11,6 +11,7 @@ import { AsyncMerkleTreeStore } from "../state/async/AsyncMerkleTreeStore"; import { BatchStorage } from "./repositories/BatchStorage"; import { BlockQueue, BlockStorage } from "./repositories/BlockStorage"; import { MessageStorage } from "./repositories/MessageStorage"; +import { PendingL1TransactionStorage } from "./repositories/PendingL1TransactionStorage"; import { SettlementStorage } from "./repositories/SettlementStorage"; import { TransactionStorage } from "./repositories/TransactionStorage"; @@ -28,6 +29,7 @@ export interface StorageDependencyMinimumDependencies extends DependencyRecord { messageStorage: DependencyDeclaration; settlementStorage: DependencyDeclaration; transactionStorage: DependencyDeclaration; + pendingL1TransactionStorage: DependencyDeclaration; } export interface StorageDependencyFactory extends DependencyFactory { diff --git a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts index 5406d539f..7e1e2d323 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts @@ -16,6 +16,7 @@ import { InMemoryMessageStorage } from "./InMemoryMessageStorage"; import { InMemorySettlementStorage } from "./InMemorySettlementStorage"; import { InMemoryTransactionStorage } from "./InMemoryTransactionStorage"; import { InMemoryAsyncMerkleTreeStore } from "./InMemoryAsyncMerkleTreeStore"; +import { InMemoryPendingL1TransactionStorage } from "./InMemoryPendingL1TransactionStorage"; @sequencerModule() @closeable() @@ -55,6 +56,9 @@ export class InMemoryDatabase extends SequencerModule implements Database { transactionStorage: { useClass: InMemoryTransactionStorage, }, + pendingL1TransactionStorage: { + useClass: InMemoryPendingL1TransactionStorage, + }, }; } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryPendingL1TransactionStorage.ts b/packages/sequencer/src/storage/inmemory/InMemoryPendingL1TransactionStorage.ts new file mode 100644 index 000000000..8e77ac79b --- /dev/null +++ b/packages/sequencer/src/storage/inmemory/InMemoryPendingL1TransactionStorage.ts @@ -0,0 +1,56 @@ +import { + PendingL1TransactionRecord, + PendingL1TransactionStatus, + PendingL1TransactionStorage, +} from "../repositories/PendingL1TransactionStorage"; + +export class InMemoryPendingL1TransactionStorage + implements PendingL1TransactionStorage +{ + // Key: sender:nonce + private store = new Map(); + + public async queue( + record: Omit + ): Promise { + const key = Math.random().toString(36).substring(2, 15); + this.store.set(key, { + ...record, + id: key, + status: "queued", + }); + return key; + } + + public async update( + id: string, + updates: Partial> + ): Promise { + const existing = this.store.get(id); + if (existing === undefined) { + return; + } + this.store.set(id, { + ...existing, + ...updates, + }); + } + + public async delete(id: string): Promise { + this.store.delete(id); + } + + public async findById( + id: string + ): Promise { + return this.store.get(id); + } + + public async findByStatuses( + statuses: PendingL1TransactionStatus[] + ): Promise { + return Array.from(this.store.values()).filter((record) => + statuses.includes(record.status) + ); + } +} diff --git a/packages/sequencer/src/storage/model/Settlement.ts b/packages/sequencer/src/storage/model/Settlement.ts index f3b91ab73..ea8c807f0 100644 --- a/packages/sequencer/src/storage/model/Settlement.ts +++ b/packages/sequencer/src/storage/model/Settlement.ts @@ -1,5 +1,5 @@ export interface Settlement { - transactionHash: string; + transactionId: string; promisedMessagesHash: string; batches: number[]; } diff --git a/packages/sequencer/src/storage/repositories/PendingL1TransactionStorage.ts b/packages/sequencer/src/storage/repositories/PendingL1TransactionStorage.ts new file mode 100644 index 000000000..0586b7ce8 --- /dev/null +++ b/packages/sequencer/src/storage/repositories/PendingL1TransactionStorage.ts @@ -0,0 +1,38 @@ +import { Transaction } from "o1js"; + +export type PendingL1TransactionStatus = + | "queued" + | "sent" + | "included" + | "failed"; + +export interface PendingL1TransactionRecord { + id: string; + sender: string; + nonce: number; + attempts: number; + status: PendingL1TransactionStatus; + transaction: Transaction; + hash?: string; + lastError?: string; + sentAt?: Date; +} + +export interface PendingL1TransactionStorage { + queue( + record: Omit + ): Promise; + + update( + id: string, + updates: Partial> + ): Promise; + + delete(id: string): Promise; + + findById(id: string): Promise; + + findByStatuses( + statuses: PendingL1TransactionStatus[] + ): Promise; +} diff --git a/packages/sequencer/test/TestingSequencer.ts b/packages/sequencer/test/TestingSequencer.ts index 41715c40a..8f9f9f9fe 100644 --- a/packages/sequencer/test/TestingSequencer.ts +++ b/packages/sequencer/test/TestingSequencer.ts @@ -15,6 +15,7 @@ import { SequencerStartupModule, } from "../src"; import { ConstantFeeStrategy } from "../src/protocol/baselayer/fees/ConstantFeeStrategy"; +import { DefaultL1TransactionRetryStrategy } from "../src/settlement/transactions/DefaultL1TransactionRetryStrategy"; export interface DefaultTestingSequencerModules extends SequencerModulesRecord { Database: typeof InMemoryDatabase; @@ -27,6 +28,7 @@ export interface DefaultTestingSequencerModules extends SequencerModulesRecord { TaskQueue: typeof LocalTaskQueue; FeeStrategy: typeof ConstantFeeStrategy; SequencerStartupModule: typeof SequencerStartupModule; + L1TransactionRetryStrategy: typeof DefaultL1TransactionRetryStrategy; } export function testingSequencerModules< @@ -52,6 +54,7 @@ export function testingSequencerModules< TaskQueue: LocalTaskQueue, FeeStrategy: ConstantFeeStrategy, SequencerStartupModule, + L1TransactionRetryStrategy: DefaultL1TransactionRetryStrategy, } satisfies DefaultTestingSequencerModules; return { diff --git a/packages/sequencer/test/integration/BlockProduction-test.ts b/packages/sequencer/test/integration/BlockProduction-test.ts index 3189f4798..2629298c5 100644 --- a/packages/sequencer/test/integration/BlockProduction-test.ts +++ b/packages/sequencer/test/integration/BlockProduction-test.ts @@ -150,6 +150,7 @@ export function testBlockProduction< TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Runtime: { Balance: {}, diff --git a/packages/sequencer/test/integration/BlockProductionSize.test.ts b/packages/sequencer/test/integration/BlockProductionSize.test.ts index ad263f018..302ff2b0d 100644 --- a/packages/sequencer/test/integration/BlockProductionSize.test.ts +++ b/packages/sequencer/test/integration/BlockProductionSize.test.ts @@ -80,6 +80,7 @@ describe("block limit", () => { LocalTaskWorkerModule: VanillaTaskWorkerModules.defaultConfig(), BaseLayer: {}, TaskQueue: {}, + L1TransactionRetryStrategy: {}, FeeStrategy: {}, SequencerStartupModule: {}, }, diff --git a/packages/sequencer/test/integration/Mempool.test.ts b/packages/sequencer/test/integration/Mempool.test.ts index 2609ef78f..beca99a31 100644 --- a/packages/sequencer/test/integration/Mempool.test.ts +++ b/packages/sequencer/test/integration/Mempool.test.ts @@ -107,6 +107,7 @@ describe.each([["InMemory", InMemoryDatabase]])( BaseLayer: {}, TaskQueue: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Protocol: { AccountState: {}, diff --git a/packages/sequencer/test/integration/Proven.test.ts b/packages/sequencer/test/integration/Proven.test.ts index 6de1a5517..3a9abfb64 100644 --- a/packages/sequencer/test/integration/Proven.test.ts +++ b/packages/sequencer/test/integration/Proven.test.ts @@ -98,6 +98,7 @@ describe.skip("Proven", () => { TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, BaseLayer: { network: { type: "local", diff --git a/packages/sequencer/test/integration/StorageIntegration.test.ts b/packages/sequencer/test/integration/StorageIntegration.test.ts index b26a5008b..99382807b 100644 --- a/packages/sequencer/test/integration/StorageIntegration.test.ts +++ b/packages/sequencer/test/integration/StorageIntegration.test.ts @@ -106,6 +106,7 @@ describe.each([["InMemory", InMemoryDatabase]])( TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Protocol: { AccountState: {}, diff --git a/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts b/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts index a9b156023..83bc33b84 100644 --- a/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts +++ b/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts @@ -58,6 +58,7 @@ describe("atomic block production", () => { TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Runtime: { Balance: {}, diff --git a/packages/sequencer/test/settlement/MinaTransactionSender.test.ts b/packages/sequencer/test/settlement/MinaTransactionSender.test.ts new file mode 100644 index 000000000..44b5ea601 --- /dev/null +++ b/packages/sequencer/test/settlement/MinaTransactionSender.test.ts @@ -0,0 +1,266 @@ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +import "reflect-metadata"; +import { jest } from "@jest/globals"; +import { Transaction } from "o1js"; + +import type { PendingL1TransactionStorage } from "../../src/storage/repositories/PendingL1TransactionStorage"; +import { InMemoryPendingL1TransactionStorage } from "../../src/storage/inmemory/InMemoryPendingL1TransactionStorage"; + +let MinaTransactionSender: any; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +let pollTransactionStatus: jest.Mock; + +beforeAll(async () => { + // MinaTransactionSender imports pollTransactionStatus directly, so we need to mock + // the module BEFORE importing MinaTransactionSender (ESM). + jest.unstable_mockModule( + "../../src/settlement/utils/MinaTransactionUtils", + () => ({ + pollTransactionStatus: jest.fn(), + }) + ); + ({ MinaTransactionSender } = await import( + "../../src/settlement/transactions/MinaTransactionSender" + )); + const utils = await import("../../src/settlement/utils/MinaTransactionUtils"); + pollTransactionStatus = utils.pollTransactionStatus as unknown as jest.Mock; +}); + +function makeSender(pendingStorage: PendingL1TransactionStorage) { + const sender = new MinaTransactionSender( + { + createFlow: jest.fn(() => ({ + withFlow: async (fn: any) => + await new Promise((resolve, reject) => { + fn(resolve, reject); + }), + pushTask: async ( + _task: any, + params: { transaction: any }, + onResult: (result: any) => Promise + ) => { + await onResult({ transaction: params.transaction }); + }, + })), + }, + {}, + { + getAccount: jest.fn(async () => ({ nonce: { toString: () => "0" } })), + getAccounts: jest.fn(async () => undefined), + applyTransaction: jest.fn(async () => undefined), + } as any, + { config: { network: { type: "local" } } } as any, + pendingStorage as any, + { + shouldRetry: jest.fn(async () => true), + prepareRetryTransaction: jest.fn( + async (record: any) => record.transaction + ), + } as any, + { signTx: jest.fn((tx: any) => tx) } as any, + { getFee: () => 0 } as any + ); + return sender as typeof MinaTransactionSender; +} + +function deferred() { + let resolve!: (value: T) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +async function flush(): Promise { + await new Promise((resolve) => { + setImmediate(resolve); + }); +} + +function makeTx({ senderBase58 = "S", nonce = 0, hash = "TX_HASH" } = {}) { + const waitDeferred = deferred<{ hash: string }>(); + + const pendingTx = { + hash, + wait: jest.fn(async () => await waitDeferred.promise), + }; + + const tx: any = { + // used by proveAndSendTransaction() before proving + transaction: { + feePayer: { + body: { + publicKey: { toBase58: () => senderBase58 }, + nonce: { toString: () => String(nonce) }, + }, + }, + accountUpdates: [], + }, + setFee: jest.fn(async () => tx), + send: jest.fn(async () => pendingTx), + toPretty: () => "", + }; + + return { tx: tx as Transaction, pendingTx, waitDeferred }; +} + +describe("MinaTransactionSender (unit)", () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + it("proveAndSendTransaction should immediately send if there is no lower-nonce pending tx, and resolve 'sent'", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const sender = makeSender(pendingStorage); + await sender.close(); // to avoid polling + + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H1" }); + + const { transactionId } = await sender.proveAndSendTransaction(tx, "sent"); + + const record = await pendingStorage.findById(transactionId as string); + + expect(record?.status).toBe("sent"); + expect(record?.hash).toBe("H1"); + expect(record?.attempts).toBe(1); + expect(tx.send).toHaveBeenCalledTimes(1); + }); + + it("proveAndSendTransaction should transition sent -> included after pendingTx.wait resolves", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const sender = makeSender(pendingStorage); + await sender.close(); + + const { tx, waitDeferred } = makeTx({ + senderBase58: "S", + nonce: 0, + hash: "H2", + }); + + const txnPromise = sender.proveAndSendTransaction(tx, "included"); + // Ensure sendTransaction installed the wait().then handlers before resolving + await flush(); + waitDeferred.resolve({ hash: "H2" }); + const { transactionId } = await txnPromise; + // waitPromise resolves on emitter "included" (before the DB update finishes) + await flush(); + const record = await pendingStorage.findById(transactionId as string); + expect(record?.status).toBe("included"); + }); + + it("should work for multiple transactions", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const sender = makeSender(pendingStorage); + await sender.close(); // we'll manually drive polling + + // tx0 will be sent immediately, but we delay inclusion so tx1 should be queued. + const tx0 = makeTx({ senderBase58: "S", nonce: 0, hash: "H0" }); + const tx1 = makeTx({ senderBase58: "S", nonce: 1, hash: "H1" }); + + const sent0 = await sender.proveAndSendTransaction(tx0.tx, "sent"); + expect(tx0.tx.send).toHaveBeenCalledTimes(1); + + const p1 = sender.proveAndSendTransaction(tx1.tx, "sent"); + + // tx1 must NOT be sent while tx0 is still pending (lower nonce). + await flush(); + expect(tx1.tx.send).toHaveBeenCalledTimes(0); + + // Include tx0, then run the poll loop once to send the next queued tx. + tx0.waitDeferred.resolve({ hash: "H0" }); + // Let the inclusion handler update storage + clear the active emitter + await flush(); + await flush(); + + await (sender as any).processPendingTransactions(); + await flush(); + + // Now tx1 should get sent and the waiting promise should resolve. + expect(tx1.tx.send).toHaveBeenCalledTimes(1); + const sent1 = await p1; + + const r0 = await pendingStorage.findById(sent0.transactionId as string); + const r1 = await pendingStorage.findById(sent1.transactionId as string); + expect(r0).toBeDefined(); + expect(r1).toBeDefined(); + }); + + it("should retry a transaction if first attempt fails", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const sender = makeSender(pendingStorage); + await sender.close(); + + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H-R1" }); + + // First send returns hash H-R1 and then wait() rejects. + const wait1 = deferred<{ hash: string }>(); + const wait2 = deferred<{ hash: string }>(); + const pending1 = { + hash: "H-R1", + wait: jest.fn(async () => await wait1.promise), + }; + const pending2 = { + hash: "H-R2", + wait: jest.fn(async () => await wait2.promise), + }; + + (tx as any).send = jest + .fn() + .mockImplementationOnce(async () => pending1) + .mockImplementationOnce(async () => pending2); + + // Wait for first send ("sent" resolves immediately after tx.send()) so handlers are installed. + const { transactionId } = await sender.proveAndSendTransaction(tx, "sent"); + + // Fail first attempt (wait rejects), which should trigger retryTransaction and a second send. + wait1.reject(new Error("first attempt failed")); + await flush(); + await flush(); + + // Second send should happen. + expect((tx as any).send).toHaveBeenCalledTimes(2); + + // Let second attempt include. + wait2.resolve({ hash: "H-R2" }); + + const record = await pendingStorage.findById(transactionId as string); + + expect(record?.status).toBe("sent"); + expect(record?.hash).toBe("H-R2"); + // Note: attempts increments by 2 on retry due to how retryTransaction builds the next record. + expect(record?.attempts).toBeGreaterThanOrEqual(2); + }); + + it("should stop retrying when shouldRetry returns false", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const sender = makeSender(pendingStorage); + await sender.close(); + + // Force "no retry" + sender.retryStrategy.shouldRetry = jest.fn(async () => false); + + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H-F" }); + + const waitFail = deferred<{ hash: string }>(); + const pending = { + hash: "H-F", + wait: jest.fn(async () => await waitFail.promise), + }; + (tx as any).send = jest.fn(async () => pending); + + const promise = sender.proveAndSendTransaction(tx, "included"); + + await flush(); + waitFail.reject(new Error("no more attempts")); + + await expect(promise).rejects.toBeDefined(); + }); +}); +/* eslint-enable @typescript-eslint/no-unsafe-assignment */ diff --git a/packages/sequencer/test/settlement/Settlement.ts b/packages/sequencer/test/settlement/Settlement.ts index cfc97c045..01485fd11 100644 --- a/packages/sequencer/test/settlement/Settlement.ts +++ b/packages/sequencer/test/settlement/Settlement.ts @@ -59,11 +59,11 @@ import { import { BlockProofSerializer } from "../../src/protocol/production/tasks/serializers/BlockProofSerializer"; import { testingSequencerModules } from "../TestingSequencer"; import { createTransaction } from "../integration/utils"; -import { FeeStrategy } from "../../src/protocol/baselayer/fees/FeeStrategy"; import { BridgingModule } from "../../src/settlement/BridgingModule"; import { FungibleTokenContractModule } from "../../src/settlement/utils/FungibleTokenContractModule"; import { FungibleTokenAdminContractModule } from "../../src/settlement/utils/FungibleTokenAdminContractModule"; import { MinaNetworkUtils } from "../../src/protocol/baselayer/network-utils/MinaNetworkUtils"; +import { DefaultL1TransactionRetryStrategy } from "../../src/settlement/transactions/DefaultL1TransactionRetryStrategy"; import { Balances, BalancesKey } from "./mocks/Balances"; import { WithdrawalMessageProcessor, Withdrawals } from "./mocks/Withdrawals"; @@ -107,8 +107,6 @@ export const settlementTestFn = ( let blockQueue: BlockQueue; let userPublicKey: PublicKey; - let feeStrategy: FeeStrategy; - let blockSerializer: BlockProofSerializer; const bridgedTokenId = @@ -130,6 +128,7 @@ export const settlementTestFn = ( BaseLayer: MinaBaseLayer, SettlementModule: SettlementModule, SettlementSigner: InMemoryMinaSigner, + L1TransactionRetryStrategy: DefaultL1TransactionRetryStrategy, }, { SettlementProvingTask, @@ -185,6 +184,7 @@ export const settlementTestFn = ( FeeStrategy: {}, SettlementModule: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, TaskQueue: { simulatedDuration: 0, @@ -280,7 +280,6 @@ export const settlementTestFn = ( "BlockTrigger" ); blockQueue = appChain.sequencer.resolve("BlockQueue") as BlockQueue; - feeStrategy = appChain.sequencer.resolve("FeeStrategy") as FeeStrategy; blockSerializer = appChain.sequencer.dependencyContainer.resolve(BlockProofSerializer); @@ -360,7 +359,6 @@ export const settlementTestFn = ( sender: sequencerKey.toPublicKey(), memo: "Deploy custom token", nonce: nonceCounter++, - fee: feeStrategy.getFee(), }, async () => { AccountUpdate.fundNewAccount(sequencerKey.toPublicKey(), 3); @@ -428,7 +426,6 @@ export const settlementTestFn = ( sender: sequencerKey.toPublicKey(), memo: "Mint custom token", nonce: nonceCounter++, - fee: feeStrategy.getFee(), }, async () => { AccountUpdate.fundNewAccount(sequencerKey.toPublicKey(), 1); @@ -743,12 +740,10 @@ export const settlementTestFn = ( const amount = BigInt(1e9 * 10); - const fee = feeStrategy.getFee(); const tx = await Mina.transaction( { sender: userKey.toPublicKey(), nonce: user0Nonce++, - fee, memo: "Redeem withdrawal", }, async () => { @@ -791,7 +786,7 @@ export const settlementTestFn = ( ).balance.toBigInt(); // tx fee - const minaFees = BigInt(fee); + const minaFees = BigInt(tx.transaction.feePayer.body.fee.toString()); expect((balanceAfter - balanceBefore).toString()).toBe( (amount - (tokenConfig === undefined ? minaFees : 0n)).toString()