diff --git a/src/app/client.ts b/src/app/client.ts index f756732..c786eba 100644 --- a/src/app/client.ts +++ b/src/app/client.ts @@ -3,7 +3,7 @@ import Base from "./base"; import Task from "./task"; import { AsyncResult } from "./result"; -class TaskMessage { +export class TaskMessage { constructor( readonly headers: object, readonly properties: object, @@ -126,7 +126,7 @@ export default class Client extends Base { /** * get AsyncResult by task id * @param {string} taskId for task identification. - * @returns {AsyncResult} + * @returns {AsyncResult} */ public asyncResult(taskId: string): AsyncResult { return new AsyncResult(taskId, this.backend); diff --git a/src/app/worker.ts b/src/app/worker.ts index e8096be..4798559 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -1,5 +1,7 @@ import Base from "./base"; import { Message } from "../kombu/message"; +import { createClient } from "../index"; +import Client, { TaskMessage } from "./client"; export default class Worker extends Base { handlers: object = {}; @@ -128,7 +130,7 @@ export default class Worker extends Base { } // request - const [args, kwargs /*, embed */] = body; + const [args, kwargs, embed] = body; const taskId = headers["id"]; const handler = this.handlers[taskName]; @@ -143,19 +145,27 @@ export default class Worker extends Base { ); const timeStart = process.hrtime(); - const taskPromise = handler(...args, kwargs).then(result => { - const diff = process.hrtime(timeStart); - console.info( - `celery.node Task ${taskName}[${taskId}] succeeded in ${diff[0] + - diff[1] / 1e9}s: ${result}` - ); - this.backend.storeResult(taskId, result, "SUCCESS"); - this.activeTasks.delete(taskPromise); - }).catch(err => { - console.info(`celery.node Task ${taskName}[${taskId}] failed: [${err}]`); - this.backend.storeResult(taskId, err, "FAILURE"); - this.activeTasks.delete(taskPromise); - }); + const chain = embed.chain; + const taskPromise = handler(...args, kwargs) + .then(result => { + const diff = process.hrtime(timeStart); + if (chain !== null) { + this.sendChainTask(chain, message); + } + console.info( + `celery.node Task ${taskName}[${taskId}] succeeded in ${diff[0] + + diff[1] / 1e9}s: ${result}` + ); + this.backend.storeResult(taskId, result, "SUCCESS"); + this.activeTasks.delete(taskPromise); + }) + .catch(err => { + console.info( + `celery.node Task ${taskName}[${taskId}] failed: [${err}]` + ); + this.backend.storeResult(taskId, err, "FAILURE"); + this.activeTasks.delete(taskPromise); + }); // record the executing task this.activeTasks.add(taskPromise); @@ -184,4 +194,47 @@ export default class Worker extends Base { public stop(): any { throw new Error("not implemented yet"); } + + /** + * @method Worker#sendChainTask + * @param chain the chain sent by the local/remote client + * @param requestMessage the message sent by the local/remote client + * @private + */ + private sendChainTask(chain: Array, requestMessage: Message): void { + const chainToSend: any[] = chain; + const children = chainToSend.pop(); + const client: Client = createClient( + this.conf.CELERY_BROKER, + this.conf.CELERY_BACKEND, + children.options.queue + ); + const message: TaskMessage = { + headers: { + lang: "js", + task: children.task, + id: children.options.task_id, + root_id: requestMessage.headers["root_id"], + parent_id: requestMessage.headers["id"] + }, + properties: { + correlation_id: children.options.task_id, + reply_to: requestMessage.properties["reply_to"], + delivery_mode: 2, + priority: 0 + }, + body: [ + children.args, + children.kwargs, + { + callbacks: null, + errbacks: null, + chain: chain, + chord: null + } + ], + sentEvent: null + }; + client.sendTaskMessage(children.task, message); + } }