From 7cc1d7b6804de51484e3b6da994c4e5aeab3220d Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Mon, 24 Nov 2025 23:59:03 +0100 Subject: [PATCH 1/7] dependencies: activated tokio signal feature --- Cargo.lock | 10 ++++++++++ Cargo.toml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index e701770..2d35cea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1565,6 +1565,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" +dependencies = [ + "libc", +] + [[package]] name = "signatory" version = "0.27.1" @@ -1761,6 +1770,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index 7e154fe..36a89dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ edition = "2024" [workspace.dependencies] code0-flow = { version = "0.0.18" } tucana = { version = "0.0.40" } -tokio = { version = "1.44.1", features = ["rt-multi-thread"] } +tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] } log = "0.4.27" futures-lite = "2.6.0" rand = "0.9.1" From 3f3c8fa5c1d73b55acf3ae54a295a7c91d147c91 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Mon, 24 Nov 2025 23:59:41 +0100 Subject: [PATCH 2/7] feat: added definition to env. config --- .env-example | 3 ++- taurus/src/config/mod.rs | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.env-example b/.env-example index d1859ac..c6307e4 100644 --- a/.env-example +++ b/.env-example @@ -4,4 +4,5 @@ NATS_URL='nats://localhost:4222' AQUILA_URL='http://localhost:8080' WITH_HEALTH_SERVICE=false GRPC_HOST='127.0.0.1' -GRPC_PORT=50051 \ No newline at end of file +GRPC_PORT=50051 +DEFINITIONS='./definitions' \ No newline at end of file diff --git a/taurus/src/config/mod.rs b/taurus/src/config/mod.rs index 9ec5fd9..9637e0d 100644 --- a/taurus/src/config/mod.rs +++ b/taurus/src/config/mod.rs @@ -26,6 +26,8 @@ pub struct Config { pub grpc_host: String, pub grpc_port: u16, + + pub definitions: String, } /// Implementation for all relevant `Aquila` startup configurations @@ -36,12 +38,13 @@ impl Config { pub fn new() -> Self { Config { environment: env_with_default("ENVIRONMENT", Environment::Development), - mode: env_with_default("MODE", Mode::STATIC), + mode: env_with_default("MODE", Mode::DYNAMIC), nats_url: env_with_default("NATS_URL", String::from("nats://localhost:4222")), aquila_url: env_with_default("AQUILA_URL", String::from("http://localhost:50051")), with_health_service: env_with_default("WITH_HEALTH_SERVICE", false), grpc_host: env_with_default("GRPC_HOST", "127.0.0.1".to_string()), grpc_port: env_with_default("GRPC_PORT", 50051), + definitions: env_with_default("DEFINITIONS", String::from("./definitions")), } } } From 682687430a03aa84ef0fd10ca73ef5b7108bf5a7 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Mon, 24 Nov 2025 23:59:55 +0100 Subject: [PATCH 3/7] feat: started working on graceful shutdown --- taurus/src/main.rs | 153 +++++++++++++++++++++++++++++++++------------ 1 file changed, 112 insertions(+), 41 deletions(-) diff --git a/taurus/src/main.rs b/taurus/src/main.rs index 56eec1b..4d93ca6 100644 --- a/taurus/src/main.rs +++ b/taurus/src/main.rs @@ -8,12 +8,16 @@ use crate::context::executor::Executor; use crate::context::registry::FunctionStore; use crate::context::signal::Signal; use crate::implementation::collect; + use code0_flow::flow_config::load_env_file; +use code0_flow::flow_config::mode::Mode::DYNAMIC; +use code0_flow::flow_definition::FlowUpdateService; use context::context::Context; use futures_lite::StreamExt; use log::error; use prost::Message; use std::collections::HashMap; +use tokio::signal; use tonic_health::pb::health_server::HealthServer; use tucana::shared::value::Kind; use tucana::shared::{ExecutionFlow, NodeFunction, Value}; @@ -43,13 +47,17 @@ async fn main() { store.populate(collect()); let client = match async_nats::connect(config.nats_url.clone()).await { - Ok(client) => client, + Ok(client) => { + log::info!("Connected to nats server"); + client + } Err(err) => { panic!("Failed to connect to NATS server: {}", err); } }; - if config.with_health_service { + // Optional health service task + let health_task = if config.with_health_service { let health_service = code0_flow::flow_health::HealthService::new(config.nats_url.clone()); let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() { Ok(address) => address, @@ -59,51 +67,114 @@ async fn main() { } }; - tokio::spawn(async move { - let _ = tonic::transport::Server::builder() + log::info!("Health server starting at {}", address); + + Some(tokio::spawn(async move { + if let Err(err) = tonic::transport::Server::builder() .add_service(HealthServer::new(health_service)) .serve(address) - .await; - }); + .await + { + log::error!("Health server error: {:?}", err); + } else { + log::info!("Health server started!"); + } + })) + } else { + None + }; - println!("Health server started at {}", address); + // Optional: dynamic mode sync at startup + if config.mode == DYNAMIC { + FlowUpdateService::from_url( + config.aquila_url.clone(), + config.definitions.clone().as_str(), + ) + .send() + .await; } - match client - .queue_subscribe(String::from("execution.*"), "taurus".into()) - .await - { - Ok(mut sub) => { - println!("Subscribed to 'execution.*'"); - - while let Some(msg) = sub.next().await { - let flow: ExecutionFlow = match ExecutionFlow::decode(&*msg.payload) { - Ok(flow) => flow, - Err(err) => { - println!("Failed to deserialize flow: {}, {:?}", err, &msg.payload); - continue; - } - }; - - let value = match handle_message(flow, &store) { - Signal::Failure(error) => error.as_value(), - Signal::Success(v) => v, - Signal::Return(v) => v, - Signal::Respond(v) => v, - Signal::Stop => Value { - kind: Some(Kind::NullValue(0)), - }, - }; - - // Send a response to the reply subject - if let Some(reply) = msg.reply { - match client.publish(reply, value.encode_to_vec().into()).await { - Ok(_) => println!("Response sent"), - Err(err) => println!("Failed to send response: {}", err), - } + let mut worker_task = tokio::spawn(async move { + let mut sub = match client + .queue_subscribe(String::from("execution.*"), "taurus".into()) + .await + { + Ok(sub) => { + log::info!("Subscribed to 'execution.*'"); + sub + } + Err(err) => { + log::error!("Failed to subscribe to 'execution.*': {:?}", err); + return; + } + }; + + while let Some(msg) = sub.next().await { + let flow: ExecutionFlow = match ExecutionFlow::decode(&*msg.payload) { + Ok(flow) => flow, + Err(err) => { + log::error!( + "Failed to deserialize flow: {:?}, payload: {:?}", + err, + &msg.payload + ); + continue; + } + }; + + let value = match handle_message(flow, &store) { + Signal::Failure(error) => error.as_value(), + Signal::Success(v) => v, + Signal::Return(v) => v, + Signal::Respond(v) => v, + Signal::Stop => Value { + kind: Some(Kind::NullValue(0)), + }, + }; + + // Send a response to the reply subject + if let Some(reply) = msg.reply { + match client.publish(reply, value.encode_to_vec().into()).await { + Ok(_) => log::info!("Response sent"), + Err(err) => log::error!("Failed to send response: {:?}", err), } } } - Err(err) => panic!("Failed to subscribe to 'execution.*': {}", err), - }; + + log::info!("NATS worker loop ended"); + }); + + match health_task { + Some(mut health_task) => { + // both are mutable JoinHandle<()> so we can borrow them in select! + tokio::select! { + _ = &mut worker_task => { + log::warn!("NATS worker task finished, shutting down"); + health_task.abort(); + } + _ = &mut health_task => { + log::warn!("Health server task finished, shutting down"); + worker_task.abort(); + } + _ = signal::ctrl_c() => { + log::info!("Ctrl+C/Exit signal received, shutting down"); + worker_task.abort(); + health_task.abort(); + } + } + } + None => { + tokio::select! { + _ = &mut worker_task => { + log::warn!("NATS worker task finished, shutting down"); + } + _ = signal::ctrl_c() => { + log::info!("Ctrl+C/Exit signal received, shutting down"); + worker_task.abort(); + } + } + } + } + + log::info!("Taurus shutdown complete"); } From 9f2469a69637a56cfa668aeb144d919b42e6638c Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 27 Nov 2025 16:46:26 +0100 Subject: [PATCH 4/7] dependencies: updated tucana, code0-flow and async-nats --- Cargo.lock | 78 +++++++++++++++++++++++------------------------ Cargo.toml | 6 ++-- taurus/Cargo.toml | 2 +- 3 files changed, 42 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2d35cea..5ac9510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,9 +69,9 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "async-nats" -version = "0.44.2" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f834a80c3ab6109b9c8f5ca6661a578cf31e088e831b6ce07c6b23cca04f6742" +checksum = "86dde77d8a733a9dbaf865a9eb65c72e09c88f3d14d3dd0d2aecf511920ee4fe" dependencies = [ "base64", "bytes", @@ -231,34 +231,24 @@ dependencies = [ "num-traits", ] -[[package]] -name = "code0-definition-reader" -version = "0.0.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3e3a3aec6ffc35ac360113d8b96206c38961afef56b0c5632cb77cd2f4429b9" -dependencies = [ - "serde", - "serde_json", - "tucana 0.0.39", -] - [[package]] name = "code0-flow" -version = "0.0.18" +version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90356cc27d5de4c9870a29163e50e6be77ebeb408220f8057d7d8ed57f2a0fb" +checksum = "bd517223799dc011207b599f7e2a818d95eb53e20dbdfc54d01f47e1c0e6baba" dependencies = [ "async-nats", "async-trait", - "code0-definition-reader", "dotenv", "futures-core", "log", "regex", + "serde", "serde_json", "tonic", "tonic-health", - "tucana 0.0.39", + "tucana", + "walkdir", ] [[package]] @@ -1447,6 +1437,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -1683,7 +1682,7 @@ dependencies = [ "tokio", "tonic", "tonic-health", - "tucana 0.0.40", + "tucana", ] [[package]] @@ -2004,29 +2003,9 @@ dependencies = [ [[package]] name = "tucana" -version = "0.0.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e6300e0f85e9352904cace7b285366a10995dbf690c6ed8fa022cd57fa8607b" -dependencies = [ - "pbjson", - "pbjson-build", - "pbjson-types", - "prost", - "prost-build", - "prost-types", - "serde", - "serde_json", - "tonic", - "tonic-build", - "tonic-prost", - "tonic-prost-build", -] - -[[package]] -name = "tucana" -version = "0.0.40" +version = "0.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "565d382969ce8de8484d22516808cfef658710a6f3342a566ad1e247ccb852cf" +checksum = "22ab56226ccbbda9b2bd7505d4296712a2e0757254fbe601c30785ae9e2d09e6" dependencies = [ "pbjson", "pbjson-build", @@ -2096,6 +2075,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -2138,6 +2127,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-link" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 36a89dd..a86b9c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,15 +7,15 @@ version = "0.1.0" edition = "2024" [workspace.dependencies] -code0-flow = { version = "0.0.18" } -tucana = { version = "0.0.40" } +code0-flow = { version = "0.0.19" } +tucana = { version = "0.0.42" } tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] } log = "0.4.27" futures-lite = "2.6.0" rand = "0.9.1" base64 = "0.22.1" env_logger = "0.11.8" -async-nats = "0.44.2" +async-nats = "0.45.0" prost = "0.14.1" tonic-health = "0.14.1" tonic = "0.14.1" diff --git a/taurus/Cargo.toml b/taurus/Cargo.toml index 7298d37..007f381 100644 --- a/taurus/Cargo.toml +++ b/taurus/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -code0-flow = { workspace = true } +code0-flow = { workspace = true, features = ["flow_service"] } tucana = { workspace = true } tokio = { workspace = true } log = { workspace = true } From febab0820f5af4b95ec529cc8699c6561d5c35c6 Mon Sep 17 00:00:00 2001 From: raphael-goetz Date: Thu, 27 Nov 2025 16:54:19 +0100 Subject: [PATCH 5/7] ref: cargo fmt --- taurus/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/taurus/src/main.rs b/taurus/src/main.rs index 4d93ca6..0d5b2f8 100644 --- a/taurus/src/main.rs +++ b/taurus/src/main.rs @@ -8,10 +8,10 @@ use crate::context::executor::Executor; use crate::context::registry::FunctionStore; use crate::context::signal::Signal; use crate::implementation::collect; +use code0_flow::flow_service::FlowUpdateService; use code0_flow::flow_config::load_env_file; use code0_flow::flow_config::mode::Mode::DYNAMIC; -use code0_flow::flow_definition::FlowUpdateService; use context::context::Context; use futures_lite::StreamExt; use log::error; @@ -90,8 +90,8 @@ async fn main() { config.aquila_url.clone(), config.definitions.clone().as_str(), ) - .send() - .await; + .send() + .await; } let mut worker_task = tokio::spawn(async move { From efd2ab0aa7ca9b0dcec51f8c67df46cf6ba491e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Thu, 27 Nov 2025 16:57:07 +0100 Subject: [PATCH 6/7] Update taurus/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- taurus/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taurus/src/main.rs b/taurus/src/main.rs index 0d5b2f8..d59c8ce 100644 --- a/taurus/src/main.rs +++ b/taurus/src/main.rs @@ -77,7 +77,7 @@ async fn main() { { log::error!("Health server error: {:?}", err); } else { - log::info!("Health server started!"); + log::info!("Health server stopped gracefully."); } })) } else { From 677d874b8d55093cef151c5e42833efb51a0276e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Thu, 27 Nov 2025 16:57:19 +0100 Subject: [PATCH 7/7] Update taurus/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- taurus/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taurus/src/main.rs b/taurus/src/main.rs index d59c8ce..53c9db0 100644 --- a/taurus/src/main.rs +++ b/taurus/src/main.rs @@ -135,7 +135,7 @@ async fn main() { // Send a response to the reply subject if let Some(reply) = msg.reply { match client.publish(reply, value.encode_to_vec().into()).await { - Ok(_) => log::info!("Response sent"), + Ok(_) => log::debug!("Response sent"), Err(err) => log::error!("Failed to send response: {:?}", err), } }