From 7a31976efe50bdfdf4715cd7154fff8cd7b6f54b Mon Sep 17 00:00:00 2001 From: Raphael Date: Sun, 4 May 2025 19:50:43 +0200 Subject: [PATCH] feat: added query function for flows --- src/flow_store/service.rs | 257 ++++++++++++++++++++++++++++++++------ 1 file changed, 218 insertions(+), 39 deletions(-) diff --git a/src/flow_store/service.rs b/src/flow_store/service.rs index f2fb34a..99ad49d 100644 --- a/src/flow_store/service.rs +++ b/src/flow_store/service.rs @@ -26,6 +26,7 @@ pub trait FlowStoreServiceBase { async fn delete_flow(&mut self, flow_id: i64) -> Result; async fn delete_flows(&mut self, flow_ids: Vec) -> Result; async fn get_all_flow_ids(&mut self) -> Result, RedisError>; + async fn query_flows(&mut self, pattern: String) -> Result; } /// Struct representing a service for managing flows in a Redis. @@ -122,6 +123,59 @@ impl FlowStoreServiceBase for FlowStoreService { Ok(int_keys) } + + async fn query_flows(&mut self, pattern: String) -> Result { + let mut connection = self.redis_client_arc.lock().await; + + let keys: Vec = { + match connection.keys(pattern).await { + Ok(res) => res, + Err(error) => { + error!("Can't retrieve keys from redis. Reason: {error}"); + return Err(FlowStoreError { + kind: FlowStoreErrorKind::RedisOperation, + flow_id: 0, + reason: error.detail().unwrap().to_string(), + }); + } + } + }; + + if keys.is_empty() { + return Ok(Flows { flows: vec![] }); + } + + match connection + .json_get::, &str, Vec>(keys, "$") + .await + { + Ok(json_values) => { + let mut all_flows: Vec = Vec::new(); + + for json_str in json_values { + match serde_json::from_str::>(&json_str) { + Ok(mut flows) => all_flows.append(&mut flows), + Err(error) => { + return Err(FlowStoreError { + kind: FlowStoreErrorKind::Serialization, + flow_id: 0, + reason: error.to_string(), + }); + } + } + } + + return Ok(Flows { flows: all_flows }); + } + Err(error) => { + return Err(FlowStoreError { + kind: FlowStoreErrorKind::RedisOperation, + flow_id: 0, + reason: error.detail().unwrap_or("Unknown Redis error").to_string(), + }); + } + } + } } #[cfg(test)] @@ -210,45 +264,52 @@ mod tests { }) ); - // Broke after switching to redis :( need fix - // redis_integration_test!( - // insert_will_overwrite_existing_flow, - // (|connection: FlowStore, mut service: FlowStoreService| async move { - // let flow = Flow { - // flow_id: 1, - // r#type: "".to_string(), - // settings: vec![], - // starting_node: None, - // }; - // - // match service.insert_flow(flow.clone()).await { - // Ok(i) => println!("{}", i), - // Err(err) => println!("{}", err.reason), - // }; - // - // let flow_overwrite = Flow { - // flow_id: 1, - // r#type: "ABC".to_string(), - // settings: vec![], - // starting_node: None, - // }; - // - // let _ = service.insert_flow(flow_overwrite).await; - // let amount = service.get_all_flow_ids().await; - // assert_eq!(amount.unwrap().len(), 1); - // - // let redis_result: Vec = { - // let mut redis_cmd = connection.lock().await; - // redis_cmd.json_get("1", "$").await.unwrap() - // }; - // - // assert_eq!(redis_result.len(), 1); - // let string: &str = &*redis_result[0]; - // let redis_flow: Flow = serde_json::from_str(string).unwrap(); - // assert_eq!(redis_flow.r#type, "ABC".to_string()); - // }) - // ); - // + redis_integration_test!( + insert_will_overwrite_existing_flow, + (|connection: FlowStore, mut service: FlowStoreService| async move { + let flow = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + starting_node: None, + }; + + match service.insert_flow(flow.clone()).await { + Ok(i) => println!("{}", i), + Err(err) => println!("{}", err.reason), + }; + + let flow_overwrite = Flow { + flow_id: 1, + r#type: "REST".to_string(), + settings: vec![], + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + starting_node: None, + }; + + let _ = service.insert_flow(flow_overwrite).await; + let amount = service.get_all_flow_ids().await; + assert_eq!(amount.unwrap().len(), 1); + + let redis_result: Vec = { + let mut redis_cmd = connection.lock().await; + redis_cmd.json_get("1", "$").await.unwrap() + }; + + assert_eq!(redis_result.len(), 1); + let string: &str = &*redis_result[0]; + let redis_flow: Vec = serde_json::from_str(string).unwrap(); + assert_eq!(redis_flow[0].r#type, "REST".to_string()); + }) + ); + redis_integration_test!( insert_many_flows, (|_connection: FlowStore, mut service: FlowStoreService| async move { @@ -444,4 +505,122 @@ mod tests { assert_eq!(flow_ids.unwrap(), Vec::::new()); }) ); + + redis_integration_test!( + query_empty_flow_store, + (|_connection: FlowStore, mut service: FlowStoreService| async move { + let flows = service.query_flows(String::from("*")).await; + assert!(flows.is_ok()); + assert!(flows.unwrap().flows.is_empty()); + }) + ); + + redis_integration_test!( + query_all_flows, + (|_connection: FlowStore, mut service: FlowStoreService| async move { + let flow_one = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + let flow_two = Flow { + flow_id: 2, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + let flow_three = Flow { + flow_id: 3, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + let flows = service.query_flows(String::from("*")).await; + assert!(flows.is_ok()); + assert!(flows.unwrap().flows.is_empty()); + + let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()]; + let flows = Flows { flows: flow_vec }; + + let amount = service.insert_flows(flows.clone()).await.unwrap(); + assert_eq!(amount, 3); + + let query_flows = service.query_flows(String::from("*")).await; + + println!("{:?}", &query_flows); + + assert!(query_flows.is_ok()); + + assert_eq!(flows.flows.len(), query_flows.unwrap().flows.len()) + }) + ); + + redis_integration_test!( + query_one_existing_flow, + (|_connection: FlowStore, mut service: FlowStoreService| async move { + let flow_one = Flow { + flow_id: 1, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + let flow_two = Flow { + flow_id: 2, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + let flow_three = Flow { + flow_id: 3, + r#type: "".to_string(), + settings: vec![], + starting_node: None, + data_types: vec![], + input_type_identifier: None, + return_type_identifier: None, + project_id: 1, + }; + + let flows = service.query_flows(String::from("*")).await; + assert!(flows.is_ok()); + assert!(flows.unwrap().flows.is_empty()); + + let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()]; + let flows = Flows { flows: flow_vec }; + + let amount = service.insert_flows(flows.clone()).await.unwrap(); + assert_eq!(amount, 3); + + let query_flows = service.query_flows(String::from("1")).await; + + assert!(query_flows.is_ok()); + assert_eq!(query_flows.unwrap().flows, vec![flow_one]) + }) + ); }