@sameerkrdev (Owner) commented Feb 1, 2026

Summary by CodeRabbit

  • New Features

    • Matching engine: better detection/emission for resting orders filled during matching; trade records include engine_id; orders now record client and gateway timestamps.
  • Bug Fixes

    • WAL checksum and file filtering improved for replay reliability.
  • Chores

    • Removed ledger, market-data, and notification service manifests and configs.
    • Kafka topic renamed to engine-events; tooling and client constants updated.
    • Expanded request/error logging across order APIs; minor consumer/serialization adjustments.

@coderabbitai (bot) commented Feb 1, 2026

Caution

Review failed

The pull request is closed.

Walkthrough

This PR renames the Kafka topic "orders" → "engine-events", adds engine_id to Trade, updates matching-engine internals (matching, WAL, event building, logging, actor messages), adjusts Kafka client/consumer payload handling, adds client and gateway timestamps to orders, and removes package manifests/config and startup logs for three services. It also adds a utils workspace dependency to api-gateway.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Service removals | `apps/ledger-service/*`, `apps/market-data-service/*`, `apps/notification-service/*` | Removed package.json and tsconfig.json files, deleted small eslint re-export modules, and removed startup logger calls, effectively decommissioning these app manifests and startup logging. |
| API Gateway | `apps/api-gateway/package.json`, `apps/api-gateway/src/controllers/order.controller.ts` | Added workspace dependency @repo/utils to package.json; replaced three hardcoded userId constants in order controller. |
| Matching engine (core) | `apps/matching-engine/internal/engine.go`, `apps/matching-engine/internal/utils.go` | MatchOrder now returns (trades, filledRestingOrders); events building updated to emit ORDER_FILLED for resting orders; EncodeOrderStatusEvent/EncodeOrderReducedEvent signatures extended; market order and cancel/modify accounting adjusted. |
| Matching engine (infra & actors) | `apps/matching-engine/internal/actor_registy.go`, `apps/matching-engine/internal/wal.go`, `apps/matching-engine/internal/kafka.go`, `apps/matching-engine/internal/server.go`, `apps/matching-engine/cmd/matching-engine/main.go`, `apps/matching-engine/wal/*` | Replaced direct ReplayWal call with logged replayFlow and non-fatal errors per symbol; WAL file filtering tightened (.log-only) and CRC computation adjusted; Kafka topic emit changed to "engine-events"; added request/error logging; increased KafkaEmitMM values. |
| Kafka topic/clients | `packages/kafka-client/src/constants.ts`, `packages/kafka-client/src/index.ts`, `infra/docker/kafka/create-topics.sh`, `package.json` | Replaced ORDERS → ENGINE_EVENTS constant and infra topic; kafka client now passes raw message to handler (removed parse/validation step); docker create-topics and npm kafka scripts updated (kafka → kafka-1 container reference). |
| Order / trade surfaces | `apps/order-service/src/controllers/order.controller.ts`, `apps/order-trade-worker-service/src/controllers/*`, `apps/order-trade-worker-service/src/kafka.consumer.ts` | Added clientTimestamp & gatewayTimestamp to PlaceOrderRequest; error handling now returns err.message; trade controller now maps engine_id instead of id; consumer switched to ENGINE_EVENTS topic and changed mapping/conversion to use ToJSON helpers and expanded ORDER_REJECTED handling. |
| Database schema | `packages/prisma/prisma/schema/trade.prisma`, `packages/prisma/prisma/migrations/*/migration.sql` | Added non-null engine_id field to Trade model and corresponding migration SQL (ALTER TABLE ADD COLUMN engine_id TEXT NOT NULL). |
| Misc / cleanup | `apps/websocket-server/internal/gateway.go`, `apps/matching-engine/wal/SOLUSD/checkpoint.meta`, `apps/*/eslint.config.mjs` | Removed stray debug prints; updated WAL checkpoint metadata; removed several small eslint re-export files. |

Sequence Diagram(s)

sequenceDiagram
  rect rgba(66,134,244,0.5)
    participant Client
  end
  rect rgba(52,199,89,0.5)
    participant APIGateway
  end
  rect rgba(255,159,28,0.5)
    participant MatchingEngine
  end
  rect rgba(153,102,255,0.5)
    participant WAL
  end
  rect rgba(242,38,19,0.5)
    participant Kafka
  end

  Client->>APIGateway: PlaceOrder (with clientTimestamp, gatewayTimestamp)
  APIGateway->>MatchingEngine: PlaceOrderMsg (Order, replay chan, Err chan)
  MatchingEngine->>MatchingEngine: MatchOrder => (trades, filledRestingOrders)
  MatchingEngine->>WAL: append EngineEvents (including filled resting orders)
  WAL-->>MatchingEngine: ack
  MatchingEngine->>Kafka: emit "engine-events" (ORDER_ACCEPTED/ORDER_FILLED/etc)
  Kafka-->>Consumers: delivered as raw payloads (ENGINE_EVENTS topic)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Fix/err bugs while testing #18: Appears to contain nearly identical changes across matching-engine, api-gateway, kafka-client constants, and multiple app removals — likely the same or closely related patchset.
  • Feat/matching engine #12: Related changes to matching-engine internals (actor registry, engine logic, WAL replay) that overlap with core engine modifications here.
  • Feat/kafka #9: Related Kafka topic/constant renames and Kafka wiring changes (ORDERS → ENGINE_EVENTS) touching kafka-client and consumers.

Poem

🐇 I hopped through logs and WALs so deep,

Swapped "orders" for "engine-events" in a leap,
Gave trades an engine_id to show,
Filled resting orders now loudly glow,
A tiny rabbit cheers the streaming heap!

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The PR title 'Fix/err bugs while testing' is vague and generic, using non-descriptive terms that don't convey meaningful information about the substantial changes across the codebase. | Provide a more specific title that reflects the main change, such as 'Migrate from orders to engine-events topic and enhance matching engine' or similar. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 14

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
apps/order-service/src/controllers/order.controller.ts (2)

37-47: ⚠️ Potential issue | 🟠 Major

Validate/fallback timestamps for backward compatibility.

clientTimestamp/gatewayTimestamp are forwarded as-is; older callers or tests may omit them, resulting in undefined values and potential engine validation failures. Consider defaulting to server time and validating range/finite values.

Suggested fix
-      const request: PlaceOrderRequest = {
+      const now = Date.now();
+      const clientTimestamp = order.clientTimestamp ?? now;
+      const gatewayTimestamp = order.gatewayTimestamp ?? now;
+      if (!Number.isFinite(clientTimestamp) || clientTimestamp <= 0) {
+        throw new Error("Invalid clientTimestamp");
+      }
+
+      const request: PlaceOrderRequest = {
         clientOrderId: orderId,
         symbol: order.symbol,
         price: order.price,
         quantity: order.quantity,
         side: order.side,
         type: order.type,
         userId: order.userId,
-        clientTimestamp: order.clientTimestamp,
-        gatewayTimestamp: order.gatewayTimestamp,
+        clientTimestamp,
+        gatewayTimestamp,
       };
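
The suggested diff above validates only clientTimestamp. As a rough sketch, the same default-and-validate rule could be applied to both fields through one helper. The name `normalizeTimestamp` and the assumption that timestamps are epoch milliseconds are illustrative and do not come from the PR.

```typescript
// Hypothetical helper (not part of the PR): default a missing timestamp to
// server time and reject values that are not positive, finite numbers.
// Assumes timestamps are epoch milliseconds, which is what Date.now() returns.
function normalizeTimestamp(
  value: number | null | undefined,
  now: number,
  field: string,
): number {
  const ts = value ?? now;
  if (!Number.isFinite(ts) || ts <= 0) {
    throw new Error(`Invalid ${field}`);
  }
  return ts;
}

const now = Date.now();
// An older caller that omits the field falls back to server time.
const clientTimestamp = normalizeTimestamp(undefined, now, "clientTimestamp");
// A caller that supplies a sane value keeps it unchanged.
const gatewayTimestamp = normalizeTimestamp(1700000000000, now, "gatewayTimestamp");
```

Whether a missing gatewayTimestamp should default silently or be rejected is a policy decision for the service owners; the sketch only shows the mechanics.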

79-87: ⚠️ Potential issue | 🟠 Major

Avoid leaking internal error details to clients.

Returning err.message directly can expose internal system details across place/cancel/modify flows. Prefer a stable public message while logging the full error server-side (you already log err.message/stack).

Suggested fix
-      callback(
-        {
-          code: grpc.status.INTERNAL,
-          message: err.message,
-          name: "CreateOrderError",
-        } as grpc.ServiceError,
-        null,
-      );
+      const publicMessage = "Internal server error";
+      callback(
+        {
+          code: grpc.status.INTERNAL,
+          message: publicMessage,
+          name: "CreateOrderError",
+        } as grpc.ServiceError,
+        null,
+      );

Also applies to: 128-135, 184-191
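
Because the same pattern recurs in the place, cancel, and modify handlers, one option is a small shared helper. The sketch below is an assumption about how that might look: `toPublicError` and `PublicServiceError` are hypothetical names, and the gRPC INTERNAL status code (13) is hard-coded only so the example runs without importing the gRPC library.

```typescript
// gRPC's INTERNAL status code is 13. Real code would use grpc.status.INTERNAL;
// the constant is inlined here only to keep the sketch self-contained.
const GRPC_STATUS_INTERNAL = 13;

// Hypothetical shape mirroring the fields used in the suggested fix above.
interface PublicServiceError {
  code: number;
  message: string;
  name: string;
}

// Hypothetical helper: log the full error server-side, but hand the client
// a stable message that reveals nothing about internals.
function toPublicError(
  err: unknown,
  name: string,
  log: (msg: string, meta: Record<string, unknown>) => void,
): PublicServiceError {
  const detail =
    err instanceof Error
      ? { message: err.message, stack: err.stack }
      : { message: String(err) };
  log(`${name} failed`, detail);
  return { code: GRPC_STATUS_INTERNAL, message: "Internal server error", name };
}

// Usage: the internal detail reaches the log, not the caller.
const logged: string[] = [];
const publicErr = toPublicError(
  new Error("connect ECONNREFUSED 10.0.0.5:5432"),
  "CreateOrderError",
  (msg, meta) => {
    logged.push(`${msg}: ${String(meta.message)}`);
  },
);
```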

apps/matching-engine/internal/wal.go (2)

340-340: ⚠️ Potential issue | 🟠 Major

Resource leak: defer file.Close() inside loop accumulates defers.

Using defer inside a loop doesn't close files immediately—all defers execute only when the function returns. If there are many WAL segments, this can exhaust file descriptors.

Proposed fix: Extract to helper or close explicitly
 	for _, dirEntry := range entries {
 		if dirEntry.IsDir() || !strings.HasSuffix(dirEntry.Name(), ".log") {
 			continue
 		}

 		path := filepath.Join(sw.dirPath, dirEntry.Name())
-		file, err := os.Open(path)
-		if err != nil {
-			return nil, err
-		}
-		defer file.Close()
-
-		for {
-			// ... reading logic
-		}
+		segmentEntries, err := sw.readSegmentFile(path, from, to, &results)
+		if err != nil {
+			return nil, err
+		}
+		if segmentEntries != nil {
+			return segmentEntries, nil // early return case
+		}
 	}

Alternatively, close explicitly at the end of each loop iteration (and on any early-return path inside the loop):

file, err := os.Open(path)
if err != nil {
    return nil, err
}

// ... reading logic

file.Close() // explicit close before next iteration

404-404: ⚠️ Potential issue | 🟠 Major

Same resource leak issue: defer file.Close() inside loop.

Apply the same fix as suggested for ReadFromTo.

apps/matching-engine/internal/engine.go (2)

789-829: ⚠️ Potential issue | 🟠 Major

New order quantity ignores previously cancelled volume when newQuantity is provided.

If the order was reduced earlier, newOrderQuantity = newQuantity - oldFilledQuantity can resurrect cancelled size. Use the same “executed” definition as ModifyOrderInternal (Quantity - RemainingQuantity).

🐛 Suggested fix
  oldRemainingQuantity := order.RemainingQuantity
- oldFilledQuantity := order.FilledQuantity
+ executed := order.Quantity - oldRemainingQuantity

  // ...
  newOrderQuantity := oldRemainingQuantity
  if newQuantity != nil {
-   newOrderQuantity = *newQuantity - oldFilledQuantity
+   newOrderQuantity = *newQuantity - executed
  }

  if newOrderQuantity <= 0 {
    return nil, fmt.Errorf("nothing remaining after accounting for executed quantity")
  }

1138-1193: ⚠️ Potential issue | 🟠 Major

Avoid dumping full order structs/user IDs to stdout during WAL replay.

These fmt.Println calls include full orders and user identifiers, which is noisy and can leak sensitive data. Please gate behind a debug flag or use a structured logger with redaction.

Also applies to: 1257-1297, 1318-1346

🤖 Fix all issues with AI agents
In `@apps/api-gateway/package.json`:
- Line 26: The package.json declares an unused workspace dependency
"@repo/utils"; remove the "@repo/utils": "workspace:*" entry from package.json,
update the lockfile by running your package manager (npm/yarn/pnpm install) and
run a quick repo-wide search to confirm no imports reference "@repo/utils" (so
you don't break anything). After removal, run the api-gateway build and tests
(e.g., yarn build / yarn test) to ensure nothing was relying on that package.

In `@apps/api-gateway/src/controllers/order.controller.ts`:
- Line 25: The hardcoded const userId in order.controller.ts is a security risk;
replace it by reading the authenticated user id injected by your auth middleware
(e.g., use req.userId or req.user.id) inside the same controller function where
const userId is declared, validate that it exists and return a 401/403 error if
missing, and remove the TODO; update any downstream usage in create/update/get
order handlers to use this dynamic userId and add a unit/integration test to
ensure unauthenticated requests are rejected.

In `@apps/matching-engine/internal/actor_registy.go`:
- Around line 33-40: There is a typo: log messages and the WAL replay method are
using "Replying"/"ReplyWal" instead of "Replaying"/"ReplayWal"; rename the
method ReplyWal to ReplayWal (both its definition and all call sites such as
actor.ReplyWal -> actor.ReplayWal) and update the slog.Info messages that
reference "Replying" to "Replaying" (including the start, failure and completion
messages that format sym.Name and len(actor.engine.AllOrders)); ensure the
renamed method signature and any interface implementations are updated to match.

In `@apps/matching-engine/internal/engine.go`:
- Around line 1347-1350: The ORDER_FILLED handling currently returns an error
when the order ID is missing (order, exists := a.engine.AllOrders[event.OrderId]
... return fmt.Errorf(...)), which aborts WAL replay on duplicates; change this
to be idempotent by skipping missing orders instead of returning an error—e.g.,
when exists is false, log a warning or debug message referencing event.OrderId
and continue processing (mirror ORDER_REJECTED’s continue behavior) so duplicate
or retried ORDER_FILLED events don’t abort replay; update the block in the
ORDER_FILLED handling that reads from a.engine.AllOrders to perform this
check-and-skip behavior.

In `@apps/matching-engine/internal/server.go`:
- Around line 33-34: Change the log message string passed to slog.Info from
"Request to place a order" to "Request to place an order" (the call invoking
slog.Info with the "order" field should be updated; locate the slog.Info(...)
invocation in server.go that logs the order variable).
- Line 67: Update the log message string in the slog.Info call so the grammar is
correct: change "Request for cancel a order" to "Request to cancel an order"
(locate the slog.Info(...) invocation that includes "orderId" and "symbol" in
server.go).
- Around line 83-90: Update the logging call in slog.Info used when modifying an
order: correct the message text from "Request to modify a order" to "Request to
modify an order" and make the log field keys use a consistent casing style
(e.g., change "ClientModifyId", "NewPrice", "NewQuantity" to camelCase
"clientModifyId", "newPrice", "newQuantity") so they match the existing keys
like "orderId", "symbol", "userId"; keep the same call to slog.Info and the same
field values (req.ClientModifyId, req.NewPrice, req.NewQuantity) but rename the
logged key strings to the chosen consistent style.

In `@apps/matching-engine/internal/wal.go`:
- Around line 449-452: The CRC change now appends an 8-byte little-endian
sequence (entry.GetSequenceNumber()) before computing actualChecksum, which
breaks compatibility with older WAL entries that used a single-byte sequence;
fix by adding explicit versioning to WAL entries and making the reader tolerant:
define a WAL version field/constant, update the writer to emit the new version
marker then continue using the 8-byte LE sequence (keep the existing write path
that already uses 8 bytes), and update the reader to first read the version
marker and branch—if version==old (pre-change) compute checksum using the
single-byte sequence path, if version==new compute checksum using the 8-byte
sequence as done where actualChecksum is computed—alternatively provide a
migration/purge tool to upgrade existing WAL files before deployment if you
cannot add versioning now.

In `@apps/order-trade-worker-service/src/controllers/order.controller.ts`:
- Line 58: The property mapping in order.controller.ts uses the misspelled DB
column "canelled_quantity" (seen as canelled_quantity: data.cancelledQuantity);
plan a coordinated refactor: add a Prisma schema change to rename the column to
"cancelled_quantity" and create a migration that preserves data (use a SQL
RENAME or create new column + copy + drop), update the Prisma client and
regenerate, update all service mappings (e.g., the mapping in
order.controller.ts) to use the new "cancelled_quantity" field, and track this
as a migration/refactor task to coordinate deploy order across services and
proto consumers to avoid downtime.

In `@apps/order-trade-worker-service/src/kafka.consumer.ts`:
- Around line 174-188: The ORDER_ACCEPTED and ORDER_REJECTED handlers call
orderController.createOrder using non-null assertions on gatewayTimestamp,
clientTimestamp, and engineTimestamp; instead, validate these optional fields
coming from OrderStatusEvent.decode() before passing them in: check that
data.gatewayTimestamp, data.clientTimestamp, and data.engineTimestamp are
present (or provide safe defaults) and return/log an error or skip creating the
order if any required timestamp is missing; update the code paths in the
handlers that build the createOrder payload (referencing
orderController.createOrder, ORDER_ACCEPTED, ORDER_REJECTED, and
OrderStatusEvent.decode) to remove the "!" assertions and use the
guarded/validated values.
- Around line 22-25: The code is casting JSON strings produced by
sideToJSON/orderTypeToJSON/orderStatusToJSON into Prisma enums via unsafe "as
unknown as" casts; remove those casts and instead implement explicit conversion:
create mapping functions (e.g., mapProtoSideToPrisma(side: number | string):
OrderSide, mapProtoTypeToPrisma(...): OrderType, mapProtoStatusToPrisma(...):
OrderStatus) that accept the proto numeric or JSON string output and return the
correctly typed Prisma enum values, then replace usages of
sideToJSON/orderTypeToJSON/orderStatusToJSON + casts with calls to these mapping
functions in the kafka.consumer handlers (where
sideToJSON/orderTypeToJSON/orderStatusToJSON are used) and in
order.controller.ts (replace lines using the unsafe casts with the mapping
calls) so TypeScript no longer needs unsafe casts and the values match Prisma
enum types.

In `@packages/kafka-client/src/constants.ts`:
- Line 2: Rename the misspelled constant ENGINE_ENVENTS to ENGINE_EVENTS in
packages/kafka-client's constants (update the exported symbol name
ENGINE_ENVENTS -> ENGINE_EVENTS) and update all references/usages across the
codebase to use ENGINE_EVENTS to avoid breaking consumers; ensure exports and
any imports (tests, consumers, or internal modules) are updated and run the test
suite to confirm no missing symbol errors.

In
`@packages/prisma/prisma/migrations/20260131123512_add_trade_engine_id/migration.sql`:
- Around line 1-8: The migration currently adds engine_id as NOT NULL in the
ALTER TABLE "trades" statement which will fail on non-empty tables; change the
migration to first ADD COLUMN "engine_id" TEXT NULL (nullable), run a backfill
step that populates trades.engine_id for existing rows (using the appropriate
default or lookup logic), and then create a follow-up migration that ALTER TABLE
"trades" ALTER COLUMN "engine_id" SET NOT NULL to enforce the constraint once
data is populated; reference the ALTER TABLE "trades" ADD COLUMN "engine_id"
statement when making these edits and ensure the backfill uses the same column
name and expected value source.

In `@packages/prisma/prisma/schema/trade.prisma`:
- Around line 2-3: The TypeScript definitions for Trade and TradeData are
missing the required engine_id field declared in the Prisma schema; update the
interfaces in packages/types/trade.d.ts to add engine_id: string on both the
Trade and TradeData interfaces so the fetched DB objects match the schema (and
scan for any derived types or factory functions that need the new property
included as required).
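
The enum-mapping item for kafka.consumer.ts in the prompt above could be sketched roughly as follows. The `OrderSide` type is a stand-in for the Prisma-generated enum, and the member names BUY and SELL are assumptions; the real values would come from the Prisma client and the proto definitions.

```typescript
// Stand-in for the Prisma-generated enum type. Member names are assumed.
type OrderSide = "BUY" | "SELL";

// Explicit lookup from the JSON name produced by the proto helper
// (sideToJSON in the PR) to the Prisma enum value.
const SIDE_MAP = new Map<string, OrderSide>([
  ["BUY", "BUY"],
  ["SELL", "SELL"],
]);

// Replaces an `as unknown as OrderSide` cast: unknown or unrecognized names
// fail loudly here instead of reaching the database as an invalid enum value.
function mapProtoSideToPrisma(side: string): OrderSide {
  const mapped = SIDE_MAP.get(side);
  if (mapped === undefined) {
    throw new Error(`Unsupported order side: ${side}`);
  }
  return mapped;
}

const side: OrderSide = mapProtoSideToPrisma("BUY");
```

The order type and order status mappings would follow the same shape, each with its own lookup table.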
🧹 Nitpick comments (8)
apps/matching-engine/wal/SOLUSD/checkpoint.meta (1)

1-1: Consider excluding WAL checkpoint files from version control.

This checkpoint metadata file appears to track WAL sequence state, which is runtime data. Committing this file could cause issues when multiple developers work on the codebase or when deploying to different environments, as it may conflict with actual runtime state.

Consider adding apps/matching-engine/wal/ to .gitignore if this directory contains only runtime artifacts.

apps/matching-engine/internal/actor_registy.go (1)

37-37: Use slog.Error or slog.Warn for failure logs.

Using slog.Info for error conditions makes it harder to filter and monitor failures. Consider using an appropriate log level.

Proposed fix
-			slog.Info(fmt.Sprintf("Replying the %s orderbook Failed. Error: %s", sym.Name, err.Error()))
+			slog.Error(fmt.Sprintf("Replaying the %s orderbook Failed", sym.Name), "error", err)
apps/matching-engine/internal/wal.go (2)

331-331: Use strings.HasSuffix instead of strings.Contains for stricter filtering.

strings.Contains(name, ".log") could match unintended files like backup.log.bak or debug.log.old. Using strings.HasSuffix(name, ".log") ensures only actual .log files are processed.

Proposed fix
-		if dirEntry.IsDir() || !strings.Contains(dirEntry.Name(), ".log") {
+		if dirEntry.IsDir() || !strings.HasSuffix(dirEntry.Name(), ".log") {

394-394: Apply the same HasSuffix fix here for consistency.

Proposed fix
-		if entry.IsDir() || !strings.Contains(entry.Name(), ".log") {
+		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".log") {
apps/matching-engine/internal/server.go (1)

95-103: Consider consistent log field naming.

Error log field names mix casing conventions: orderId (camelCase) vs ClientModifyId, NewPrice, NewQuantity (PascalCase). Using consistent camelCase throughout would improve log parsing and querying.

Proposed fix
 	slog.Error("Failed to modify order",
 		"orderId", req.OrderId,
 		"symbol", req.Symbol,
 		"userId", req.UserId,
-		"ClientModifyId", req.ClientModifyId,
-		"NewPrice", req.NewPrice,
-		"NewQuantity", req.NewQuantity,
+		"clientModifyId", req.ClientModifyId,
+		"newPrice", req.NewPrice,
+		"newQuantity", req.NewQuantity,
 		"error", err,
 	)
packages/kafka-client/src/index.ts (1)

60-68: Avoid @ts-expect-error for the unused schema parameter.

It’s brittle because it fails once the compiler stops reporting an error; a no-op usage keeps the API stable without suppression.

🔧 Suggested adjustment
   async subscribe<T, X>(
     groupId: string,
     topic: string,
     handler: (message: T, topic: string, partition: number, headers: X) => Promise<void>,
-    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
-    // @ts-expect-error
-    // eslint-disable-next-line @typescript-eslint/no-unused-vars
     schema?: z.ZodTypeAny,
   ) {
+    void schema; // keep for API compatibility until removed
apps/matching-engine/internal/utils.go (1)

56-85: Consider fixing the RemainingQuantiy typos in parameter names for clarity.

apps/order-trade-worker-service/src/kafka.consumer.ts (1)

48-52: Use the topic parameter in logs for accuracy and maintainability.

The hard-coded log message "Consumed message from 'orders' topic:" doesn't reflect the actual topic (engine-events). Use the topic parameter to keep logs accurate:

Suggested improvement
-        this.logger.info("Consumed message from 'orders' topic:", { topic, partition });
+        this.logger.info(`Consumed message from '${topic}' topic:`, { topic, partition });

@sameerkrdev sameerkrdev merged commit 64fbaf3 into main Feb 1, 2026
1 check was pending
@sameerkrdev sameerkrdev deleted the fix/err-bugs-while-testing branch February 1, 2026 22:48