From 69614e8ce108a621c0aeab4e6463b74207e224e5 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 5 May 2025 17:50:09 +0200 Subject: [PATCH 1/2] feat: added function to determent the flow id --- src/flow_store/flow_identifier.rs | 142 ++++++++++++++++++++++++++++++ src/flow_store/mod.rs | 3 +- 2 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 src/flow_store/flow_identifier.rs diff --git a/src/flow_store/flow_identifier.rs b/src/flow_store/flow_identifier.rs new file mode 100644 index 0000000..a35d275 --- /dev/null +++ b/src/flow_store/flow_identifier.rs @@ -0,0 +1,142 @@ +use tucana::shared::{value::Kind, Flow, FlowSetting}; + +fn extract_field(settings: &[FlowSetting], def_key: &str, field_name: &str) -> Option { + settings.iter().find_map(|setting| { + let def = setting.definition.as_ref()?; + if def.key != def_key { + return None; + } + + let obj = setting.object.as_ref()?; + obj.fields.iter().find_map(|(k, v)| { + if k == field_name { + if let Some(Kind::StringValue(s)) = &v.kind { + return Some(s.clone()); + } + } + None + }) + }) +} + +/// Every flow identifier needs to start with its +/// flow_id::project_id::protocol_specific_fields +pub fn get_flow_identifier(flow: &Flow) -> Option { + match flow.r#type.as_str() { + "REST" => { + let method = extract_field(&flow.settings, "HTTP_METHOD", "method"); + let host = extract_field(&flow.settings, "HTTP_HOST", "host"); + + let (method, host) = match (method, host) { + (Some(m), Some(h)) => (m, h), + missing => { + eprintln!("missing settings: {:?}", missing); + return None; + } + }; + + Some(format!( + "{}::{}::{}::{}", + flow.flow_id, flow.project_id, host, method + )) + } + _ => return None, + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + + use tucana::shared::{Flow, FlowSetting, FlowSettingDefinition, Struct}; + + use super::get_flow_identifier; + + fn get_string_value(value: &str) -> tucana::shared::Value { + tucana::shared::Value { + kind: Some(tucana::shared::value::Kind::StringValue(String::from( + value, + ))), + } + } + + #[test] + fn test_incorrect_flow_type_id() { + let unkown = Flow { + starting_node: None, + flow_id: 1, + project_id: 1, + r#type: "UNKOWN_FLOW_TYPE_IDENTIFIER".to_string(), + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + settings: vec![], + }; + + assert!(get_flow_identifier(&unkown).is_none()) + } + + #[test] + fn test_rest_flow_type_id_is_correct() { + let rest = Flow { + starting_node: None, + flow_id: 1, + project_id: 1, + r#type: "REST".to_string(), + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + settings: vec![ + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("1424525"), + key: String::from("HTTP_HOST"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("host"), get_string_value("abc.code0.tech")); + map + }, + }), + }, + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("14245252352"), + key: String::from("HTTP_METHOD"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("method"), get_string_value("GET")); + map + }, + }), + }, + ], + }; + + let id = get_flow_identifier(&rest); + + assert!(id.is_some()); + assert_eq!(id.unwrap(), String::from("1::1::abc.code0.tech::GET")) + } + + #[test] + fn test_rest_flow_type_id_with_missing_settings_fails() { + let rest = Flow { + starting_node: None, + flow_id: 1, + project_id: 1, + r#type: "REST".to_string(), + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + settings: vec![], + }; + + let id = get_flow_identifier(&rest); + + assert!(id.is_none()); + } +} diff --git a/src/flow_store/mod.rs b/src/flow_store/mod.rs index 9def19c..c2c4bab 100644 --- a/src/flow_store/mod.rs +++ b/src/flow_store/mod.rs @@ -1,2 +1,3 @@ +pub mod connection; +mod flow_identifier; pub mod service; -pub mod connection; \ No newline at end of file From f061e4f74cf646bb94ee3231e4e8988dbc8a8b44 Mon Sep 17 00:00:00 2001 From: Raphael Date: Mon, 5 May 2025 17:50:21 +0200 Subject: [PATCH 2/2] feat: adjusted tests & inserting logic --- src/flow_store/service.rs | 187 ++++++++++++++++++++++++++++---------- 1 file changed, 140 insertions(+), 47 deletions(-) diff --git a/src/flow_store/service.rs b/src/flow_store/service.rs index 99ad49d..eac084f 100644 --- a/src/flow_store/service.rs +++ b/src/flow_store/service.rs @@ -1,3 +1,4 @@ +use super::flow_identifier; use crate::flow_store::connection::FlowStore; use async_trait::async_trait; use log::error; @@ -15,6 +16,7 @@ pub struct FlowStoreError { pub enum FlowStoreErrorKind { Serialization, RedisOperation, + NoIdentifier, } /// Trait representing a service for managing flows in a Redis. @@ -46,9 +48,18 @@ impl FlowStoreServiceBase for FlowStoreService { async fn insert_flow(&mut self, flow: Flow) -> Result { let mut connection = self.redis_client_arc.lock().await; - let insert_result: RedisResult<()> = connection - .json_set(flow.flow_id.to_string(), "$", &flow) - .await; + let identifier = match flow_identifier::get_flow_identifier(&flow) { + Some(id) => id, + None => { + return Err(FlowStoreError { + kind: FlowStoreErrorKind::NoIdentifier, + flow_id: flow.flow_id, + reason: String::from("Identifier can't be determent!"), + }); + } + }; + + let insert_result: RedisResult<()> = connection.json_set(identifier, "$", &flow).await; match insert_result { Err(redis_error) => { @@ -78,7 +89,10 @@ impl FlowStoreServiceBase for FlowStoreService { /// Deletes a flow async fn delete_flow(&mut self, flow_id: i64) -> Result { let mut connection = self.redis_client_arc.lock().await; - let deleted_flow: RedisResult = connection.json_del(flow_id, ".").await; + + let identifier = format!("{}::*", flow_id); + let keys: Vec = connection.keys(&identifier).await?; + let deleted_flow: RedisResult = connection.json_del(keys, ".").await; match deleted_flow { Ok(int) => Ok(int), @@ -116,7 +130,18 @@ impl FlowStoreServiceBase for FlowStoreService { } }; - let int_keys: Vec = string_keys + let mut real_keys: Vec = vec![]; + + for key in string_keys { + if key.contains("::") { + let number = key.splitn(2, "::").next(); + if let Some(real_number) = number { + real_keys.push(String::from(real_number)); + } + } + } + + let int_keys: Vec = real_keys .into_iter() .filter_map(|key| key.parse::().ok()) .collect(); @@ -180,6 +205,8 @@ impl FlowStoreServiceBase for FlowStoreService { #[cfg(test)] mod tests { + use std::collections::HashMap; + use crate::flow_store::connection::create_flow_store_connection; use crate::flow_store::connection::FlowStore; use crate::flow_store::service::FlowStoreService; @@ -190,8 +217,50 @@ mod tests { use testcontainers::core::WaitFor; use testcontainers::runners::AsyncRunner; use testcontainers::GenericImage; + use tucana::shared::FlowSetting; + use tucana::shared::FlowSettingDefinition; + use tucana::shared::Struct; use tucana::shared::{Flow, Flows}; + fn get_string_value(value: &str) -> tucana::shared::Value { + tucana::shared::Value { + kind: Some(tucana::shared::value::Kind::StringValue(String::from( + value, + ))), + } + } + + fn get_settings() -> Vec { + vec![ + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("1424525"), + key: String::from("HTTP_HOST"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("host"), get_string_value("abc.code0.tech")); + map + }, + }), + }, + FlowSetting { + definition: Some(FlowSettingDefinition { + id: String::from("14245252352"), + key: String::from("HTTP_METHOD"), + }), + object: Some(Struct { + fields: { + let mut map = HashMap::new(); + map.insert(String::from("method"), get_string_value("GET")); + map + }, + }), + }, + ] + } + macro_rules! redis_integration_test { ($test_name:ident, $consumer:expr) => { #[tokio::test] @@ -237,8 +306,8 @@ mod tests { (|connection: FlowStore, mut service: FlowStoreService| async move { let flow = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -253,7 +322,10 @@ mod tests { let redis_result: Option = { let mut redis_cmd = connection.lock().await; - redis_cmd.json_get("1", "$").await.unwrap() + redis_cmd + .json_get("1::1::abc.code0.tech::GET", "$") + .await + .unwrap() }; println!("{}", redis_result.clone().unwrap()); @@ -264,13 +336,31 @@ mod tests { }) ); + redis_integration_test!( + insert_one_flow_fails_no_identifier, + (|_connection: FlowStore, mut service: FlowStoreService| async move { + let flow = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: get_settings(), + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + assert!(!service.insert_flow(flow.clone()).await.is_ok()); + }) + ); + redis_integration_test!( insert_will_overwrite_existing_flow, (|connection: FlowStore, mut service: FlowStoreService| async move { let flow = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), data_types: vec![], input_type_identifier: None, return_type_identifier: None, @@ -286,9 +376,9 @@ mod tests { let flow_overwrite = Flow { flow_id: 1, r#type: "REST".to_string(), - settings: vec![], + settings: get_settings(), data_types: vec![], - input_type_identifier: None, + input_type_identifier: Some(String::from("ABC")), return_type_identifier: None, project_id: 1, starting_node: None, @@ -300,13 +390,16 @@ mod tests { let redis_result: Vec = { let mut redis_cmd = connection.lock().await; - redis_cmd.json_get("1", "$").await.unwrap() + redis_cmd + .json_get("1::1::abc.code0.tech::GET", "$") + .await + .unwrap() }; assert_eq!(redis_result.len(), 1); let string: &str = &*redis_result[0]; let redis_flow: Vec = serde_json::from_str(string).unwrap(); - assert_eq!(redis_flow[0].r#type, "REST".to_string()); + assert!(redis_flow[0].r#input_type_identifier.is_some()); }) ); @@ -315,8 +408,8 @@ mod tests { (|_connection: FlowStore, mut service: FlowStoreService| async move { let flow_one = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), data_types: vec![], input_type_identifier: None, return_type_identifier: None, @@ -326,8 +419,8 @@ mod tests { let flow_two = Flow { flow_id: 2, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), data_types: vec![], input_type_identifier: None, return_type_identifier: None, @@ -337,8 +430,8 @@ mod tests { let flow_three = Flow { flow_id: 3, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -359,8 +452,8 @@ mod tests { (|connection: FlowStore, mut service: FlowStoreService| async move { let flow = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -399,8 +492,8 @@ mod tests { (|_connection: FlowStore, mut service: FlowStoreService| async move { let flow_one = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -410,8 +503,8 @@ mod tests { let flow_two = Flow { flow_id: 2, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -421,8 +514,8 @@ mod tests { let flow_three = Flow { flow_id: 3, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -454,8 +547,8 @@ mod tests { (|_connection: FlowStore, mut service: FlowStoreService| async move { let flow_one = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -465,8 +558,8 @@ mod tests { let flow_two = Flow { flow_id: 2, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -476,8 +569,8 @@ mod tests { let flow_three = Flow { flow_id: 3, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -520,8 +613,8 @@ mod tests { (|_connection: FlowStore, mut service: FlowStoreService| async move { let flow_one = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -531,8 +624,8 @@ mod tests { let flow_two = Flow { flow_id: 2, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -542,8 +635,8 @@ mod tests { let flow_three = Flow { flow_id: 3, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -576,8 +669,8 @@ mod tests { (|_connection: FlowStore, mut service: FlowStoreService| async move { let flow_one = Flow { flow_id: 1, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -587,8 +680,8 @@ mod tests { let flow_two = Flow { flow_id: 2, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -598,8 +691,8 @@ mod tests { let flow_three = Flow { flow_id: 3, - r#type: "".to_string(), - settings: vec![], + r#type: "REST".to_string(), + settings: get_settings(), starting_node: None, data_types: vec![], input_type_identifier: None, @@ -617,7 +710,7 @@ mod tests { let amount = service.insert_flows(flows.clone()).await.unwrap(); assert_eq!(amount, 3); - let query_flows = service.query_flows(String::from("1")).await; + let query_flows = service.query_flows(String::from("1::*")).await; assert!(query_flows.is_ok()); assert_eq!(query_flows.unwrap().flows, vec![flow_one])