Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 218 additions & 39 deletions src/flow_store/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub trait FlowStoreServiceBase {
async fn delete_flow(&mut self, flow_id: i64) -> Result<i64, RedisError>;
async fn delete_flows(&mut self, flow_ids: Vec<i64>) -> Result<i64, RedisError>;
async fn get_all_flow_ids(&mut self) -> Result<Vec<i64>, RedisError>;
async fn query_flows(&mut self, pattern: String) -> Result<Flows, FlowStoreError>;
}

/// Struct representing a service for managing flows in a Redis.
Expand Down Expand Up @@ -122,6 +123,59 @@ impl FlowStoreServiceBase for FlowStoreService {

Ok(int_keys)
}

async fn query_flows(&mut self, pattern: String) -> Result<Flows, FlowStoreError> {
let mut connection = self.redis_client_arc.lock().await;

let keys: Vec<String> = {
match connection.keys(pattern).await {
Ok(res) => res,
Err(error) => {
error!("Can't retrieve keys from redis. Reason: {error}");
return Err(FlowStoreError {
kind: FlowStoreErrorKind::RedisOperation,
flow_id: 0,
reason: error.detail().unwrap().to_string(),
});
}
}
};

if keys.is_empty() {
return Ok(Flows { flows: vec![] });
}

match connection
.json_get::<Vec<String>, &str, Vec<String>>(keys, "$")
.await
{
Ok(json_values) => {
let mut all_flows: Vec<Flow> = Vec::new();

for json_str in json_values {
match serde_json::from_str::<Vec<Flow>>(&json_str) {
Ok(mut flows) => all_flows.append(&mut flows),
Err(error) => {
return Err(FlowStoreError {
kind: FlowStoreErrorKind::Serialization,
flow_id: 0,
reason: error.to_string(),
});
}
}
}

return Ok(Flows { flows: all_flows });
}
Err(error) => {
return Err(FlowStoreError {
kind: FlowStoreErrorKind::RedisOperation,
flow_id: 0,
reason: error.detail().unwrap_or("Unknown Redis error").to_string(),
});
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -210,45 +264,52 @@ mod tests {
})
);

// Broke after switching to redis :( need fix
// redis_integration_test!(
// insert_will_overwrite_existing_flow,
// (|connection: FlowStore, mut service: FlowStoreService| async move {
// let flow = Flow {
// flow_id: 1,
// r#type: "".to_string(),
// settings: vec![],
// starting_node: None,
// };
//
// match service.insert_flow(flow.clone()).await {
// Ok(i) => println!("{}", i),
// Err(err) => println!("{}", err.reason),
// };
//
// let flow_overwrite = Flow {
// flow_id: 1,
// r#type: "ABC".to_string(),
// settings: vec![],
// starting_node: None,
// };
//
// let _ = service.insert_flow(flow_overwrite).await;
// let amount = service.get_all_flow_ids().await;
// assert_eq!(amount.unwrap().len(), 1);
//
// let redis_result: Vec<String> = {
// let mut redis_cmd = connection.lock().await;
// redis_cmd.json_get("1", "$").await.unwrap()
// };
//
// assert_eq!(redis_result.len(), 1);
// let string: &str = &*redis_result[0];
// let redis_flow: Flow = serde_json::from_str(string).unwrap();
// assert_eq!(redis_flow.r#type, "ABC".to_string());
// })
// );
//
redis_integration_test!(
insert_will_overwrite_existing_flow,
(|connection: FlowStore, mut service: FlowStoreService| async move {
let flow = Flow {
flow_id: 1,
r#type: "".to_string(),
settings: vec![],
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
starting_node: None,
};

match service.insert_flow(flow.clone()).await {
Ok(i) => println!("{}", i),
Err(err) => println!("{}", err.reason),
};

let flow_overwrite = Flow {
flow_id: 1,
r#type: "REST".to_string(),
settings: vec![],
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
starting_node: None,
};

let _ = service.insert_flow(flow_overwrite).await;
let amount = service.get_all_flow_ids().await;
assert_eq!(amount.unwrap().len(), 1);

let redis_result: Vec<String> = {
let mut redis_cmd = connection.lock().await;
redis_cmd.json_get("1", "$").await.unwrap()
};

assert_eq!(redis_result.len(), 1);
let string: &str = &*redis_result[0];
let redis_flow: Vec<Flow> = serde_json::from_str(string).unwrap();
assert_eq!(redis_flow[0].r#type, "REST".to_string());
})
);

redis_integration_test!(
insert_many_flows,
(|_connection: FlowStore, mut service: FlowStoreService| async move {
Expand Down Expand Up @@ -444,4 +505,122 @@ mod tests {
assert_eq!(flow_ids.unwrap(), Vec::<i64>::new());
})
);

redis_integration_test!(
query_empty_flow_store,
(|_connection: FlowStore, mut service: FlowStoreService| async move {
let flows = service.query_flows(String::from("*")).await;
assert!(flows.is_ok());
assert!(flows.unwrap().flows.is_empty());
})
);

redis_integration_test!(
query_all_flows,
(|_connection: FlowStore, mut service: FlowStoreService| async move {
let flow_one = Flow {
flow_id: 1,
r#type: "".to_string(),
settings: vec![],
starting_node: None,
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
};

let flow_two = Flow {
flow_id: 2,
r#type: "".to_string(),
settings: vec![],
starting_node: None,
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
};

let flow_three = Flow {
flow_id: 3,
r#type: "".to_string(),
settings: vec![],
starting_node: None,
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
};

let flows = service.query_flows(String::from("*")).await;
assert!(flows.is_ok());
assert!(flows.unwrap().flows.is_empty());

let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
let flows = Flows { flows: flow_vec };

let amount = service.insert_flows(flows.clone()).await.unwrap();
assert_eq!(amount, 3);

let query_flows = service.query_flows(String::from("*")).await;

println!("{:?}", &query_flows);

assert!(query_flows.is_ok());

assert_eq!(flows.flows.len(), query_flows.unwrap().flows.len())
})
);

redis_integration_test!(
query_one_existing_flow,
(|_connection: FlowStore, mut service: FlowStoreService| async move {
let flow_one = Flow {
flow_id: 1,
r#type: "".to_string(),
settings: vec![],
starting_node: None,
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
};

let flow_two = Flow {
flow_id: 2,
r#type: "".to_string(),
settings: vec![],
starting_node: None,
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
};

let flow_three = Flow {
flow_id: 3,
r#type: "".to_string(),
settings: vec![],
starting_node: None,
data_types: vec![],
input_type_identifier: None,
return_type_identifier: None,
project_id: 1,
};

let flows = service.query_flows(String::from("*")).await;
assert!(flows.is_ok());
assert!(flows.unwrap().flows.is_empty());

let flow_vec = vec![flow_one.clone(), flow_two.clone(), flow_three.clone()];
let flows = Flows { flows: flow_vec };

let amount = service.insert_flows(flows.clone()).await.unwrap();
assert_eq!(amount, 3);

let query_flows = service.query_flows(String::from("1")).await;

assert!(query_flows.is_ok());
assert_eq!(query_flows.unwrap().flows, vec![flow_one])
})
);
}