diff --git a/Cargo.toml b/Cargo.toml index 66dffa4..5782f5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] version = "0.0.0" name = "code0-flow" -edition = "2021" +edition = "2024" description = "Crate for managing the code0-flows inside of the Flow Queue & FlowStore" repository = "https://github.com/code0-tech/code0-flow" homepage = "https://code0.tech" diff --git a/src/flow_definition/mod.rs b/src/flow_definition/mod.rs index dcdf2e6..308839f 100644 --- a/src/flow_definition/mod.rs +++ b/src/flow_definition/mod.rs @@ -1,9 +1,9 @@ use tucana::{ aquila::{ + DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient, flow_type_service_client::FlowTypeServiceClient, runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient, - DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest, }, shared::{DataType, FlowType, RuntimeFunctionDefinition}, }; @@ -51,14 +51,18 @@ impl FlowUpdateService { async fn update_data_types(&self) { if self.data_types.is_empty() { + log::info!("No data types to update"); return; } log::info!("Updating the current DataTypes!"); let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await { - Ok(client) => client, + Ok(client) => { + log::info!("Successfully connected to the DataTypeService"); + client + } Err(err) => { - log::error!("Failed to connect to DataTypeService: {}", err); + log::error!("Failed to connect to the DataTypeService: {:?}", err); return; } }; @@ -75,23 +79,27 @@ impl FlowUpdateService { ); } Err(err) => { - log::error!("Failed to update data types: {}", err); + log::error!("Failed to update data types: {:?}", err); } } } async fn update_runtime_definitions(&self) { if self.runtime_definitions.is_empty() { + log::info!("No runtime definitions to update"); return; } log::info!("Updating the current RuntimeDefinitions!"); let mut client = match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await { - Ok(client) => client, + Ok(client) => { + log::info!("Connected to RuntimeFunctionDefinitionService"); + client + } Err(err) => { log::error!( - "Failed to connect to RuntimeFunctionDefinitionService: {}", + "Failed to connect to RuntimeFunctionDefinitionService: {:?}", err ); return; @@ -110,21 +118,25 @@ impl FlowUpdateService { ); } Err(err) => { - log::error!("Failed to update runtime function definitions: {}", err); + log::error!("Failed to update runtime function definitions: {:?}", err); } } } async fn update_flow_types(&self) { if self.flow_types.is_empty() { + log::info!("No FlowTypes to update!"); return; } log::info!("Updating the current FlowTypes!"); let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await { - Ok(client) => client, + Ok(client) => { + log::info!("Connected to FlowTypeService!"); + client + } Err(err) => { - log::error!("Failed to connect to FlowTypeService: {}", err); + log::error!("Failed to connect to FlowTypeService: {:?}", err); return; } }; @@ -141,7 +153,7 @@ impl FlowUpdateService { ); } Err(err) => { - log::error!("Failed to update flow types: {}", err); + log::error!("Failed to update flow types: {:?}", err); } } } diff --git a/src/flow_queue/connection.rs b/src/flow_queue/connection.rs index 8336274..586954a 100644 --- a/src/flow_queue/connection.rs +++ b/src/flow_queue/connection.rs @@ -3,19 +3,22 @@ use lapin::Connection; pub async fn build_connection(rabbitmq_url: &str) -> Connection { match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await { Ok(env) => env, - Err(error) => panic!( - "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", - error - ), + Err(error) => { + log::error!( + "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", + error + ); + panic!("Cannot connect to FlowQueue (RabbitMQ) instance!"); + } } } #[cfg(test)] mod tests { use crate::flow_queue::connection::build_connection; + use testcontainers::GenericImage; use testcontainers::core::{IntoContainerPort, WaitFor}; use testcontainers::runners::AsyncRunner; - use testcontainers::GenericImage; macro_rules! rabbitmq_container_test { ($test_name:ident, $consumer:expr) => { diff --git a/src/flow_queue/service.rs b/src/flow_queue/service.rs index 222014f..511ddc0 100644 --- a/src/flow_queue/service.rs +++ b/src/flow_queue/service.rs @@ -2,9 +2,9 @@ use std::{sync::Arc, time::Duration}; use futures_lite::StreamExt; use lapin::{ + Channel, options::{BasicConsumeOptions, QueueDeclareOptions}, types::FieldTable, - Channel, }; use log::debug; use serde::{Deserialize, Serialize}; @@ -44,6 +44,7 @@ pub enum RabbitMqError { ConnectionError(String), TimeoutError, DeserializationError, + SerializationError, } impl From for RabbitMqError { @@ -65,6 +66,7 @@ impl std::fmt::Display for RabbitMqError { RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg), RabbitMqError::TimeoutError => write!(f, "Operation timed out"), RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"), + RabbitMqError::SerializationError => write!(f, "Failed to serialize message"), } } } @@ -83,8 +85,10 @@ impl RabbitmqClient { ) .await { - Ok(_) => (), - Err(err) => log::error!("Failed to declare send_queue: {}", err), + Ok(_) => { + log::info!("Successfully declared send_queue"); + } + Err(err) => log::error!("Failed to declare send_queue: {:?}", err), } match channel @@ -95,8 +99,10 @@ impl RabbitmqClient { ) .await { - Ok(_) => (), - Err(err) => log::error!("Failed to declare recieve_queue: {}", err), + Ok(_) => { + log::info!("Successfully declared recieve_queue"); + } + Err(err) => log::error!("Failed to declare recieve_queue: {:?}", err), } RabbitmqClient { @@ -109,10 +115,10 @@ impl RabbitmqClient { &self, message_json: String, queue_name: &str, - ) -> Result<(), lapin::Error> { + ) -> Result<(), RabbitMqError> { let channel = self.channel.lock().await; - channel + match channel .basic_publish( "", // exchange queue_name, // routing key (queue name) @@ -120,12 +126,16 @@ impl RabbitmqClient { message_json.as_bytes(), lapin::BasicProperties::default(), ) - .await?; - - Ok(()) + .await + { + Err(err) => { + log::error!("Failed to publish message: {:?}", err); + Err(RabbitMqError::LapinError(err)) + } + Ok(_) => Ok(()), + } } - // Receive messages from a queue // Receive messages from a queue with no timeout pub async fn await_message_no_timeout( &self, @@ -146,8 +156,14 @@ impl RabbitmqClient { .await; match consumer_res { - Ok(consumer) => consumer, - Err(err) => panic!("{}", err), + Ok(consumer) => { + log::info!("Established queue connection to {}", queue_name); + consumer + } + Err(err) => { + log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err); + return Err(RabbitMqError::LapinError(err)); + } } }; @@ -172,17 +188,19 @@ impl RabbitmqClient { let message = match serde_json::from_str::(message_str) { Ok(m) => m, Err(e) => { - log::error!("Failed to parse message: {}", e); + log::error!("Failed to parse message: {:?}", e); return Err(RabbitMqError::DeserializationError); } }; if message.message_id == message_id { if ack_on_success { - delivery + if let Err(delivery_error) = delivery .ack(lapin::options::BasicAckOptions::default()) .await - .expect("Failed to acknowledge message"); + { + log::error!("Failed to acknowledge message: {:?}", delivery_error); + } } return Ok(message); @@ -196,7 +214,7 @@ impl RabbitmqClient { &self, queue_name: &str, handle_message: fn(Message) -> Result, - ) -> Result<(), lapin::Error> { + ) -> Result<(), RabbitMqError> { let mut consumer = { let channel = self.channel.lock().await; @@ -210,8 +228,14 @@ impl RabbitmqClient { .await; match consumer_res { - Ok(consumer) => consumer, - Err(err) => panic!("Cannot consume messages: {}", err), + Ok(consumer) => { + log::info!("Established queue connection to {}", queue_name); + consumer + } + Err(err) => { + log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err); + return Err(RabbitMqError::LapinError(err)); + } } }; @@ -221,8 +245,8 @@ impl RabbitmqClient { let delivery = match delivery { Ok(del) => del, Err(err) => { - log::error!("Error receiving message: {}", err); - return Err(err); + log::error!("Error receiving message: {:?}", err); + return Err(RabbitMqError::LapinError(err)); } }; @@ -233,32 +257,32 @@ impl RabbitmqClient { str } Err(err) => { - log::error!("Error decoding message: {}", err); - return Ok(()); + log::error!("Error decoding message: {:?}", err); + return Err(RabbitMqError::DeserializationError); } }; // Parse the message let inc_message = match serde_json::from_str::(message_str) { Ok(mess) => mess, Err(err) => { - log::error!("Error parsing message: {}", err); - return Ok(()); + log::error!("Error parsing message: {:?}", err); + return Err(RabbitMqError::DeserializationError); } }; let message = match handle_message(inc_message) { Ok(mess) => mess, Err(err) => { - log::error!("Error handling message: {}", err); - return Ok(()); + log::error!("Error handling message: {:?}", err); + return Err(RabbitMqError::DeserializationError); } }; let message_json = match serde_json::to_string(&message) { Ok(json) => json, Err(err) => { - log::error!("Error serializing message: {}", err); - return Ok(()); + log::error!("Error serializing message: {:?}", err); + return Err(RabbitMqError::SerializationError); } }; @@ -267,10 +291,12 @@ impl RabbitmqClient { } // Acknowledge the message - delivery + if let Err(delivery_error) = delivery .ack(lapin::options::BasicAckOptions::default()) .await - .expect("Failed to acknowledge message"); + { + log::error!("Failed to acknowledge message: {:?}", delivery_error); + } } Ok(()) diff --git a/src/flow_store/connection.rs b/src/flow_store/connection.rs index 81b0435..916f5cd 100644 --- a/src/flow_store/connection.rs +++ b/src/flow_store/connection.rs @@ -1,5 +1,5 @@ -use redis::aio::MultiplexedConnection; use redis::Client; +use redis::aio::MultiplexedConnection; use std::sync::Arc; use tokio::sync::Mutex; @@ -7,11 +7,17 @@ pub type FlowStore = Arc>>; pub fn build_connection(redis_url: String) -> Client { match Client::open(redis_url) { - Ok(client) => client, - Err(con_error) => panic!( - "Cannot create Client (Redis) connection! Reason: {}", - con_error - ), + Ok(client) => { + log::info!("Successfully created connection to the FlowStore (Redis)"); + client + } + Err(con_error) => { + log::error!( + "Cannot create FlowStoreClient (Redis) connection! Reason: {:?}", + con_error + ); + panic!("Failed to create Redis client") + } } } @@ -20,11 +26,17 @@ pub async fn create_flow_store_connection(url: String) -> FlowStore { .get_multiplexed_async_connection() .await { - Ok(connection) => connection, - Err(error) => panic!( - "Cannot create FlowStore (Redis) connection! Reason: {}", - error - ), + Ok(connection) => { + log::info!("Successfully created connection to the FlowStore (Redis)"); + connection + } + Err(con_error) => { + log::error!( + "Cannot create FlowStoreClient (Redis) connection! Reason: {:?}", + con_error + ); + panic!("Failed to create Redis client") + } }; Arc::new(Mutex::new(Box::new(client))) @@ -35,10 +47,10 @@ mod tests { use crate::flow_store::connection::create_flow_store_connection; use redis::{AsyncCommands, RedisResult}; use serial_test::serial; + use testcontainers::GenericImage; use testcontainers::core::IntoContainerPort; use testcontainers::core::WaitFor; use testcontainers::runners::AsyncRunner; - use testcontainers::GenericImage; macro_rules! redis_container_test { ($test_name:ident, $consumer:expr) => { diff --git a/src/flow_store/flow_identifier.rs b/src/flow_store/flow_identifier.rs index a577088..595d98d 100644 --- a/src/flow_store/flow_identifier.rs +++ b/src/flow_store/flow_identifier.rs @@ -30,7 +30,7 @@ pub fn get_flow_identifier(flow: &Flow) -> Option { let (method, host) = match (method, host) { (Some(m), Some(h)) => (m, h), missing => { - eprintln!("missing settings: {:?}", missing); + log::error!("Missing settings: {:?}", missing); return None; } };