diff --git a/Cargo.lock b/Cargo.lock index 69122b4..a80d3da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -592,9 +592,11 @@ name = "code0-flow" version = "0.0.0" dependencies = [ "async-trait", + "futures-lite 2.6.0", "lapin", "log", "redis", + "serde", "serde_json", "serial_test", "testcontainers", diff --git a/Cargo.toml b/Cargo.toml index bb1eaad..b6cf719 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,9 @@ redis = { version = "0.29.0", features = [ "json", ] } serde_json = "1.0.138" +serde = "1.0.138" lapin = "2.5.0" +futures-lite = "2.6.0" [dev-dependencies] testcontainers = "0.23.2" diff --git a/src/flow_queue/connection.rs b/src/flow_queue/connection.rs index 9885e2e..8336274 100644 --- a/src/flow_queue/connection.rs +++ b/src/flow_queue/connection.rs @@ -1,64 +1,56 @@ -use lapin::{Channel, Connection, ConnectionProperties}; -use std::sync::Arc; -use tokio::sync::Mutex; +use lapin::Connection; -pub type FlowQueue = Arc>>; - -pub type FlowChannel = Arc>>; - -async fn build_connection(rabbitmq_url: &str) -> Connection { - match Connection::connect(rabbitmq_url, ConnectionProperties::default()).await { +pub async fn build_connection(rabbitmq_url: &str) -> Connection { + match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await { Ok(env) => env, - Err(error) => panic!("Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", error), - } -} - -pub async fn create_flow_channel_connection(uri: &str) -> FlowChannel { - let connection = build_connection(uri).await; - - match connection.create_channel().await { - Ok(channel) => Arc::new(Mutex::new(Box::new(channel))), - Err(error) => panic!("Cannot create channel {:?}", error), + Err(error) => panic!( + "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", + error + ), } } #[cfg(test)] mod tests { + use crate::flow_queue::connection::build_connection; use testcontainers::core::{IntoContainerPort, WaitFor}; use testcontainers::runners::AsyncRunner; use testcontainers::GenericImage; - use crate::flow_queue::connection::build_connection; macro_rules! rabbitmq_container_test { ($test_name:ident, $consumer:expr) => { - #[tokio::test] async fn $test_name() { let port: u16 = 5672; let image_name = "rabbitmq"; let wait_message = "Server startup complete"; - + let container = GenericImage::new(image_name, "latest") .with_exposed_port(port.tcp()) .with_wait_for(WaitFor::message_on_stdout(wait_message)) .start() .await .unwrap(); - + let host_port = container.get_host_port_ipv4(port).await.unwrap(); let url = format!("amqp://guest:guest@localhost:{}", host_port); - + $consumer(url).await; } }; } - rabbitmq_container_test!(test_rabbitmq_startup, (|url: String| async move { - println!("RabbitMQ started with the url: {}", url); - })); - - rabbitmq_container_test!(test_rabbitmq_connection, (|url: String| async move { - build_connection(&*url).await; - })); - + rabbitmq_container_test!( + test_rabbitmq_startup, + (|url: String| async move { + println!("RabbitMQ started with the url: {}", url); + }) + ); + + rabbitmq_container_test!( + test_rabbitmq_connection, + (|url: String| async move { + build_connection(&*url).await; + }) + ); } diff --git a/src/flow_queue/delegate.rs b/src/flow_queue/delegate.rs deleted file mode 100644 index 7e03a0f..0000000 --- a/src/flow_queue/delegate.rs +++ /dev/null @@ -1,55 +0,0 @@ -use lapin::message::{Delivery, DeliveryResult}; -use lapin::ConsumerDelegate; -use log::debug; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; - -/// Delegate trait to implement. -pub trait Delegate: Sync + Send { - fn handle_delivery(&self, delivery: Delivery); -} - -pub struct QueueDelegate { - pub delegate: Arc, // Use Arc for safe ownership transfer -} - -impl QueueDelegate { - pub fn new(delegate: T) -> Self { - QueueDelegate { - delegate: Arc::new(delegate), - } - } - - pub fn deliver(&self, delivery: Delivery) { - self.delegate.handle_delivery(delivery); - } -} - -impl ConsumerDelegate for QueueDelegate { - fn on_new_delivery( - &self, - delivery: DeliveryResult, - ) -> Pin + Send + 'static>> { - let delegate = Arc::clone(&self.delegate); - - Box::pin(async move { - let optional_delivery = match delivery { - Ok(option) => option, - Err(_) => return, - }; - let delivery = match optional_delivery { - Some(del) => del, - None => return, - }; - - delegate.handle_delivery(delivery); // Use cloned delegate - }) - } - - fn drop_prefetched_messages(&self) -> Pin + Send + 'static>> { - Box::pin(async move { - debug!("Dropping prefetched messages..."); - }) - } -} diff --git a/src/flow_queue/handler.rs b/src/flow_queue/handler.rs deleted file mode 100644 index 9448b07..0000000 --- a/src/flow_queue/handler.rs +++ /dev/null @@ -1,119 +0,0 @@ -use crate::flow_queue::connection::FlowChannel; -use crate::flow_queue::delegate::{Delegate, QueueDelegate}; -use crate::flow_queue::name::QueueName; -use lapin::options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions}; -use lapin::types::FieldTable; -use lapin::Error; -use log::info; -use std::sync::Arc; - -/// # Declares all given queues -/// -/// ## Expected behavior -/// If a queue cannot be created, the services stops -pub async fn declare_queues(flow_channel: FlowChannel, names: Vec) { - let channel_arc = flow_channel.lock().await; - for name in names { - let channel_name = name.prefix + name.protocol; - let queue_result = channel_arc - .queue_declare( - &*channel_name, - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await; - - match queue_result { - Ok(_) => { - info!("Declared queue: {}", channel_name) - } - Err(error) => { - panic!("Cannot declare queue: {}, Reason: {}", channel_name, error); - } - }; - } -} - -/// Sends a message into a queue -pub async fn send_message( - flow_channel: FlowChannel, - queue_name: QueueName, - payload: &Vec, -) -> Result { - let channel_arc = flow_channel.lock().await; - let name = queue_name.prefix + queue_name.protocol; - - let result = channel_arc - .basic_publish( - "", - &*name, - BasicPublishOptions::default(), - payload, - Default::default(), - ) - .await; - - match result { - Ok(_) => Ok(true), - Err(error) => Err(error), - } -} - -/// Consumes a message -/// -/// Creates a delegate that waits on messages and consumes them. -/// -/// # Params -/// - channel: FlowChannel of the send message -/// - queue_name: Name of the Queue that should be listened to -/// - delegate: Consumer delegate of the message -/// -/// # Example -/// ``` ignore -/// use lapin::message::Delivery; -/// use code0_flow::flow_queue::delegate::Delegate; -/// use code0_flow::flow_queue::connection::get_flow_channel; -/// use code0_flow::flow_queue::name::{QueueName, QueuePrefix, QueueProtocol}; -/// use code0_flow::flow_queue::handler::consume_message; -/// -/// struct HttpDelegate; -/// -/// impl Delegate for HttpDelegate { -/// fn handle_delivery(&self, delivery: Delivery) { -/// todo!("Handle delivery!") -/// } -/// } -/// -/// async fn main() { -/// let uri = "abc"; -/// let channel = get_flow_channel(uri).await; -/// let queue_name = QueueName { -/// prefix: QueuePrefix::Send, -/// protocol: QueueProtocol::Rest, -/// }; -/// -/// consume_message(channel, queue_name, HttpDelegate).await; -/// } -/// ``` -pub async fn consume_message( - channel: FlowChannel, - queue_name: QueueName, - delegate: T, -) { - let name = queue_name.prefix + queue_name.protocol; - let channel_arc = channel.lock().await; - - let consumer = channel_arc - .basic_consume( - &*name, - "", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - - consumer.set_delegate(QueueDelegate { - delegate: Arc::new(delegate), - }); -} diff --git a/src/flow_queue/mod.rs b/src/flow_queue/mod.rs index d373785..f20b456 100644 --- a/src/flow_queue/mod.rs +++ b/src/flow_queue/mod.rs @@ -1,4 +1,2 @@ pub mod connection; -pub mod name; -pub mod handler; -pub mod delegate; \ No newline at end of file +pub mod service; diff --git a/src/flow_queue/name.rs b/src/flow_queue/name.rs deleted file mode 100644 index 8aa3ce7..0000000 --- a/src/flow_queue/name.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::ops::Add; - -pub struct QueueName { - pub prefix: QueuePrefix, - pub protocol: QueueProtocol, -} - -/// # Queue Prefix -/// - Every incoming message will have the `Send` prefix -/// - Every processed message (answer) from taurus will have the `Receive` prefix -pub enum QueuePrefix { - Send, - Receive, -} - -/// Supported Protocols -pub enum QueueProtocol { - Rest, - WebSocket, -} - -/// Implementation to turn a protocol into a str -impl QueueProtocol { - /// Function to turn a protocol into a str - /// - /// # Example: - /// ``` - /// use code0_flow::flow_queue::name::QueueProtocol; - /// let proto_str = QueueProtocol::Rest.as_str().to_string(); - /// let result = "REST".to_string(); - /// - /// assert_eq!(result, proto_str); - /// ``` - pub fn as_str(&self) -> &'static str { - match self { - QueueProtocol::Rest => "REST", - QueueProtocol::WebSocket => "WS", - } - } -} - -/// Implementation to add a prefix and a protocol to a queue name -impl Add for QueuePrefix { - type Output = String; - - /// Function to add a prefix and a protocol to a queue name - /// - /// # Example: - /// ``` - /// use code0_flow::flow_queue::name::{QueuePrefix, QueueProtocol}; - /// let send_rest_queue_name = QueuePrefix::Send + QueueProtocol::Rest; - /// let result = "S_REST".to_string(); - /// - /// assert_eq!(result, send_rest_queue_name); - /// ``` - fn add(self, rhs: QueueProtocol) -> Self::Output { - match self { - QueuePrefix::Send => "S_".to_string() + rhs.as_str(), - QueuePrefix::Receive => "R_".to_string() + rhs.as_str(), - } - } -} diff --git a/src/flow_queue/service.rs b/src/flow_queue/service.rs new file mode 100644 index 0000000..e113428 --- /dev/null +++ b/src/flow_queue/service.rs @@ -0,0 +1,204 @@ +use std::{sync::Arc, time::Duration}; + +use futures_lite::StreamExt; +use lapin::{options::QueueDeclareOptions, types::FieldTable, Channel}; +use log::debug; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; + +use super::connection::build_connection; + +#[derive(Serialize, Deserialize)] +pub enum MessageType { + ExecuteFlow, + TestExecuteFlow, +} + +#[derive(Serialize, Deserialize)] +pub struct Sender { + pub name: String, + pub protocol: String, + pub version: String, +} + +#[derive(Serialize, Deserialize)] +pub struct Message { + pub message_type: MessageType, + pub sender: Sender, + pub timestamp: i64, + pub message_id: String, + pub body: String, +} + +pub struct RabbitmqClient { + pub channel: Arc>, +} + +#[derive(Debug)] +pub enum RabbitMqError { + LapinError(lapin::Error), + ConnectionError(String), + TimeoutError, + DeserializationError, +} + +impl From for RabbitMqError { + fn from(error: lapin::Error) -> Self { + RabbitMqError::LapinError(error) + } +} + +impl From for RabbitMqError { + fn from(error: std::io::Error) -> Self { + RabbitMqError::ConnectionError(error.to_string()) + } +} + +impl std::fmt::Display for RabbitMqError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RabbitMqError::LapinError(err) => write!(f, "RabbitMQ error: {}", err), + RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg), + RabbitMqError::TimeoutError => write!(f, "Operation timed out"), + RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"), + } + } +} + +impl RabbitmqClient { + // Create a new RabbitMQ client with channel + pub async fn new(rabbitmq_url: &str) -> Self { + let connection = build_connection(rabbitmq_url).await; + let channel = connection.create_channel().await.unwrap(); + + match channel + .queue_declare( + "send_queue", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + { + Ok(_) => (), + Err(err) => log::error!("Failed to declare send_queue: {}", err), + } + + match channel + .queue_declare( + "recieve_queue", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + { + Ok(_) => (), + Err(err) => log::error!("Failed to declare recieve_queue: {}", err), + } + + RabbitmqClient { + channel: Arc::new(Mutex::new(channel)), + } + } + + // Send message to the queue + pub async fn send_message( + &self, + message_json: String, + queue_name: &str, + ) -> Result<(), lapin::Error> { + let channel = self.channel.lock().await; + + channel + .basic_publish( + "", // exchange + queue_name, // routing key (queue name) + lapin::options::BasicPublishOptions::default(), + message_json.as_bytes(), + lapin::BasicProperties::default(), + ) + .await?; + + Ok(()) + } + + // Receive messages from a queue + pub async fn await_message( + &self, + queue_name: &str, + message_id: String, + timeout: Duration, + ack_on_success: bool, + ) -> Result { + let mut consumer = { + let channel = self.channel.lock().await; + + let consumer_res = channel + .basic_consume( + queue_name, + "consumer", + lapin::options::BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await; + + match consumer_res { + Ok(consumer) => consumer, + Err(err) => panic!("{}", err), + } + }; + + debug!("Starting to consume from {}", queue_name); + + // Create a future for the next message + let receive_future = async { + while let Some(delivery_result) = consumer.next().await { + let delivery = match delivery_result { + Ok(del) => del, + Err(_) => return Err(RabbitMqError::DeserializationError), + }; + let data = &delivery.data; + let message_str = match std::str::from_utf8(&data) { + Ok(str) => str, + Err(_) => { + return Err(RabbitMqError::DeserializationError); + } + }; + + debug!("Received message: {}", message_str); + + // Parse the message + let message = match serde_json::from_str::(message_str) { + Ok(m) => m, + Err(e) => { + log::error!("Failed to parse message: {}", e); + return Err(RabbitMqError::DeserializationError); + } + }; + + if message.message_id == message_id { + if ack_on_success { + delivery + .ack(lapin::options::BasicAckOptions::default()) + .await + .expect("Failed to acknowledge message"); + } + + return Ok(message); + } + } + Err(RabbitMqError::DeserializationError) + }; + + // Set a timeout of 10 seconds + match tokio::time::timeout(timeout, receive_future).await { + Ok(result) => result, + Err(_) => { + debug!( + "Timeout waiting for message after {} seconds", + timeout.as_millis() / 1000 + ); + Err(RabbitMqError::TimeoutError) + } + } + } +} diff --git a/src/flow_store/service.rs b/src/flow_store/service.rs index e43e1ff..f2b416a 100644 --- a/src/flow_store/service.rs +++ b/src/flow_store/service.rs @@ -1,20 +1,18 @@ use crate::flow_store::connection::FlowStore; use async_trait::async_trait; use log::error; -use redis::aio::ConnectionLike; use redis::{AsyncCommands, JsonAsyncCommands, RedisError, RedisResult}; -use serde_json::to_string; use tucana::shared::{Flow, Flows}; #[derive(Debug)] pub struct FlowStoreError { - kind: FlowStoreErrorKind, - flow_id: i64, - reason: String, + pub kind: FlowStoreErrorKind, + pub flow_id: i64, + pub reason: String, } #[derive(Debug)] -enum FlowStoreErrorKind { +pub enum FlowStoreErrorKind { Serialization, RedisOperation, } @@ -164,7 +162,6 @@ mod tests { let connection = create_flow_store_connection(url).await; { - use redis::AsyncCommands; let mut con = connection.lock().await; let _: () = redis::cmd("FLUSHALL") diff --git a/src/lib.rs b/src/lib.rs index 84093cf..0cc81d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,4 +2,4 @@ pub mod flow_store; #[cfg(feature = "flow_queue")] -pub mod flow_queue; \ No newline at end of file +pub mod flow_queue;