Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nx.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"prisma:generate": {
"inputs": [
"{projectRoot}/package.json",
"{projectRoot}/prisma/*",
"{projectRoot}/prisma/**/*"
]
},
Expand Down
263 changes: 204 additions & 59 deletions package-lock.json

Large diffs are not rendered by default.

17 changes: 14 additions & 3 deletions packages/api/src/graphql/modules/MerkleWitnessResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { Length } from "class-validator";
import { inject } from "tsyringe";
import { RollupMerkleTree, RollupMerkleTreeWitness } from "@proto-kit/common";
import {
AsyncMerkleTreeStore,
BlockStorage,
CachedMerkleTreeStore,
MaskName,
TreeStoreCreator,
} from "@proto-kit/sequencer";

import { GraphqlModule, graphqlModule } from "../GraphqlModule";
Expand Down Expand Up @@ -34,7 +36,9 @@ export class MerkleWitnessDTO {
@graphqlModule()
export class MerkleWitnessResolver extends GraphqlModule<object> {
public constructor(
@inject("AsyncMerkleStore") private readonly treeStore: AsyncMerkleTreeStore
@inject("TreeStoreCreator")
private readonly treeStoreCreator: TreeStoreCreator,
@inject("BlockStorage") private readonly blockStorage: BlockStorage
) {
super();
}
Expand All @@ -44,7 +48,14 @@ export class MerkleWitnessResolver extends GraphqlModule<object> {
"Allows retrieval of merkle witnesses corresponding to a specific path in the appchain's state tree. These proves are generally retrieved from the current 'proven' state",
})
public async witness(@Arg("path") path: string) {
const syncStore = new CachedMerkleTreeStore(this.treeStore);
const latestBlock = await this.blockStorage.getLatestBlock();
const maskName =
latestBlock !== undefined
? MaskName.block(latestBlock.block.height)
: MaskName.base();
const treeStore = this.treeStoreCreator.getMask(maskName);

const syncStore = new CachedMerkleTreeStore(treeStore);
await syncStore.preloadKey(BigInt(path));

const tree = new RollupMerkleTree(syncStore);
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"dependencies": {
"@envelop/extended-validation": "^4.1.0",
"@inkjs/ui": "^1.0.0",
"@prisma/client": "^5.18.0",
"@prisma/client": "^5.22.0",
"@types/yargs": "^17.0.29",
"figlet": "^1.7.0",
"ink": "^4.4.1",
Expand Down
4 changes: 2 additions & 2 deletions packages/persistance/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
"access": "public"
},
"dependencies": {
"@prisma/client": "^5.18.0",
"@prisma/client": "^5.22.0",
"lodash": "^4.17.21",
"prisma": "^5.18.0",
"prisma": "^5.22.0",
"redis": "^4.6.12",
"reflect-metadata": "^0.1.13"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Warnings:

- The primary key for the `State` table will be changed. If it partially fails, the table could be left without primary key constraint.
- You are about to drop the column `mask` on the `State` table. All the data in the column will be lost.
- Added the required column `maskId` to the `State` table without a default value. This is not possible if the table is not empty.

*/
-- AlterTable
TRUNCATE TABLE "State";
ALTER TABLE "State" DROP CONSTRAINT "State_pkey",
DROP COLUMN "mask",
ADD COLUMN "maskId" INTEGER NOT NULL,
ADD CONSTRAINT "State_pkey" PRIMARY KEY ("path", "maskId");

-- CreateTable
CREATE TABLE "Mask" (
"id" SERIAL NOT NULL,
"name" VARCHAR(256) NOT NULL,
"parent" INTEGER,

CONSTRAINT "Mask_pkey" PRIMARY KEY ("id")
);

-- AddForeignKey
ALTER TABLE "State" ADD CONSTRAINT "State_maskId_fkey" FOREIGN KEY ("maskId") REFERENCES "Mask"("id") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "Mask" ADD CONSTRAINT "Mask_parent_fkey" FOREIGN KEY ("parent") REFERENCES "Mask"("id") ON DELETE SET NULL ON UPDATE CASCADE;
19 changes: 14 additions & 5 deletions packages/persistance/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

generator client {
provider = "prisma-client-js"
// output = "./../node_modules/.prisma/client"
// Enable after upgrade to 5.9.0
// previewFeatures = ["relationJoins"]
}

datasource db {
Expand All @@ -16,9 +13,21 @@ datasource db {
model State {
path Decimal @db.Decimal(78, 0)
values Decimal[] @db.Decimal(78, 0)
mask String @db.VarChar(256)

@@id([path, mask])
maskId Int
mask Mask @relation(fields: [maskId], references: [id])

@@id([path, maskId])
}

model Mask {
id Int @id @default(autoincrement())
name String @db.VarChar(256)

parent Int?
parentMask Mask? @relation("ParentMasks", fields: [parent], references: [id])
children Mask[] @relation("ParentMasks")
State State[]
}

model Transaction {
Expand Down
14 changes: 6 additions & 8 deletions packages/persistance/src/PrismaDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import {
} from "@proto-kit/sequencer";
import { DependencyFactory, OmitKeys } from "@proto-kit/common";

import { PrismaStateService } from "./services/prisma/PrismaStateService";
import { PrismaBatchStore } from "./services/prisma/PrismaBatchStore";
import { PrismaBlockStorage } from "./services/prisma/PrismaBlockStorage";
import { PrismaSettlementStorage } from "./services/prisma/PrismaSettlementStorage";
import { PrismaMessageStorage } from "./services/prisma/PrismaMessageStorage";
import { PrismaTransactionStorage } from "./services/prisma/PrismaTransactionStorage";
import { PrismaStateServiceCreator } from "./creators/PrismaStateServiceCreator";

export interface PrismaDatabaseConfig {
// Either object-based config or connection string
Expand Down Expand Up @@ -49,12 +49,9 @@ export class PrismaDatabaseConnection

public dependencies(): OmitKeys<
StorageDependencyMinimumDependencies,
"asyncMerkleStore" | "blockTreeStore" | "unprovenMerkleStore"
"blockTreeStore" | "treeStoreCreator"
> {
return {
asyncStateService: {
useFactory: () => new PrismaStateService(this, "batch"),
},
batchStorage: {
useClass: PrismaBatchStore,
},
Expand All @@ -64,9 +61,6 @@ export class PrismaDatabaseConnection
blockStorage: {
useClass: PrismaBlockStorage,
},
unprovenStateService: {
useFactory: () => new PrismaStateService(this, "block"),
},
settlementStorage: {
useClass: PrismaSettlementStorage,
},
Expand All @@ -76,6 +70,9 @@ export class PrismaDatabaseConnection
transactionStorage: {
useClass: PrismaTransactionStorage,
},
stateServiceCreator: {
useClass: PrismaStateServiceCreator,
},
};
}

Expand All @@ -90,6 +87,7 @@ export class PrismaDatabaseConnection
"Settlement",
"IncomingMessageBatch",
"IncomingMessageBatchTransaction",
"Mask",
];

await this.prismaClient.$transaction(
Expand Down
6 changes: 4 additions & 2 deletions packages/persistance/src/PrismaRedisDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
StorageDependencyMinimumDependencies,
Database,
closeable,
Sequencer,
} from "@proto-kit/sequencer";
import { ChildContainerProvider } from "@proto-kit/common";
import { PrismaClient } from "@prisma/client";
Expand All @@ -20,6 +21,7 @@ import {
RedisConnectionModule,
RedisTransaction,
} from "./RedisConnection";
import { inject } from "tsyringe";

export interface PrismaRedisCombinedConfig {
prisma: PrismaDatabaseConfig;
Expand All @@ -36,10 +38,10 @@ export class PrismaRedisDatabase

public redis: RedisConnectionModule;

public constructor() {
public constructor(@inject("Sequencer") sequencer: Sequencer<any>) {
super();
this.prisma = new PrismaDatabaseConnection();
this.redis = new RedisConnectionModule();
this.redis = new RedisConnectionModule(sequencer);
}

public get prismaClient(): PrismaClient {
Expand Down
31 changes: 23 additions & 8 deletions packages/persistance/src/RedisConnection.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { createClient, RedisClientType } from "redis";
import {
Sequencer,
SequencerModule,
StorageDependencyMinimumDependencies,
} from "@proto-kit/sequencer";
import { DependencyFactory } from "@proto-kit/common";
import { DependencyFactory, log } from "@proto-kit/common";
import isArray from "lodash/isArray";

import { RedisMerkleTreeStore } from "./services/redis/RedisMerkleTreeStore";
import { RedisTreeStoreCreator } from "./creators/RedisTreeStoreCreator";
import { TreeMaskRecovery } from "./TreeMaskRecovery";

export interface RedisConnectionConfig {
host: string;
Expand All @@ -26,6 +29,10 @@ export class RedisConnectionModule
extends SequencerModule<RedisConnectionConfig>
implements DependencyFactory, RedisConnection
{
public constructor(private readonly sequencer: Sequencer<any>) {
super();
}

private client?: RedisClientType;

public get redisClient(): RedisClientType {
Expand All @@ -39,18 +46,15 @@ export class RedisConnectionModule

public dependencies(): Pick<
StorageDependencyMinimumDependencies,
"asyncMerkleStore" | "blockTreeStore" | "unprovenMerkleStore"
"blockTreeStore" | "treeStoreCreator"
> {
return {
asyncMerkleStore: {
useFactory: () => new RedisMerkleTreeStore(this),
},
unprovenMerkleStore: {
useFactory: () => new RedisMerkleTreeStore(this, "unproven"),
},
blockTreeStore: {
useFactory: () => new RedisMerkleTreeStore(this, "blockHash"),
},
treeStoreCreator: {
useClass: RedisTreeStoreCreator,
},
};
}

Expand Down Expand Up @@ -81,8 +85,19 @@ export class RedisConnectionModule
}
}

private async recoverMemoryMasks() {
log.info("Starting recovery of tree masks");

const recovery =
this.sequencer.dependencyContainer.resolve(TreeMaskRecovery);

await recovery.recreateMasks();
}

public async start(): Promise<void> {
await this.init();

await this.recoverMemoryMasks();
}

public async close() {
Expand Down
87 changes: 87 additions & 0 deletions packages/persistance/src/TreeMaskRecovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { inject, injectable } from "tsyringe";
import {
applyStateDiff,
assertBlockHasResult,
AsyncMerkleTreeStore,
BlockQueue,
BlockWithResult,
CachedMerkleTreeStore,
collectStateDiff,
MaskName,
TreeStoreCreator,
} from "@proto-kit/sequencer";
import { reduceSequential } from "@proto-kit/common";

@injectable()
export class TreeMaskRecovery {
public constructor(
@inject("BlockQueue")
private readonly blockQueue: BlockQueue,
@inject("TreeStoreCreator")
private readonly treeStoreCreator: TreeStoreCreator
) {}

/**
* Strategy:
* 1. Collect all STs
* 2. Collect a distinct diff of all to-values (state diff)
* 3. Iteratively apply that to the tree
*/
private async applyBlock(mask: AsyncMerkleTreeStore, block: BlockWithResult) {
const transitions = [block.block.beforeBlockStateTransitions];
const txTransitions = block.block.transactions.flatMap((tx) =>
tx.stateTransitions
.filter((batch) => batch.applied)
.map((batch) => batch.stateTransitions)
);
transitions.push(...txTransitions);
transitions.push(block.result.afterBlockStateTransitions);

const stateDiff = collectStateDiff(transitions.flat());
const cache = new CachedMerkleTreeStore(mask);

await applyStateDiff(cache, stateDiff);

await cache.mergeIntoParent();
}

/**
* This method fetches all pending (i.e. un-batched) blocks and recreates
* their in-memory tree masks.
*
* The crash might have happened between block production and result generation.
* In this case, we need to first recreate all earlier masks, then generate the result.
* The last part happens automatically in BlockProducerModule.start().
* However, since normal block production doesn't do any tree ops, we can skip
* that potential incomplete block altogether.
*/
public async recreateMasks() {
const blocks = await this.blockQueue.getPendingBlocks();

await reduceSequential(
blocks,
async (previousMask, block) => {
if (previousMask === undefined) {
throw new Error(
"More than one result missing for the recent blocks, something is wrong"
);
}
if (block.result !== undefined) {
const maskName = MaskName.block(block.block.height);
const mask = await this.treeStoreCreator.createMask(
maskName,
previousMask
);

assertBlockHasResult(block);

await this.applyBlock(mask, block);

return maskName;
}
return undefined;
},
MaskName.base() as string | undefined
);
}
}
Loading