Feat/order trade worker service #17

Conversation
Walkthrough

This PR refactors the matching engine's event system by adding depth and ticker event generation, restructures the database schema from maker/taker to buyer/seller order references, migrates order-creation logic from the order-worker to the order-trade-worker service via Kafka event consumption, and addresses thread-safety and persistence issues.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Buf (1.63.0) — packages/proto-defs/proto/engine/order_matching.proto
Failure: no .proto files were targeted. This can occur if no .proto files are found in your input, --path points to files that do not exist, or --exclude-path excludes all files.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 16
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
apps/matching-engine/internal/kafka.go (2)
93-96: Ignoring the error from `ReadFromTo` can mask WAL read failures.

If the WAL read fails (e.g., corruption, I/O error), this silently continues with an empty/partial `events` slice, potentially causing data loss or stale checkpoints.

Proposed fix:

```diff
-	events, _ := kpw.wal.ReadFromTo(
+	events, err := kpw.wal.ReadFromTo(
 		startOffset+1,
 		startOffset+uint64(kpw.batchSize),
 	)
+	if err != nil {
+		log.Println("WAL read failed:", err)
+		return
+	}
```
46-71: Resource leak: `checkpointFile` is never closed.

The file handle opened at line 57 is stored in the struct but there's no `Close()` method or cleanup mechanism. Over time or on repeated instantiation, this leaks file descriptors.

Proposed fix: add a Close method:

```diff
+func (kpw *KafkaProducerWorker) Close() error {
+	if kpw.checkpointFile != nil {
+		return kpw.checkpointFile.Close()
+	}
+	return nil
+}
```

Then ensure callers invoke `Close()` when shutting down the worker.

apps/matching-engine/internal/wal.go (1)
146-154: CRC verification will fail with the new 8-byte sequence encoding.

Write now uses 8 bytes of sequence number, but the verifier still uses a single byte, so most entries will fail checksum validation during replay. Please update `unmarshalAndVerifyEntry` to match the new CRC input.

🧩 Proposed fix (update checksum verification):

```diff
 func unmarshalAndVerifyEntry(data []byte) (*pbTypes.WAL_Entry, error) {
 @@
-	actualChecksum := crc32.ChecksumIEEE(append(entry.GetData(), byte(entry.GetSequenceNumber())))
+	var seqBytes [8]byte
+	binary.LittleEndian.PutUint64(seqBytes[:], entry.GetSequenceNumber())
+	actualChecksum := crc32.ChecksumIEEE(append(entry.GetData(), seqBytes[:]...))
```
🤖 Fix all issues with AI agents
In `@apps/matching-engine/internal/engine.go`:
- Around line 1261-1263: During WAL replay the code incorrectly accumulates
cancelled amounts by using order.CancelledQuantity +=
event.NewCancelledQuantity; change this to assign the replayed value directly
(order.CancelledQuantity = event.NewCancelledQuantity) and ensure
RemainingQuantity is likewise set from event.NewRemainingQuantity
(order.RemainingQuantity = event.NewRemainingQuantity) so replay is idempotent;
update the logic in the replay handler where these fields are applied to use
direct assignment instead of incremental updates.
- Line 488: me.getTickerEvent is being called with me.Bids.BestPriceLevel.Price
and me.Asks.BestPriceLevel.Price which can panic if either BestPriceLevel is
nil; before the call, compute safe bestBidPrice and bestAskPrice by checking
me.Bids.BestPriceLevel != nil and me.Asks.BestPriceLevel != nil and use a
sensible default (e.g., 0 or NaN) or an optional/nullable value when nil, then
call me.getTickerEvent(trade.Price, bestBidPrice, bestAskPrice); ensure the
variable name tikcer is preserved or corrected after you replace the direct
field access to avoid nil dereference.
- Around line 488-494: The code currently appends tikcer to events when err !=
nil and then calls err.Error() when tikcer == nil which can cause logic
inversion and nil deref; update the block around me.getTickerEvent(trade.Price,
me.Bids.BestPriceLevel.Price, me.Asks.BestPriceLevel.Price) so that you append
the returned tikcer to events only when err == nil, and when tikcer is nil only
print or log err if err is non-nil (i.e., check err != nil before calling
err.Error()); also correct any misspelling of the variable name (tikcer ->
ticker) to avoid confusion and ensure the error variable used is the one
returned by getTickerEvent rather than a stale err from another call.
- Around line 1290-1307: The code can panic if order.PriceLevel is nil; before
calling level.Remove(order) and level.IsEmpty() (and before
obs.RemovePriceLevel(level)), add a nil check for order.PriceLevel: retrieve
level := order.PriceLevel, and if level != nil then call level.Remove(order),
compute obs (using a.engine.Asks / a.engine.Bids as currently done) and if
level.IsEmpty() call obs.RemovePriceLevel(level); always still delete the order
from a.engine.AllOrders (delete(a.engine.AllOrders, order.ClientOrderID))
regardless of price level being nil. This touches the logic around
a.engine.AllOrders, order.PriceLevel, level.Remove, level.IsEmpty and
obs.RemovePriceLevel.
- Around line 1018-1051: The getDepthEvent depth loops are wrong: they start at
i=1 (skipping the best level), append asks into the bids slice, and advance temp
then access temp.NextPrice causing nil deref; fix by iterating from i=0 up to
depthLevel-1, use a loop condition that checks temp != nil (e.g., for i:=0;
i<depthLevel && temp!=nil; i++), append the constructed pb.PriceLevel to bids
for the bids loop and to asks for the asks loop, and advance temp safely by
setting temp = temp.NextPrice only after checking temp is non-nil (and avoid
accessing temp.NextPrice directly without verifying temp != nil).
- Around line 691-695: The code updates order.CancelledQuantity twice—first with
"order.CancelledQuantity += oldRemaining - newRemaining" and then immediately
with "order.CancelledQuantity = newCancelledQuantity" (where
newCancelledQuantity was computed as order.CancelledQuantity + volumeDelta and
volumeDelta == oldRemaining - newRemaining), which is redundant and confusing;
remove the incremental assignment (the "+=" line) and keep the direct assignment
to newCancelledQuantity so only one clear update to order.CancelledQuantity
remains, referencing the existing variables order.CancelledQuantity,
newCancelledQuantity, volumeDelta, oldRemaining, and newRemaining to locate the
logic.
In `@apps/order-trade-worker-service/src/kafka.consumer.ts`:
- Around line 167-179: In the EventType.ORDER_REJECTED branch you’re calling
this.orderController.updateOrderForCancelled which forces status CANCELLED;
change this to update the order as REJECTED instead—either call a dedicated
method like updateOrderForRejected on OrderController (create it if missing)
that sets status = OrderStatus.REJECTED and applies
cancelledQuantity/remainingQuantity, or reuse a generic
updateOrder/updateOrderStatus method to set status to REJECTED with the same
payload; update the handler for EventType.ORDER_REJECTED to invoke that
rejected-specific update rather than updateOrderForCancelled.
- Around line 47-51: startConsuming() currently calls
this.kafkaClient.subscribe(...) but does not await it, so startup/connect
failures run in background; change startConsuming to await the promise returned
by this.kafkaClient.subscribe(...) (i.e., add await before
this.kafkaClient.subscribe(...) or return that promise) so errors bubble to the
caller; ensure you reference the existing subscription call
(this.kafkaClient.subscribe with
KAFKA_CONSUMER_GROUP_ID.ORDER_CONSUMER_SERVICE_1 and KAFKA_TOPICS.ORDERS) and
let exceptions propagate (or rethrow) so startup failures surface to the caller.
- Around line 58-60: The handler is passing a parsed JSON object to
EngineEvent.decode which expects protobuf binary; update the code in the Kafka
message handling to match the actual producer format: if messages are JSON,
replace EngineEvent.decode(event) with EngineEvent.fromObject(event) (or the
JSON deserializer provided by the protobuf runtime) and remove the misleading
Buffer type annotation in the subscribe callback; if messages are sent as
protobuf binary, stop JSON.parse(raw) in KafkaClient.subscribe so the callback
receives a Buffer/Uint8Array and keep EngineEvent.decode(event). Also verify
producer encoding to choose the correct branch.
In
`@packages/prisma/prisma/migrations/20260121103519_update_order_trade_table/migration.sql`:
- Line 75: The migration adds a misspelled column "canelled_quantity"; update
the migration SQL to use the correct column name "cancelled_quantity" (replace
"canelled_quantity" in the ADD COLUMN statement) and then search for and update
any schema/model references, queries, or seed data that use "canelled_quantity"
(e.g., in Prisma schema, model fields, repository methods, or raw SQL) so they
consistently use "cancelled_quantity" before applying the migration to
production.
- Around line 68-88: The migration is unsafe because it adds NOT NULL columns
(average_price, canelled_quantity, client_timeline, engine_timeline,
executedValue, gateway_timeline) to orders and changes DECIMAL columns
(quantity, price) to INTEGER without any backfill or safe conversion; fix by
making the new columns nullable (or add with sensible DEFAULTs), add an explicit
backfill step that populates average_price, canelled_quantity, client_timeline,
engine_timeline, executedValue, gateway_timeline for existing rows (or compute
sensible defaults), then run an ALTER to SET NOT NULL only after backfill; for
quantity/price, instead of a bare ALTER TABLE "orders" ALTER COLUMN "quantity"/"price"
SET DATA TYPE INTEGER, perform a safe conversion using a temporary column or a
USING expression (e.g., ROUND/CAST, or multiplied/divided as appropriate) to
preserve values, and add a verification/backfill step before dropping the old
column or changing the type to INTEGER. Ensure the migration is split into separate
steps: add nullable columns, backfill/update rows, convert DECIMAL to INTEGER
safely with explicit conversion logic, then enforce NOT NULL constraints.
In
`@packages/prisma/prisma/migrations/20260121105000_add_buyer_seller_relation_in_trade_table/migration.sql`:
- Around line 8-13: The migration drops "user_id" from the "trades" table
without backfilling the new "buyer_id" and "seller_id" columns, risking data
loss; update the migration to either (A) add a data-migration step that runs
before dropping "user_id" to populate "buyer_id" or "seller_id" from existing
"user_id" (e.g., UPDATE trades SET buyer_id = user_id WHERE <appropriate
condition>; or seller_id = user_id as applicable) and ensure any
NULLs/constraints are handled, or (B) add an explicit safety check/assertion
that the "trades" table is empty at deployment time so dropping "user_id" is
safe; locate this logic around the ALTER TABLE block that touches "trades",
"user_id", "buyer_id", and "seller_id" and implement the chosen approach before
removing the "user_id" column.
In
`@packages/prisma/prisma/migrations/20260122163311_add_timestamp_trade/migration.sql`:
- Around line 8-9: The migration adds a NOT NULL "updated_at" to "trades" which
will fail on non-empty tables; change it to a 2-step migration: first ALTER
TABLE "trades" ADD COLUMN "updated_at" TIMESTAMP(3) NULL (no NOT NULL), then
backfill existing rows (e.g. UPDATE "trades" SET "updated_at" =
COALESCE("created_at", CURRENT_TIMESTAMP) or a chosen timestamp), and finally
ALTER TABLE "trades" ALTER COLUMN "updated_at" SET NOT NULL (optionally add a
DEFAULT CURRENT_TIMESTAMP if desired). Ensure the referenced column names
"trades", "updated_at", and "created_at" are used exactly as in the diff.
In `@packages/prisma/prisma/schema/order.prisma`:
- Line 19: Rename the misspelled field canelled_quantity to cancelled_quantity
in the Order model (replace the canelled_quantity field definition with
cancelled_quantity Int), update all code references/usages of canelled_quantity
to the new cancelled_quantity symbol, then regenerate and apply schema changes
(run prisma generate and create/apply a migration with prisma migrate dev or
create a manual SQL rename migration if preserving production data is required)
so the database column and Prisma client stay consistent.
In `@packages/prisma/prisma/schema/trade.prisma`:
- Around line 8-9: The schema adds non-nullable fields trade_sequence (Int) and
is_buyer_maker (Boolean) without defaults which will break migrations on
non-empty tables; either add safe defaults in the Prisma model (e.g.,
`@default(0)` on trade_sequence and `@default(false)` on is_buyer_maker) or modify
the generated migration to first ALTER TABLE ADD COLUMN as nullable, run an
UPDATE to backfill existing rows (set trade_sequence = 0 and is_buyer_maker =
false), then ALTER TABLE to set NOT NULL (or re-add with constraints), and
ensure the model fields trade_sequence and is_buyer_maker match the final
non-nullable definition.
- Around line 6-8: Change the Prisma field types for price, quantity, and
trade_sequence from Int to BigInt in the trade model (fields: price, quantity,
trade_sequence), then run a Prisma migration (prisma migrate dev or apply a
migration) to update the DB; also update any application code that reads/writes
these fields to handle BigInt values (adjust TypeScript types/usages from number
to bigint or string as your runtime requires) so conversions/serialisation are
correct.
🧹 Nitpick comments (8)
apps/matching-engine/internal/kafka.go (2)
70-70: No graceful shutdown mechanism.

The context is initialized with `context.Background()` and there's no way to cancel it externally. The `Run()` loop will run forever with no way to stop it cleanly.

Proposed fix: accept a context from the caller:

```diff
-func NewKafkaProducerWorker(symbol string, dirPath string, wal *SymbolWAL, batchSize int, emitTime int) (*KafkaProducerWorker, error) {
+func NewKafkaProducerWorker(ctx context.Context, symbol string, dirPath string, wal *SymbolWAL, batchSize int, emitTime int) (*KafkaProducerWorker, error) {
 	brokers := []string{"localhost:19092", "localhost:19093", "localhost:19094"}
 	producer, err := GetProducer(brokers)
 	if err != nil {
 		return nil, err
 	}
 	file, err := os.OpenFile(filepath.Join(dirPath, symbol, "checkpoint.meta"), os.O_CREATE|os.O_RDWR, 0644)
 	if err != nil {
 		return nil, err
 	}
 	return &KafkaProducerWorker{
 		producer:       producer,
 		wal:            wal,
 		batchSize:      batchSize,
 		emitTimeMM:     emitTime,
 		dirPath:        dirPath,
 		Symbol:         symbol,
 		checkpointFile: file,
-		ctx:            context.Background(),
+		ctx:            ctx,
 	}, nil
 }
```

Also applies to: 74-88
51-51: Hardcoded broker addresses.

Consider making the broker list configurable via environment variables or constructor parameters to support different deployment environments.
apps/order-trade-worker-service/src/kafka.consumer.ts (1)
54-56: TODO idempotency still open.

If sequence headers are intended to dedupe, consider persisting processed offsets/sequence IDs. If you want, I can propose an idempotency strategy and add a small persistence layer or Redis-backed dedupe cache.
apps/order-trade-worker-service/src/controllers/trade.controller.ts (1)
5-26: Align repository null-guard with typing and log labels.
`tradeRepo` is non-optional but guarded; either make it optional (like the order controller) or remove the guard. Also, the warning and log metadata should reference trades rather than orders.

♻️ Suggested cleanup:

```diff
 export class TradeServerController {
 	constructor(
 		private readonly logger: Logger,
-		private tradeRepo: TradeRepository,
+		private tradeRepo?: TradeRepository,
 	) {}

 	async createTrade(data: {
 @@
 		if (!this.tradeRepo) {
-			this.logger.warn("TradeRepository not provided, skipping order persistence");
+			this.logger.warn("TradeRepository not provided, skipping trade persistence");
 			return;
 		}
 @@
-		this.logger.info("trade persisted successfully", { orderId: newTrade.id });
+		this.logger.info("trade persisted successfully", { tradeId: newTrade.id });
```

Also applies to: 44-44
packages/prisma/src/repositories/trade.ts (1)
41-49: Update method doc to match buyer/seller terminology.
The method now filters by buyer/seller IDs, so the "maker/taker" docstring is slightly stale.

apps/matching-engine/internal/engine.go (1)
896-906: Remove commented-out code before merging.

This block of commented-out code for depth and ticker event generation should either be removed or uncommented if needed. Leaving it in creates confusion about intent.
packages/prisma/prisma/schema/order.prisma (1)
10-10: Inconsistent naming: `executedValue` uses camelCase.

All other fields use snake_case (e.g., `average_price`, `filled_quantity`), but `executedValue` uses camelCase. This creates inconsistency in the database schema and API.

♻️ Proposed fix:

```diff
-  executedValue Int
+  executed_value Int
```

packages/prisma/prisma/schema/trade.prisma (1)
11-21: Confirm optionality of buy/sell/buyer/seller is intentional.If every trade must always have both sides, consider making these relations required to enforce integrity at the DB level.
Code under review:

```go
tikcer, _ := me.getTickerEvent(trade.Price, me.Bids.BestPriceLevel.Price, me.Asks.BestPriceLevel.Price)
```
Potential nil pointer dereference when accessing BestPriceLevel.
If me.Bids.BestPriceLevel or me.Asks.BestPriceLevel is nil (e.g., after a trade exhausts one side of the book), accessing .Price will cause a panic.
🐛 Proposed fix: Add nil checks before accessing BestPriceLevel

```diff
-	tikcer, _ := me.getTickerEvent(trade.Price, me.Bids.BestPriceLevel.Price, me.Asks.BestPriceLevel.Price)
+	var bidPrice, askPrice int64
+	if me.Bids.BestPriceLevel != nil {
+		bidPrice = me.Bids.BestPriceLevel.Price
+	}
+	if me.Asks.BestPriceLevel != nil {
+		askPrice = me.Asks.BestPriceLevel.Price
+	}
+	ticker, tickerErr := me.getTickerEvent(trade.Price, bidPrice, askPrice)
```
Code under review:

```go
tikcer, _ := me.getTickerEvent(trade.Price, me.Bids.BestPriceLevel.Price, me.Asks.BestPriceLevel.Price)
if err != nil {
	events = append(events, tikcer)
}
if tikcer == nil {
	fmt.Printf("%s", err.Error())
}
```
Inverted condition and potential nil dereference for ticker event.
The logic here is inverted: the ticker is appended when err != nil (line 489-490), but it should be appended when err == nil. Additionally, if tikcer is nil, calling err.Error() on line 493 will panic if err is also nil (which it could be since err is from the previous getDepthEvent call, not from getTickerEvent).
🐛 Proposed fix

```diff
-	tikcer, _ := me.getTickerEvent(trade.Price, me.Bids.BestPriceLevel.Price, me.Asks.BestPriceLevel.Price)
-	if err != nil {
-		events = append(events, tikcer)
-	}
-	if tikcer == nil {
-		fmt.Printf("%s", err.Error())
-	}
+	ticker, tickerErr := me.getTickerEvent(trade.Price, me.Bids.BestPriceLevel.Price, me.Asks.BestPriceLevel.Price)
+	if tickerErr == nil && ticker != nil {
+		events = append(events, ticker)
+	} else if tickerErr != nil {
+		fmt.Printf("ticker error: %s", tickerErr.Error())
+	}
```
Summary by CodeRabbit
Release Notes
New Features
Infrastructure
Bug Fixes
✏️ Tip: You can customize this high-level summary in your review settings.