From fbc56c3a859e499a7bfa060a3cfb4547b9098fd5 Mon Sep 17 00:00:00 2001
From: Raphael
Date: Sat, 15 Feb 2025 14:54:26 +0100
Subject: [PATCH 1/3] dependencies: added serial_test as a dev dependency to
 run tests sequentially

---
 Cargo.lock | 41 +++++++++++++++++++++++++++++++++++++++++
 Cargo.toml |  1 +
 2 files changed, 42 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 4642f0d..30722a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -620,6 +620,7 @@ dependencies = [
  "log",
  "redis",
  "serde_json",
+ "serial_test",
  "testcontainers",
  "tokio",
  "tucana",
@@ -2458,6 +2459,15 @@ dependencies = [
  "cipher",
 ]
 
+[[package]]
+name = "scc"
+version = "2.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea091f6cac2595aa38993f04f4ee692ed43757035c36e67c180b6828356385b1"
+dependencies = [
+ "sdd",
+]
+
 [[package]]
 name = "schannel"
 version = "0.1.27"
@@ -2484,6 +2494,12 @@ dependencies = [
  "sha2",
 ]
 
+[[package]]
+name = "sdd"
+version = "3.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b07779b9b918cc05650cb30f404d4d7835d26df37c235eded8a6832e2fb82cca"
+
 [[package]]
 name = "security-framework"
 version = "2.11.1"
@@ -2605,6 +2621,31 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "serial_test"
+version = "3.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9"
+dependencies = [
+ "futures",
+ "log",
+ "once_cell",
+ "parking_lot",
+ "scc",
+ "serial_test_derive",
+]
+
+[[package]]
+name = "serial_test_derive"
+version = "3.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "sha1"
 version = "0.10.6"
diff --git a/Cargo.toml b/Cargo.toml
index ade7f7b..11bb231 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ lapin = "2.5.0"
 
 [dev-dependencies]
 testcontainers = "0.23.2"
+serial_test = "3.2.0"
 
 [lib]
 doctest = true

From 8cfe54cff4c0a2102835ea5b037324e680516c0d Mon Sep 17 00:00:00 2001
From: Raphael
Date: Sat, 15 Feb 2025 14:56:31 +0100
Subject: [PATCH 2/3] feat: added ping test

---
 src/flow_store/connection.rs | 70 ++++++++++++++++++++++++------------
 1 file changed, 47 insertions(+), 23 deletions(-)
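
Note (editorial, not part of the patch itself): serial_test's #[serial]
attribute is what makes the container-based tests in this series run one at a
time instead of in parallel with the rest of the suite. A minimal sketch of the
intended combination with tokio's test macro follows; the test names are
illustrative only:

    #[cfg(test)]
    mod serial_example {
        use serial_test::serial;

        // #[tokio::test] supplies the async runtime; #[serial] serializes this
        // test with every other test marked #[serial].
        #[tokio::test]
        #[serial]
        async fn first_container_test() {
            // start a throwaway Redis container, run assertions ...
        }

        #[tokio::test]
        #[serial]
        async fn second_container_test() {
            // runs only after first_container_test has finished
        }
    }
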
diff --git a/src/flow_store/connection.rs b/src/flow_store/connection.rs
index 91329ad..81b0435 100644
--- a/src/flow_store/connection.rs
+++ b/src/flow_store/connection.rs
@@ -1,66 +1,90 @@
-use std::sync::Arc;
 use redis::aio::MultiplexedConnection;
-use redis::{Client};
+use redis::Client;
+use std::sync::Arc;
 use tokio::sync::Mutex;
 
 pub type FlowStore = Arc<Mutex<Box<MultiplexedConnection>>>;
 
-fn build_connection(redis_url: String) -> Client {
+pub fn build_connection(redis_url: String) -> Client {
     match Client::open(redis_url) {
         Ok(client) => client,
-        Err(con_error) => panic!("Cannot create FlowStore (Redis) connection! Reason: {}", con_error),
+        Err(con_error) => panic!(
+            "Cannot create Client (Redis) connection! Reason: {}",
+            con_error
+        ),
     }
 }
 
 pub async fn create_flow_store_connection(url: String) -> FlowStore {
-    let client = match build_connection(url).get_multiplexed_async_connection().await {
+    let client = match build_connection(url)
+        .get_multiplexed_async_connection()
+        .await
+    {
         Ok(connection) => connection,
-        Err(error) => panic!("Cannot create FlowStore (Redis) connection! Reason: {}", error),
+        Err(error) => panic!(
+            "Cannot create FlowStore (Redis) connection! Reason: {}",
+            error
+        ),
     };
 
     Arc::new(Mutex::new(Box::new(client)))
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::flow_store::connection::create_flow_store_connection;
+    use redis::{AsyncCommands, RedisResult};
+    use serial_test::serial;
     use testcontainers::core::IntoContainerPort;
+    use testcontainers::core::WaitFor;
     use testcontainers::runners::AsyncRunner;
     use testcontainers::GenericImage;
-    use testcontainers::core::WaitFor;
-    use crate::flow_store::connection::build_connection;
 
     macro_rules! redis_container_test {
         ($test_name:ident, $consumer:expr) => {
             #[tokio::test]
+            #[serial]
             async fn $test_name() {
                 let port: u16 = 6379;
                 let image_name = "redis";
                 let wait_message = "Ready to accept connections";
 
                 let container = GenericImage::new(image_name, "latest")
                     .with_exposed_port(port.tcp())
                     .with_wait_for(WaitFor::message_on_stdout(wait_message))
                     .start()
                     .await
                     .unwrap();
 
                 let host = container.get_host().await.unwrap();
                 let host_port = container.get_host_port_ipv4(port).await.unwrap();
                 let url = format!("redis://{host}:{host_port}");
 
                 $consumer(url).await;
+
+                let _ = container.stop().await;
             }
         };
     }
 
-    redis_container_test!(test_redis_startup, (|url: String| async move {
-        println!("Redis server started correctly on: {}", url);
-    }));
-
-    redis_container_test!(test_redis_connection, (|url: String| async move {
-        let result = build_connection(url).get_connection();
-        assert!(result.is_ok());
-    }));
+    redis_container_test!(
+        test_redis_startup,
+        (|url: String| async move {
+            println!("Redis server started correctly on: {}", url);
+        })
+    );
+
+    redis_container_test!(
+        test_redis_ping,
+        (|url: String| async move {
+            println!("Redis server started correctly on: {}", url.clone());
+
+            let flow_store = create_flow_store_connection(url.clone()).await;
+            let mut con = flow_store.lock().await;
+
+            let ping_res: RedisResult<String> = con.ping().await;
+            assert!(ping_res.is_ok());
+            assert_eq!(ping_res.unwrap(), "PONG");
+        })
+    );
 }

From e1f18dd1e6e82927f7ddbb0e2464a344173f6add Mon Sep 17 00:00:00 2001
From: Raphael
Date: Sat, 15 Feb 2025 14:56:45 +0100
Subject: [PATCH 3/3] feat: added flow store tests

---
 src/flow_store/service.rs | 304 +++++++++++++++++++++++++++++++++++---
 1 file changed, 283 insertions(+), 21 deletions(-)

diff --git a/src/flow_store/service.rs b/src/flow_store/service.rs
index 093cf40..f2919c6 100644
--- a/src/flow_store/service.rs
+++ b/src/flow_store/service.rs
@@ -1,8 +1,8 @@
+use crate::flow_store::connection::FlowStore;
 use async_trait::async_trait;
-use log::{debug, error};
-use redis::{AsyncCommands, RedisError};
+use log::error;
+use redis::{AsyncCommands, RedisError, RedisResult};
 use tucana::sagittarius::{Flow, Flows};
-use crate::flow_store::connection::FlowStore;
 
 #[derive(Debug)]
 pub struct FlowStoreError {
@@ -19,7 +19,7 @@ enum FlowStoreErrorKind {
 
 /// Trait representing a service for managing flows in a Redis.
 #[async_trait]
-pub trait FlowStoreService {
+pub trait FlowStoreServiceBase {
     async fn new(redis_client_arc: FlowStore) -> Self;
     async fn insert_flow(&mut self, flow: Flow) -> Result<i64, FlowStoreError>;
     async fn insert_flows(&mut self, flows: Flows) -> Result<i64, FlowStoreError>;
@@ -29,15 +29,16 @@ pub trait FlowStoreService {
 }
 
 /// Struct representing a service for managing flows in a Redis.
-struct FlowServiceBase {
+#[derive(Clone)]
+pub struct FlowStoreService {
     pub(crate) redis_client_arc: FlowStore,
 }
 
 /// Implementation of a service for managing flows in a Redis.
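+/// Flows are serialized to JSON and stored under their numeric `flow_id`, so
+/// inserting a flow whose id already exists overwrites the stored value.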
#[async_trait] -impl FlowStoreService for FlowServiceBase { - async fn new(redis_client_arc: FlowStore) -> Self { - Self { redis_client_arc } +impl FlowStoreServiceBase for FlowStoreService { + async fn new(redis_client_arc: FlowStore) -> FlowStoreService { + FlowStoreService { redis_client_arc } } /// Insert a list of flows into Redis @@ -56,15 +57,9 @@ impl FlowStoreService for FlowServiceBase { } }; - let parsed_flow = connection - .set::(flow.flow_id.to_string(), serialized_flow) - .await; + let insert_result: RedisResult<()> = connection.set(flow.flow_id, serialized_flow).await; - match parsed_flow { - Ok(modified) => { - debug!("Inserted flow"); - Ok(modified) - } + match insert_result { Err(redis_error) => { error!("An Error occurred {}", redis_error); Err(FlowStoreError { @@ -73,6 +68,7 @@ impl FlowStoreService for FlowServiceBase { reason: redis_error.to_string(), }) } + _ => Ok(1), } } @@ -91,13 +87,10 @@ impl FlowStoreService for FlowServiceBase { /// Deletes a flow async fn delete_flow(&mut self, flow_id: i64) -> Result { let mut connection = self.redis_client_arc.lock().await; - let deleted_flow = connection.del::(flow_id).await; + let deleted_flow: RedisResult = connection.del(flow_id).await; match deleted_flow { - Ok(changed_amount) => { - debug!("{} flows where deleted", changed_amount); - deleted_flow - } + Ok(int) => Ok(int), Err(redis_error) => { error!("An Error occurred {}", redis_error); Err(redis_error) @@ -140,3 +133,272 @@ impl FlowStoreService for FlowServiceBase { Ok(int_keys) } } + +#[cfg(test)] +mod tests { + use crate::flow_store::connection::create_flow_store_connection; + use crate::flow_store::connection::FlowStore; + use crate::flow_store::service::FlowStoreService; + use crate::flow_store::service::FlowStoreServiceBase; + use redis::AsyncCommands; + use serial_test::serial; + use testcontainers::core::IntoContainerPort; + use testcontainers::core::WaitFor; + use testcontainers::runners::AsyncRunner; + use testcontainers::GenericImage; + use tucana::sagittarius::{Flow, Flows}; + + macro_rules! 
redis_integration_test { + ($test_name:ident, $consumer:expr) => { + #[tokio::test] + #[serial] + async fn $test_name() { + let port: u16 = 6379; + let image_name = "redis"; + let wait_message = "Ready to accept connections"; + + let container = GenericImage::new(image_name, "latest") + .with_exposed_port(port.tcp()) + .with_wait_for(WaitFor::message_on_stdout(wait_message)) + .start() + .await + .unwrap(); + + let host = container.get_host().await.unwrap(); + let host_port = container.get_host_port_ipv4(port).await.unwrap(); + let url = format!("redis://{host}:{host_port}"); + println!("Redis server started correctly on: {}", url.clone()); + + let connection = create_flow_store_connection(url).await; + let base = FlowStoreService::new(connection.clone()).await; + + $consumer(connection, base).await; + let _ = container.stop().await; + } + }; + } + + redis_integration_test!( + insert_one_flow, + (|connection: FlowStore, mut service: FlowStoreService| async move { + let flow = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + }; + + match service.insert_flow(flow.clone()).await { + Ok(i) => println!("{}", i), + Err(err) => println!("{}", err.reason), + }; + + let redis_result: Option = { + let mut redis_cmd = connection.lock().await; + redis_cmd.get("1").await.unwrap() + }; + + println!("{}", redis_result.clone().unwrap()); + + assert!(redis_result.is_some()); + let redis_flow: Flow = serde_json::from_str(&*redis_result.unwrap()).unwrap(); + assert_eq!(redis_flow, flow); + }) + ); + + redis_integration_test!( + insert_will_overwrite_existing_flow, + (|connection: FlowStore, mut service: FlowStoreService| async move { + let flow = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + }; + + match service.insert_flow(flow.clone()).await { + Ok(i) => println!("{}", i), + Err(err) => println!("{}", err.reason), + }; + + let flow_overwrite = Flow { + flow_id: 1, + r#type: "ABC".to_string(), + settings: vec![], + starting_node: None, + }; + + let _ = service.insert_flow(flow_overwrite).await; + let amount = service.get_all_flow_ids().await; + assert_eq!(amount.unwrap().len(), 1); + + let redis_result: Option = { + let mut redis_cmd = connection.lock().await; + redis_cmd.get("1").await.unwrap() + }; + + println!("{}", redis_result.clone().unwrap()); + + assert!(redis_result.is_some()); + let redis_flow: Flow = serde_json::from_str(&*redis_result.unwrap()).unwrap(); + assert_eq!(redis_flow.r#type, "ABC".to_string()); + }) + ); + + redis_integration_test!( + insert_many_flows, + (|_connection: FlowStore, mut service: FlowStoreService| async move { + let flow_one = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + }; + + let flow_two = Flow { + flow_id: 2, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + }; + + let flow_three = Flow { + flow_id: 3, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + }; + + let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()]; + let flows = Flows { flows: flow_vec }; + + let amount = service.insert_flows(flows).await.unwrap(); + assert_eq!(amount, 3); + }) + ); + + redis_integration_test!( + delete_one_existing_flow, + (|connection: FlowStore, mut service: FlowStoreService| async move { + let flow = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + }; + + match service.insert_flow(flow.clone()).await { + Ok(i) => println!("{}", i), + Err(err) 
+    redis_integration_test!(
+        delete_one_existing_flow,
+        (|connection: FlowStore, mut service: FlowStoreService| async move {
+            let flow = Flow {
+                flow_id: 1,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            match service.insert_flow(flow.clone()).await {
+                Ok(i) => println!("{}", i),
+                Err(err) => println!("{}", err.reason),
+            };
+
+            let result = service.delete_flow(1).await;
+
+            assert_eq!(result.unwrap(), 1);
+
+            let redis_result: Option<String> = {
+                let mut redis_cmd = connection.lock().await;
+                redis_cmd.get("1").await.unwrap()
+            };
+
+            assert!(redis_result.is_none());
+        })
+    );
+
+    redis_integration_test!(
+        delete_one_non_existing_flow,
+        (|_connection: FlowStore, mut service: FlowStoreService| async move {
+            let result = service.delete_flow(1).await;
+            assert_eq!(result.unwrap(), 0);
+        })
+    );
+
+    redis_integration_test!(
+        delete_many_existing_flows,
+        (|_connection: FlowStore, mut service: FlowStoreService| async move {
+            let flow_one = Flow {
+                flow_id: 1,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            let flow_two = Flow {
+                flow_id: 2,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            let flow_three = Flow {
+                flow_id: 3,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
+            let flows = Flows { flows: flow_vec };
+
+            let amount = service.insert_flows(flows).await.unwrap();
+            assert_eq!(amount, 3);
+
+            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
+            assert_eq!(deleted_amount.unwrap(), 3);
+        })
+    );
+
+    redis_integration_test!(
+        delete_many_non_existing_flows,
+        (|_connection: FlowStore, mut service: FlowStoreService| async move {
+            let deleted_amount = service.delete_flows(vec![1, 2, 3]).await;
+            assert_eq!(deleted_amount.unwrap(), 0);
+        })
+    );
+
+    redis_integration_test!(
+        get_existing_flow_ids,
+        (|_connection: FlowStore, mut service: FlowStoreService| async move {
+            let flow_one = Flow {
+                flow_id: 1,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            let flow_two = Flow {
+                flow_id: 2,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            let flow_three = Flow {
+                flow_id: 3,
+                r#type: "".to_string(),
+                settings: vec![],
+                starting_node: None,
+            };
+
+            let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
+            let flows = Flows { flows: flow_vec };
+
+            let amount = service.insert_flows(flows).await.unwrap();
+            assert_eq!(amount, 3);
+
+            let mut flow_ids = service.get_all_flow_ids().await.unwrap();
+            flow_ids.sort();
+
+            assert_eq!(flow_ids, vec![1, 2, 3]);
+        })
+    );
+
+    redis_integration_test!(
+        get_empty_flow_ids,
+        (|_connection: FlowStore, mut service: FlowStoreService| async move {
+            let flow_ids = service.get_all_flow_ids().await;
+            assert_eq!(flow_ids.unwrap(), Vec::<i64>::new());
+        })
+    );
+}
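
Note (editorial, not part of the patches): a minimal sketch of how the
FlowStoreService exercised by these tests is expected to be used from
application code. The Redis URL is a placeholder; the calls mirror the ones
made in the tests above.

    use crate::flow_store::connection::create_flow_store_connection;
    use crate::flow_store::service::{FlowStoreService, FlowStoreServiceBase};
    use tucana::sagittarius::Flow;

    async fn store_and_list_flows() {
        // Shared Arc<Mutex<Box<MultiplexedConnection>>> handle to Redis.
        let connection =
            create_flow_store_connection("redis://localhost:6379".to_string()).await;
        let mut service = FlowStoreService::new(connection).await;

        let flow = Flow {
            flow_id: 1,
            r#type: "".to_string(),
            settings: vec![],
            starting_node: None,
        };

        // insert_flow serializes the flow to JSON and stores it under its id;
        // inserting the same flow_id again overwrites the stored value.
        let _ = service.insert_flow(flow).await;

        // List the ids of all stored flows.
        let ids = service.get_all_flow_ids().await.unwrap_or_default();
        assert!(ids.contains(&1));
    }

Running the new tests only needs a container runtime that testcontainers can
reach (for example a local Docker daemon); `cargo test` then starts one
throwaway Redis container per test, and serial_test keeps those tests from
running concurrently.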