diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 12f2c2ca..c9f3ee5b 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -2508,6 +2508,7 @@ def export_graph( user_id: str | None = None, page: int | None = None, page_size: int | None = None, + filter: dict | None = None, **kwargs, ) -> dict[str, Any]: """ @@ -2518,6 +2519,13 @@ def export_graph( user_id (str, optional): User ID for filtering page (int, optional): Page number (starts from 1). If None, exports all data without pagination. page_size (int, optional): Number of items per page. If None, exports all data without pagination. + filter (dict, optional): Filter dictionary for metadata filtering. Supports "and", "or" logic and operators: + - "=": equality + - "in": value in list + - "contains": array contains value + - "gt", "lt", "gte", "lte": comparison operators + - "like": fuzzy matching + Example: {"and": [{"created_at": {"gte": "2025-01-01"}}, {"tags": {"contains": "AI"}}]} Returns: { @@ -2528,7 +2536,7 @@ def export_graph( } """ logger.info( - f"[export_graph] include_embedding: {include_embedding}, user_name: {user_name}, user_id: {user_id}, page: {page}, page_size: {page_size}" + f"[export_graph] include_embedding: {include_embedding}, user_name: {user_name}, user_id: {user_id}, page: {page}, page_size: {page_size}, filter: {filter}" ) user_id = user_id if user_id else self._get_config_value("user_id") @@ -2563,6 +2571,12 @@ def export_graph( f"ag_catalog.agtype_access_operator(properties, '\"user_id\"'::agtype) = '\"{user_id}\"'::agtype" ) + # Build filter conditions using common method + filter_conditions = self._build_filter_conditions_sql(filter) + logger.info(f"[export_graph] filter_conditions: {filter_conditions}") + if filter_conditions: + where_conditions.extend(filter_conditions) + where_clause = "" if where_conditions: where_clause = f"WHERE {' AND '.join(where_conditions)}" @@ -2652,6 +2666,22 @@ def export_graph( cypher_where_conditions.append(f"a.user_id = '{user_id}'") cypher_where_conditions.append(f"b.user_id = '{user_id}'") + # Build filter conditions for edges (apply to both source and target nodes) + filter_where_clause = self._build_filter_conditions_cypher(filter) + logger.info(f"[export_graph edges] filter_where_clause: {filter_where_clause}") + if filter_where_clause: + # _build_filter_conditions_cypher returns a string that starts with " AND " if filter exists + # Remove the leading " AND " and replace n. with a. for source node and b. for target node + filter_clause = filter_where_clause.strip() + if filter_clause.startswith("AND "): + filter_clause = filter_clause[4:].strip() + # Replace n. with a. for source node and create a copy for target node + source_filter = filter_clause.replace("n.", "a.") + target_filter = filter_clause.replace("n.", "b.") + # Combine source and target filters with AND + combined_filter = f"({source_filter}) AND ({target_filter})" + cypher_where_conditions.append(combined_filter) + cypher_where_clause = "" if cypher_where_conditions: cypher_where_clause = f"WHERE {' AND '.join(cypher_where_conditions)}" @@ -4416,70 +4446,133 @@ def build_cypher_filter_condition(condition_dict: dict) -> str: elif op == "in": # Handle in operator (for checking if field value is in a list) # Supports array format: {"field": {"in": ["value1", "value2"]}} - # Generates: n.field IN ['value1', 'value2'] or (n.field = 'value1' OR n.field = 'value2') + # For array fields (like file_ids, tags, sources), uses CONTAINS logic + # For scalar fields, uses equality or IN clause if not isinstance(op_value, list): raise ValueError( f"in operator only supports array format. " f"Use {{'{key}': {{'in': ['{op_value}']}}}} instead of {{'{key}': {{'in': '{op_value}'}}}}" ) + # Check if key is an array field + is_array_field = key in ("file_ids", "tags", "sources") + # Check if key starts with "info." prefix if key.startswith("info."): info_field = key[5:] # Remove "info." prefix - # Build OR conditions for nested properties (Apache AGE compatibility) + # Check if info field is an array field + is_info_array = info_field in ("tags", "sources", "file_ids") + if len(op_value) == 0: # Empty list means no match condition_parts.append("false") elif len(op_value) == 1: - # Single value, use equality + # Single value item = op_value[0] - if isinstance(item, str): - escaped_value = escape_cypher_string(item) - condition_parts.append( - f"n.info.{info_field} = '{escaped_value}'" - ) + if is_info_array: + # For array fields, use CONTAINS (value IN array_field) + if isinstance(item, str): + escaped_value = escape_cypher_string(item) + condition_parts.append( + f"'{escaped_value}' IN n.info.{info_field}" + ) + else: + condition_parts.append( + f"{item} IN n.info.{info_field}" + ) else: - condition_parts.append(f"n.info.{info_field} = {item}") - else: - # Multiple values, use OR conditions instead of IN (Apache AGE compatibility) - or_conditions = [] - for item in op_value: + # For scalar fields, use equality if isinstance(item, str): escaped_value = escape_cypher_string(item) - or_conditions.append( + condition_parts.append( f"n.info.{info_field} = '{escaped_value}'" ) else: - or_conditions.append( + condition_parts.append( f"n.info.{info_field} = {item}" ) + else: + # Multiple values, use OR conditions + or_conditions = [] + for item in op_value: + if is_info_array: + # For array fields, use CONTAINS (value IN array_field) + if isinstance(item, str): + escaped_value = escape_cypher_string(item) + or_conditions.append( + f"'{escaped_value}' IN n.info.{info_field}" + ) + else: + or_conditions.append( + f"{item} IN n.info.{info_field}" + ) + else: + # For scalar fields, use equality + if isinstance(item, str): + escaped_value = escape_cypher_string(item) + or_conditions.append( + f"n.info.{info_field} = '{escaped_value}'" + ) + else: + or_conditions.append( + f"n.info.{info_field} = {item}" + ) if or_conditions: condition_parts.append( f"({' OR '.join(or_conditions)})" ) else: # Direct property access - # Build array for IN clause or OR conditions if len(op_value) == 0: # Empty list means no match condition_parts.append("false") elif len(op_value) == 1: - # Single value, use equality + # Single value item = op_value[0] - if isinstance(item, str): - escaped_value = escape_cypher_string(item) - condition_parts.append(f"n.{key} = '{escaped_value}'") + if is_array_field: + # For array fields, use CONTAINS (value IN array_field) + if isinstance(item, str): + escaped_value = escape_cypher_string(item) + condition_parts.append( + f"'{escaped_value}' IN n.{key}" + ) + else: + condition_parts.append(f"{item} IN n.{key}") else: - condition_parts.append(f"n.{key} = {item}") + # For scalar fields, use equality + if isinstance(item, str): + escaped_value = escape_cypher_string(item) + condition_parts.append( + f"n.{key} = '{escaped_value}'" + ) + else: + condition_parts.append(f"n.{key} = {item}") else: - # Multiple values, use IN clause - escaped_items = [ - f"'{escape_cypher_string(str(item))}'" - if isinstance(item, str) - else str(item) - for item in op_value - ] - array_str = "[" + ", ".join(escaped_items) + "]" - condition_parts.append(f"n.{key} IN {array_str}") + # Multiple values + if is_array_field: + # For array fields, use OR conditions with CONTAINS + or_conditions = [] + for item in op_value: + if isinstance(item, str): + escaped_value = escape_cypher_string(item) + or_conditions.append( + f"'{escaped_value}' IN n.{key}" + ) + else: + or_conditions.append(f"{item} IN n.{key}") + if or_conditions: + condition_parts.append( + f"({' OR '.join(or_conditions)})" + ) + else: + # For scalar fields, use IN clause + escaped_items = [ + f"'{escape_cypher_string(str(item))}'" + if isinstance(item, str) + else str(item) + for item in op_value + ] + array_str = "[" + ", ".join(escaped_items) + "]" + condition_parts.append(f"n.{key} IN {array_str}") elif op == "like": # Handle like operator (for fuzzy matching, similar to SQL LIKE '%value%') # Check if key starts with "info." prefix @@ -4710,78 +4803,116 @@ def build_filter_condition(condition_dict: dict) -> str: elif op == "in": # Handle in operator (for checking if field value is in a list) # Supports array format: {"field": {"in": ["value1", "value2"]}} + # For array fields (like file_ids, tags, sources), uses @> operator (contains) + # For scalar fields, uses = operator (equality) if not isinstance(op_value, list): raise ValueError( f"in operator only supports array format. " f"Use {{'{key}': {{'in': ['{op_value}']}}}} instead of {{'{key}': {{'in': '{op_value}'}}}}" ) + # Check if key is an array field + is_array_field = key in ("file_ids", "tags", "sources") + # Check if key starts with "info." prefix if key.startswith("info."): info_field = key[5:] # Remove "info." prefix - # Build OR conditions for nested properties + # Check if info field is an array field + is_info_array = info_field in ("tags", "sources", "file_ids") + if len(op_value) == 0: # Empty list means no match condition_parts.append("false") elif len(op_value) == 1: - # Single value, use equality + # Single value item = op_value[0] - if isinstance(item, str): - escaped_value = escape_sql_string(item) + if is_info_array: + # For array fields, use @> operator (contains) + escaped_value = escape_sql_string(str(item)) condition_parts.append( - f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '[\"{escaped_value}\"]'::agtype" ) else: - condition_parts.append( - f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype" - ) - else: - # Multiple values, use OR conditions - or_conditions = [] - for item in op_value: + # For scalar fields, use equality if isinstance(item, str): escaped_value = escape_sql_string(item) - or_conditions.append( + condition_parts.append( f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype" ) else: - or_conditions.append( + condition_parts.append( f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype" ) + else: + # Multiple values, use OR conditions + or_conditions = [] + for item in op_value: + if is_info_array: + # For array fields, use @> operator (contains) to check if array contains the value + escaped_value = escape_sql_string(str(item)) + or_conditions.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) @> '[\"{escaped_value}\"]'::agtype" + ) + else: + # For scalar fields, use equality + if isinstance(item, str): + escaped_value = escape_sql_string(item) + or_conditions.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = '\"{escaped_value}\"'::agtype" + ) + else: + or_conditions.append( + f"ag_catalog.agtype_access_operator(VARIADIC ARRAY[properties, '\"info\"'::ag_catalog.agtype, '\"{info_field}\"'::ag_catalog.agtype]) = {item}::agtype" + ) if or_conditions: condition_parts.append( f"({' OR '.join(or_conditions)})" ) else: # Direct property access - # Build OR conditions if len(op_value) == 0: # Empty list means no match condition_parts.append("false") elif len(op_value) == 1: - # Single value, use equality + # Single value item = op_value[0] - if isinstance(item, str): - escaped_value = escape_sql_string(item) + if is_array_field: + # For array fields, use @> operator (contains) + escaped_value = escape_sql_string(str(item)) condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" ) else: - condition_parts.append( - f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" - ) - else: - # Multiple values, use OR conditions - or_conditions = [] - for item in op_value: + # For scalar fields, use equality if isinstance(item, str): escaped_value = escape_sql_string(item) - or_conditions.append( + condition_parts.append( f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" ) else: - or_conditions.append( + condition_parts.append( f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" ) + else: + # Multiple values, use OR conditions + or_conditions = [] + for item in op_value: + if is_array_field: + # For array fields, use @> operator (contains) to check if array contains the value + escaped_value = escape_sql_string(str(item)) + or_conditions.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) @> '[\"{escaped_value}\"]'::agtype" + ) + else: + # For scalar fields, use equality + if isinstance(item, str): + escaped_value = escape_sql_string(item) + or_conditions.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = '\"{escaped_value}\"'::agtype" + ) + else: + or_conditions.append( + f"ag_catalog.agtype_access_operator(properties, '\"{key}\"'::agtype) = {item}::agtype" + ) if or_conditions: condition_parts.append( f"({' OR '.join(or_conditions)})"