From 21941f193b0cfc21fd19a90a90f17473e824efd9 Mon Sep 17 00:00:00 2001 From: liam-pulsation Date: Wed, 29 May 2024 16:24:17 +0200 Subject: [PATCH 1/4] adding chain functionality on V2 protocol --- src/app/client.ts | 50 +++++++++++++++++++++---------------------- src/app/worker.ts | 54 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/src/app/client.ts b/src/app/client.ts index f756732..c9441c4 100644 --- a/src/app/client.ts +++ b/src/app/client.ts @@ -3,12 +3,12 @@ import Base from "./base"; import Task from "./task"; import { AsyncResult } from "./result"; -class TaskMessage { +export class TaskMessage { constructor( - readonly headers: object, - readonly properties: object, - readonly body: [Array, object, object] | object, - readonly sentEvent: object + readonly headers: object, + readonly properties: object, + readonly body: [Array, object, object] | object, + readonly sentEvent: object ) {} } @@ -30,21 +30,21 @@ export default class Client extends Base { // const serializer = 'json'; this.isReady().then(() => - this.broker.publish( - body, - exchange, - this.conf.CELERY_QUEUE, - headers, - properties - ) + this.broker.publish( + body, + exchange, + this.conf.CELERY_QUEUE, + headers, + properties + ) ); } public asTaskV2( - taskId: string, - taskName: string, - args?: Array, - kwargs?: object + taskId: string, + taskName: string, + args?: Array, + kwargs?: object ): TaskMessage { const message: TaskMessage = { headers: { @@ -87,10 +87,10 @@ export default class Client extends Base { * @returns {String} JSON serialized string of celery task message */ public asTaskV1( - taskId: string, - taskName: string, - args?: Array, - kwargs?: object + taskId: string, + taskName: string, + args?: Array, + kwargs?: object ): TaskMessage { const message: TaskMessage = { headers: {}, @@ -126,17 +126,17 @@ export default class Client extends Base { /** * get AsyncResult by task id * @param {string} taskId for task identification. - * @returns {AsyncResult} + * @returns {AsyncResult} */ public asyncResult(taskId: string): AsyncResult { return new AsyncResult(taskId, this.backend); } public sendTask( - taskName: string, - args?: Array, - kwargs?: object, - taskId?: string + taskName: string, + args?: Array, + kwargs?: object, + taskId?: string ): AsyncResult { taskId = taskId || v4(); const message = this.createTaskMessage(taskId, taskName, args, kwargs); diff --git a/src/app/worker.ts b/src/app/worker.ts index e8096be..206f310 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -1,5 +1,8 @@ import Base from "./base"; import { Message } from "../kombu/message"; +import {newCeleryBroker} from "../kombu/brokers"; +import {createClient} from "../index"; +import Client, {TaskMessage} from "./client"; export default class Worker extends Base { handlers: object = {}; @@ -128,7 +131,7 @@ export default class Worker extends Base { } // request - const [args, kwargs /*, embed */] = body; + const [args, kwargs , embed ] = body; const taskId = headers["id"]; const handler = this.handlers[taskName]; @@ -137,16 +140,20 @@ export default class Worker extends Base { } console.info( - `celery.node Received task: ${taskName}[${taskId}], args: ${args}, kwargs: ${JSON.stringify( - kwargs - )}` + `celery.node Received task: ${taskName}[${taskId}], args: ${args}, kwargs: ${JSON.stringify( + kwargs + )}` ); const timeStart = process.hrtime(); + const chain = embed.chain const taskPromise = handler(...args, kwargs).then(result => { const diff = process.hrtime(timeStart); + if (chain !== null){ + this.sendChainTask(chain, message) + } console.info( - `celery.node Task ${taskName}[${taskId}] succeeded in ${diff[0] + + `celery.node Task ${taskName}[${taskId}] succeeded in ${diff[0] + diff[1] / 1e9}s: ${result}` ); this.backend.storeResult(taskId, result, "SUCCESS"); @@ -184,4 +191,41 @@ export default class Worker extends Base { public stop(): any { throw new Error("not implemented yet"); } + + private sendChainTask(chain: Array, requestMessage: Message): void { + const chainToSend : any[] = chain + const children = chainToSend.pop() + const client : Client = createClient( + this.conf.CELERY_BROKER, + this.conf.CELERY_BACKEND, + children.options.queue + ) + const message : TaskMessage = { + headers: { + lang: "js", + task: children.task, + id: children.options.task_id, + root_id: requestMessage.headers['root_id'], + parent_id: requestMessage.headers['id'] + }, + properties: { + correlation_id: children.options.task_id, + reply_to: requestMessage.properties['reply_to'], + delivery_mode: 2, + priority: 0, + }, + body: [ + children.args, + children.kwargs, + { + callbacks: null, + errbacks: null, + chain: chain, + chord: null + } + ], + sentEvent:null + }; + client.sendTaskMessage(children.task, message) + } } From 03ec13a36c876f0738b471e9c212ad4d91a1b390 Mon Sep 17 00:00:00 2001 From: liam-pulsation Date: Wed, 29 May 2024 16:32:25 +0200 Subject: [PATCH 2/4] cleaning code --- src/app/client.ts | 4 ++-- src/app/worker.ts | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/app/client.ts b/src/app/client.ts index c9441c4..52a9853 100644 --- a/src/app/client.ts +++ b/src/app/client.ts @@ -1,7 +1,7 @@ -import { v4 } from "uuid"; +import {v4} from "uuid"; import Base from "./base"; import Task from "./task"; -import { AsyncResult } from "./result"; +import {AsyncResult} from "./result"; export class TaskMessage { constructor( diff --git a/src/app/worker.ts b/src/app/worker.ts index 206f310..5159691 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -1,6 +1,5 @@ import Base from "./base"; -import { Message } from "../kombu/message"; -import {newCeleryBroker} from "../kombu/brokers"; +import {Message} from "../kombu/message"; import {createClient} from "../index"; import Client, {TaskMessage} from "./client"; From 3e8310ac4269ff02bfa8fd4e13b9fa1cd4f173f3 Mon Sep 17 00:00:00 2001 From: liam-pulsation Date: Thu, 30 May 2024 11:39:55 +0200 Subject: [PATCH 3/4] refacto: using prettier linting --- src/app/client.ts | 50 +++++++++++++++--------------- src/app/worker.ts | 78 +++++++++++++++++++++++++---------------------- 2 files changed, 66 insertions(+), 62 deletions(-) diff --git a/src/app/client.ts b/src/app/client.ts index 52a9853..c786eba 100644 --- a/src/app/client.ts +++ b/src/app/client.ts @@ -1,14 +1,14 @@ -import {v4} from "uuid"; +import { v4 } from "uuid"; import Base from "./base"; import Task from "./task"; -import {AsyncResult} from "./result"; +import { AsyncResult } from "./result"; export class TaskMessage { constructor( - readonly headers: object, - readonly properties: object, - readonly body: [Array, object, object] | object, - readonly sentEvent: object + readonly headers: object, + readonly properties: object, + readonly body: [Array, object, object] | object, + readonly sentEvent: object ) {} } @@ -30,21 +30,21 @@ export default class Client extends Base { // const serializer = 'json'; this.isReady().then(() => - this.broker.publish( - body, - exchange, - this.conf.CELERY_QUEUE, - headers, - properties - ) + this.broker.publish( + body, + exchange, + this.conf.CELERY_QUEUE, + headers, + properties + ) ); } public asTaskV2( - taskId: string, - taskName: string, - args?: Array, - kwargs?: object + taskId: string, + taskName: string, + args?: Array, + kwargs?: object ): TaskMessage { const message: TaskMessage = { headers: { @@ -87,10 +87,10 @@ export default class Client extends Base { * @returns {String} JSON serialized string of celery task message */ public asTaskV1( - taskId: string, - taskName: string, - args?: Array, - kwargs?: object + taskId: string, + taskName: string, + args?: Array, + kwargs?: object ): TaskMessage { const message: TaskMessage = { headers: {}, @@ -133,10 +133,10 @@ export default class Client extends Base { } public sendTask( - taskName: string, - args?: Array, - kwargs?: object, - taskId?: string + taskName: string, + args?: Array, + kwargs?: object, + taskId?: string ): AsyncResult { taskId = taskId || v4(); const message = this.createTaskMessage(taskId, taskName, args, kwargs); diff --git a/src/app/worker.ts b/src/app/worker.ts index 5159691..059e9ef 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -1,7 +1,7 @@ import Base from "./base"; -import {Message} from "../kombu/message"; -import {createClient} from "../index"; -import Client, {TaskMessage} from "./client"; +import { Message } from "../kombu/message"; +import { createClient } from "../index"; +import Client, { TaskMessage } from "./client"; export default class Worker extends Base { handlers: object = {}; @@ -130,7 +130,7 @@ export default class Worker extends Base { } // request - const [args, kwargs , embed ] = body; + const [args, kwargs, embed] = body; const taskId = headers["id"]; const handler = this.handlers[taskName]; @@ -139,29 +139,33 @@ export default class Worker extends Base { } console.info( - `celery.node Received task: ${taskName}[${taskId}], args: ${args}, kwargs: ${JSON.stringify( - kwargs - )}` + `celery.node Received task: ${taskName}[${taskId}], args: ${args}, kwargs: ${JSON.stringify( + kwargs + )}` ); const timeStart = process.hrtime(); - const chain = embed.chain - const taskPromise = handler(...args, kwargs).then(result => { - const diff = process.hrtime(timeStart); - if (chain !== null){ - this.sendChainTask(chain, message) - } - console.info( + const chain = embed.chain; + const taskPromise = handler(...args, kwargs) + .then(result => { + const diff = process.hrtime(timeStart); + if (chain !== null) { + this.sendChainTask(chain, message); + } + console.info( `celery.node Task ${taskName}[${taskId}] succeeded in ${diff[0] + - diff[1] / 1e9}s: ${result}` - ); - this.backend.storeResult(taskId, result, "SUCCESS"); - this.activeTasks.delete(taskPromise); - }).catch(err => { - console.info(`celery.node Task ${taskName}[${taskId}] failed: [${err}]`); - this.backend.storeResult(taskId, err, "FAILURE"); - this.activeTasks.delete(taskPromise); - }); + diff[1] / 1e9}s: ${result}` + ); + this.backend.storeResult(taskId, result, "SUCCESS"); + this.activeTasks.delete(taskPromise); + }) + .catch(err => { + console.info( + `celery.node Task ${taskName}[${taskId}] failed: [${err}]` + ); + this.backend.storeResult(taskId, err, "FAILURE"); + this.activeTasks.delete(taskPromise); + }); // record the executing task this.activeTasks.add(taskPromise); @@ -192,26 +196,26 @@ export default class Worker extends Base { } private sendChainTask(chain: Array, requestMessage: Message): void { - const chainToSend : any[] = chain - const children = chainToSend.pop() - const client : Client = createClient( - this.conf.CELERY_BROKER, - this.conf.CELERY_BACKEND, - children.options.queue - ) - const message : TaskMessage = { + const chainToSend: any[] = chain; + const children = chainToSend.pop(); + const client: Client = createClient( + this.conf.CELERY_BROKER, + this.conf.CELERY_BACKEND, + children.options.queue + ); + const message: TaskMessage = { headers: { lang: "js", task: children.task, id: children.options.task_id, - root_id: requestMessage.headers['root_id'], - parent_id: requestMessage.headers['id'] + root_id: requestMessage.headers["root_id"], + parent_id: requestMessage.headers["id"] }, properties: { correlation_id: children.options.task_id, - reply_to: requestMessage.properties['reply_to'], + reply_to: requestMessage.properties["reply_to"], delivery_mode: 2, - priority: 0, + priority: 0 }, body: [ children.args, @@ -223,8 +227,8 @@ export default class Worker extends Base { chord: null } ], - sentEvent:null + sentEvent: null }; - client.sendTaskMessage(children.task, message) + client.sendTaskMessage(children.task, message); } } From 0ec091886ccd76641a87c6c3322f89f92178e2b6 Mon Sep 17 00:00:00 2001 From: liam-pulsation Date: Thu, 30 May 2024 11:44:33 +0200 Subject: [PATCH 4/4] refacto: adding infos on the method --- src/app/worker.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/app/worker.ts b/src/app/worker.ts index 059e9ef..4798559 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -195,6 +195,12 @@ export default class Worker extends Base { throw new Error("not implemented yet"); } + /** + * @method Worker#sendChainTask + * @param chain the chain sent by the local/remote client + * @param requestMessage the message sent by the local/remote client + * @private + */ private sendChainTask(chain: Array, requestMessage: Message): void { const chainToSend: any[] = chain; const children = chainToSend.pop();