diff --git a/.gitignore b/.gitignore
index 8319a4d2..ac31eb41 100644
--- a/.gitignore
+++ b/.gitignore
@@ -204,6 +204,7 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
+.trae
 
 # VSCode
 .vscode*
diff --git a/docker/.env.example b/docker/.env.example
index ee26c7bc..3674cd69 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -123,6 +123,7 @@ API_SCHEDULER_ON=true
 API_SEARCH_WINDOW_SIZE=5
 # Specify how many rounds of previous conversations (history) to retrieve and consider during the 'hybrid search' (fast search+asynchronous fine search). This helps provide context aware search results
 API_SEARCH_HISTORY_TURNS=5
+MEMSCHEDULER_USE_REDIS_QUEUE=false
 
 ## Graph / vector stores
 # Neo4j database selection mode
diff --git a/examples/mem_scheduler/quick_start_examples.py b/examples/mem_scheduler/quick_start_examples.py
index fbfef4d7..724663be 100644
--- a/examples/mem_scheduler/quick_start_examples.py
+++ b/examples/mem_scheduler/quick_start_examples.py
@@ -146,7 +146,9 @@ def kv_cache_only():
 
 def run_scheduler_example():
     # Load the main MOS (Memory-Oriented System) config file that uses MemScheduler
-    config = parse_yaml("./examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml")
+    config = parse_yaml(
+        f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
+    )
     # Pass the parsed config dict to the MOSConfig constructor to build the config object
     mos_config = MOSConfig(**config)
     # Initialize the MOS system instance with the config object
@@ -159,12 +161,12 @@ def run_scheduler_example():
 
     # Load the general MemCube config from a YAML file
     config = GeneralMemCubeConfig.from_yaml_file(
-        "./examples/data/config/mem_scheduler/mem_cube_config.yaml"
+        f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config.yaml"
    )
     # Define the unique identifier for the MemCube
     mem_cube_id = "mem_cube_5"
     # Define the local storage path for the MemCube (the path includes the user ID and MemCube ID)
-    mem_cube_name_or_path = f"./outputs/mem_scheduler/{user_id}/{mem_cube_id}"
+    mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}"
 
     # If the path already exists, remove the old directory first
     if Path(mem_cube_name_or_path).exists():
diff --git a/examples/mem_scheduler/scheduler_for_async_tasks.py b/examples/mem_scheduler/scheduler_for_async_tasks.py
index a767b57c..7f544c3d 100644
--- a/examples/mem_scheduler/scheduler_for_async_tasks.py
+++ b/examples/mem_scheduler/scheduler_for_async_tasks.py
@@ -57,7 +57,7 @@ def submit_tasks():
 TEST_HANDLER_LABEL = "test_handler"
 mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})
 
-# 10s to restart
+# 5s to restart
 mem_scheduler.orchestrator.tasks_min_idle_ms[TEST_HANDLER_LABEL] = 5_000
 
 tmp_dir = Path("./tmp")
diff --git a/src/memos/mem_reader/simple_struct.py b/src/memos/mem_reader/simple_struct.py
index 61a7d2b6..fa72bd06 100644
--- a/src/memos/mem_reader/simple_struct.py
+++ b/src/memos/mem_reader/simple_struct.py
@@ -614,11 +614,9 @@ def _read_memory(
             serialized_origin_memories = json.dumps(
                 [one.memory for one in original_memory_group], indent=2
             )
-            revised_memory_list = self.rewrite_memories(
+            revised_memory_list = self.filter_hallucination_in_memories(
                 messages=combined_messages,
                 memory_list=original_memory_group,
-                user_only=os.getenv("SIMPLE_STRUCT_REWRITE_USER_ONLY", "true").lower()
-                == "false",
             )
             serialized_revised_memories = json.dumps(
                 [one.memory for one in revised_memory_list], indent=2
diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py
index 3f5c90b6..4c9310cb 100644
--- a/src/memos/mem_scheduler/base_scheduler.py
+++ b/src/memos/mem_scheduler/base_scheduler.py
@@ -225,7 +225,7 @@ def initialize_modules(
         process_llm = chat_llm
 
         try:
-            if redis_client:
+            if redis_client and self.use_redis_queue:
                 self.status_tracker = TaskStatusTracker(redis_client)
                 if self.dispatcher:
                     self.dispatcher.status_tracker = self.status_tracker
@@ -305,7 +305,7 @@ def status_tracker(self) -> TaskStatusTracker | None:
         available via RedisSchedulerModule. This mirrors the lazy pattern used
         by `mem_cube` so downstream modules can safely access the tracker.
         """
-        if self._status_tracker is None:
+        if self._status_tracker is None and self.use_redis_queue:
             try:
                 self._status_tracker = TaskStatusTracker(self.redis)
                 # Propagate to submodules when created lazily
@@ -314,7 +314,8 @@ def status_tracker(self) -> TaskStatusTracker | None:
                 if self.memos_message_queue:
                     self.memos_message_queue.set_status_tracker(self._status_tracker)
             except Exception as e:
-                logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True)
+                logger.warning(f"Failed to lazy-initialize status_tracker: {e}", exc_info=True)
+
         return self._status_tracker
 
     @status_tracker.setter
@@ -869,6 +870,8 @@ def _submit_web_logs(
             messages = [messages]  # transform single message to list
 
         for message in messages:
+            if self.rabbitmq_config is None:
+                return
             try:
                 # Always call publish; the publisher now caches when offline and flushes after reconnect
                 logger.info(
diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
index cdd49118..2099da5a 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
@@ -108,8 +108,6 @@ def __init__(
         )
         self.metrics = metrics
 
-        self._status_tracker: TaskStatusTracker | None = None
-        # Use setter to allow propagation and keep a single source of truth
         self.status_tracker = status_tracker
         self.submit_web_logs = submit_web_logs  # ADDED
 
@@ -118,35 +116,6 @@ def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None:
         return  # This is handled in BaseScheduler now
 
-    @property
-    def status_tracker(self) -> TaskStatusTracker | None:
-        """Lazy-initialized status tracker for the dispatcher.
-
-        If the tracker is None, attempt to initialize from the Redis-backed
-        components available to the dispatcher (queue or orchestrator).
-        """
-        if self._status_tracker is None:
-            try:
-                self._status_tracker = TaskStatusTracker(self.redis)
-                # Propagate to submodules when created lazily
-                if self.memos_message_queue:
-                    self.memos_message_queue.set_status_tracker(self._status_tracker)
-            except Exception as e:
-                logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True)
-        return self._status_tracker
-
-    @status_tracker.setter
-    def status_tracker(self, value: TaskStatusTracker | None) -> None:
-        self._status_tracker = value
-        # Propagate to the queue if possible
-        try:
-            if self.memos_message_queue and hasattr(self.memos_message_queue, "status_tracker"):
-                self.memos_message_queue.status_tracker = value
-        except Exception as e:
-            logger.warning(
-                f"Failed to propagate dispatcher status_tracker to queue: {e}", exc_info=True
-            )
-
     def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
         """
         Create a wrapper around the handler to track task execution and capture results.
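Note on the base_scheduler.py and dispatcher.py hunks above: the dispatcher loses its own lazy `status_tracker` property, and `BaseScheduler.status_tracker` becomes the single source of truth, now gated on `use_redis_queue` so no tracker (and no Redis access) is created when the Redis queue is disabled. A minimal sketch of that lazy, flag-gated property pattern, with simplified stand-ins rather than the actual MemOS classes:

class TaskStatusTracker:
    # Stand-in for the real tracker, which lives in MemOS and talks to Redis.
    def __init__(self, redis_conn):
        if redis_conn is None:
            raise ValueError("TaskStatusTracker requires a Redis connection")
        self.redis = redis_conn


class SchedulerSketch:
    def __init__(self, use_redis_queue: bool, redis_conn=None):
        self.use_redis_queue = use_redis_queue
        self.redis = redis_conn
        self._status_tracker = None

    @property
    def status_tracker(self):
        # Create the tracker lazily, and only when the Redis queue is enabled.
        if self._status_tracker is None and self.use_redis_queue:
            try:
                self._status_tracker = TaskStatusTracker(self.redis)
            except Exception:
                pass  # stay None; callers must tolerate a missing tracker
        return self._status_tracker

    @status_tracker.setter
    def status_tracker(self, value):
        # Explicit assignment (e.g., from initialize_modules) wins over lazy init.
        self._status_tracker = value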
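The redis_queue.py diff that follows makes claiming idle pending messages portable across Redis versions: XAUTOCLAIM is used natively on servers that support it (Redis 6.2+), and is otherwise emulated with XPENDING plus XCLAIM. A minimal stand-alone sketch of the same strategy against stock redis-py APIs, with illustrative stream/group/consumer arguments (this is not the MemOS implementation):

import redis


def supports_xautoclaim(conn: redis.Redis) -> bool:
    # XAUTOCLAIM was introduced in Redis 6.2.
    try:
        version = conn.info("server").get("redis_version", "0.0.0")
        major, minor = (int(p) for p in version.split(".")[:2])
        return (major, minor) >= (6, 2)
    except Exception:
        return False


def claim_idle(conn, stream, group, consumer, min_idle_ms, count):
    if supports_xautoclaim(conn):
        # Native path: a single cursor-based command.
        result = conn.xautoclaim(stream, group, consumer, min_idle_ms, start_id="0-0", count=count)
        return result[1]  # claimed entries; result has 2 or 3 elements depending on server
    # Fallback path: list pending entries, filter by idle time, then claim explicitly.
    pending = conn.xpending_range(stream, group, "-", "+", count * 3)
    ids = [p["message_id"] for p in pending if p["time_since_delivered"] >= min_idle_ms][:count]
    return conn.xclaim(stream, group, consumer, min_idle_ms, ids) if ids else []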
diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
index 557a4546..1c968354 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
@@ -81,6 +81,7 @@ def __init__(
         # Consumer state
         self._is_listening = False
         self._message_handler: Callable[[ScheduleMessageItem], None] | None = None
+        self.supports_xautoclaim = False
 
         # Connection state
         self._is_connected = False
@@ -105,6 +106,7 @@ def __init__(
         # Auto-initialize Redis connection
         if self.auto_initialize_redis():
             self._is_connected = True
+            self._check_xautoclaim_support()
 
         self.seen_streams = set()
 
@@ -143,6 +145,33 @@ def __init__(
                 logger.debug(f"Initial stream keys refresh failed: {e}")
             self._start_stream_keys_refresh_thread()
 
+    def _check_xautoclaim_support(self):
+        """Check if the Redis server supports xautoclaim (v6.2+)."""
+        if not self._redis_conn:
+            return
+
+        try:
+            info = self._redis_conn.info("server")
+            version_str = info.get("redis_version", "0.0.0")
+            # Simple version parsing
+            parts = [int(p) for p in version_str.split(".") if p.isdigit()]
+            while len(parts) < 3:
+                parts.append(0)
+
+            major, minor, _ = parts[:3]
+            if major > 6 or (major == 6 and minor >= 2):
+                self.supports_xautoclaim = True
+            else:
+                self.supports_xautoclaim = False
+
+            logger.info(
+                f"[REDIS_QUEUE] Redis version {version_str}. "
+                f"Supports xautoclaim: {self.supports_xautoclaim}"
+            )
+        except Exception as e:
+            logger.warning(f"Failed to check Redis version: {e}")
+            self.supports_xautoclaim = False
+
     def get_stream_key(self, user_id: str, mem_cube_id: str, task_label: str) -> str:
         stream_key = f"{self.stream_key_prefix}:{user_id}:{mem_cube_id}:{task_label}"
         return stream_key
@@ -623,41 +652,67 @@ def _compute_pending_need(
         need_pending = max(0, batch_size - new_count)
         return need_pending if need_pending > 0 else 0
 
+    def _parse_pending_entry(self, entry) -> tuple[str, int]:
+        """Extract message_id and idle_time from a pending entry (dict, tuple, or object)."""
+        if isinstance(entry, dict):
+            return entry.get("message_id"), entry.get("time_since_delivered")
+        elif isinstance(entry, tuple | list):
+            return entry[0], entry[2]
+        else:
+            # Assume object (redis-py 5.x+ PendingMessage)
+            return getattr(entry, "message_id", None), getattr(entry, "time_since_delivered", 0)
+
+    def _manual_xautoclaim(
+        self, stream_key: str, min_idle_time: int, count: int
+    ) -> tuple[str, list[tuple[str, dict]], list[str]]:
+        """
+        Simulate xautoclaim using xpending and xclaim for compatibility with older Redis versions.
+        """
+        # 1. Get pending entries (fetch slightly more to increase chance of finding idle ones)
+        fetch_count = count * 3
+        pending_entries = self._redis_conn.xpending_range(
+            stream_key, self.consumer_group, "-", "+", fetch_count
+        )
+
+        if not pending_entries:
+            return "0-0", [], []
+
+        claim_ids = []
+        for entry in pending_entries:
+            # entry structure depends on redis-py version/decoding
+            # Assuming list of dicts: {'message_id': '...', 'time_since_delivered': ms, ...}
+            # or list of tuples
+            msg_id, idle_time = self._parse_pending_entry(entry)
+
+            if idle_time >= min_idle_time:
+                claim_ids.append(msg_id)
+                if len(claim_ids) >= count:
+                    break
+
+        if not claim_ids:
+            return "0-0", [], []
+
+        # 2. Claim messages
+        claimed_messages = self._redis_conn.xclaim(
+            stream_key, self.consumer_group, self.consumer_name, min_idle_time, claim_ids
+        )
+
+        return "0-0", claimed_messages, []
+
     def _claim_pending_messages(
         self, stream_key: str, need_pending_count: int, task_label: str
     ) -> list[tuple[str, list[tuple[str, dict]]]]:
         """Claim pending messages exceeding idle threshold, with group existence handling."""
-        try:
-            claimed_result = self._redis_conn.xautoclaim(
-                name=stream_key,
-                groupname=self.consumer_group,
-                consumername=self.consumer_name,
-                min_idle_time=self.orchestrator.get_task_idle_min(task_label=task_label),
-                start_id="0-0",
-                count=need_pending_count,
-                justid=False,
-            )
-            if len(claimed_result) == 2:
-                next_id, claimed = claimed_result
-                deleted_ids = []
-            elif len(claimed_result) == 3:
-                next_id, claimed, deleted_ids = claimed_result
-            else:
-                raise ValueError(f"Unexpected xautoclaim response length: {len(claimed_result)}")
+        min_idle = self.orchestrator.get_task_idle_min(task_label=task_label)
 
-            return [(stream_key, claimed)] if claimed else []
-        except Exception as read_err:
-            err_msg = str(read_err).lower()
-            if "nogroup" in err_msg or "no such key" in err_msg:
-                logger.warning(
-                    f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (xautoclaim)."
-                )
-                self._ensure_consumer_group(stream_key=stream_key)
+        # Use native xautoclaim if supported (Redis 6.2+)
+        if self.supports_xautoclaim:
+            try:
                 claimed_result = self._redis_conn.xautoclaim(
                     name=stream_key,
                     groupname=self.consumer_group,
                     consumername=self.consumer_name,
-                    min_idle_time=self.orchestrator.get_task_idle_min(task_label=task_label),
+                    min_idle_time=min_idle,
                     start_id="0-0",
                     count=need_pending_count,
                     justid=False,
@@ -670,25 +725,64 @@ def _claim_pending_messages(
                 )
                 if len(claimed_result) == 2:
                     next_id, claimed = claimed_result
                     deleted_ids = []
                 elif len(claimed_result) == 3:
                     next_id, claimed, deleted_ids = claimed_result
                 else:
                     raise ValueError(
                         f"Unexpected xautoclaim response length: {len(claimed_result)}"
-                    ) from read_err
+                    )
 
                 return [(stream_key, claimed)] if claimed else []
-            return []
-
-    def _batch_claim_pending_messages(
-        self, claims_spec: list[tuple[str, int, str]]
-    ) -> list[tuple[str, list[tuple[str, dict]]]]:
-        """Batch-claim pending messages across multiple streams.
+            except Exception as read_err:
+                err_msg = str(read_err).lower()
+                if "nogroup" in err_msg or "no such key" in err_msg:
+                    logger.warning(
+                        f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (xautoclaim)."
+                    )
+                    self._ensure_consumer_group(stream_key=stream_key)
+                    claimed_result = self._redis_conn.xautoclaim(
+                        name=stream_key,
+                        groupname=self.consumer_group,
+                        consumername=self.consumer_name,
+                        min_idle_time=min_idle,
+                        start_id="0-0",
+                        count=need_pending_count,
+                        justid=False,
+                    )
+                    if len(claimed_result) == 2:
+                        next_id, claimed = claimed_result
+                        deleted_ids = []
+                    elif len(claimed_result) == 3:
+                        next_id, claimed, deleted_ids = claimed_result
+                    else:
+                        raise ValueError(
+                            f"Unexpected xautoclaim response length: {len(claimed_result)}"
+                        ) from read_err
 
-    Args:
-        claims_spec: List of tuples (stream_key, need_pending_count, task_label)
+                    return [(stream_key, claimed)] if claimed else []
+                return []
 
-    Returns:
-        A list of (stream_key, claimed_entries) pairs for all successful claims.
-    """
-        if not self._redis_conn or not claims_spec:
+        # Fallback to manual xautoclaim for older Redis versions
+        try:
+            _next, claimed, _deleted = self._manual_xautoclaim(
+                stream_key, min_idle, need_pending_count
+            )
+            return [(stream_key, claimed)] if claimed else []
+        except Exception as read_err:
+            err_msg = str(read_err).lower()
+            if "nogroup" in err_msg or "no such key" in err_msg:
+                logger.warning(
+                    f"Consumer group or stream missing for '{stream_key}/{self.consumer_group}'. Attempting to create and retry (manual xautoclaim)."
+                )
+                self._ensure_consumer_group(stream_key=stream_key)
+                try:
+                    _next, claimed, _deleted = self._manual_xautoclaim(
+                        stream_key, min_idle, need_pending_count
+                    )
+                    return [(stream_key, claimed)] if claimed else []
+                except Exception:
+                    return []
             return []
 
+    def _batch_claim_native(
+        self, claims_spec: list[tuple[str, int, str]]
+    ) -> list[tuple[str, list[tuple[str, dict]]]]:
+        """Batch-claim pending messages using Redis xautoclaim pipeline (Redis 6.2+)."""
         pipe = self._redis_conn.pipeline(transaction=False)
         for stream_key, need_count, label in claims_spec:
             pipe.xautoclaim(
@@ -702,14 +796,11 @@ def _batch_claim_pending_messages(
             )
 
         try:
-            # Execute with raise_on_error=False so we get exceptions in the results list
-            # instead of aborting the whole batch.
             results = pipe.execute(raise_on_error=False)
         except Exception as e:
             logger.error(f"Pipeline execution critical failure: {e}")
             results = [e] * len(claims_spec)
 
-        # Handle individual failures (e.g. NOGROUP) by retrying just that stream
         final_results = []
         for i, res in enumerate(results):
             if isinstance(res, Exception):
@@ -736,12 +827,8 @@ def _batch_claim_pending_messages(
             else:
                 final_results.append(res)
 
-        results = final_results
-
-        claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = []
-        for (stream_key, _need_count, _label), claimed_result in zip(
-            claims_spec, results, strict=False
-        ):
+        claimed_pairs = []
+        for (stream_key, _, _), claimed_result in zip(claims_spec, final_results, strict=False):
             try:
                 if not claimed_result:
                     continue
@@ -760,6 +847,98 @@ def _batch_claim_pending_messages(
 
         return claimed_pairs
 
+    def _batch_claim_manual(
+        self, claims_spec: list[tuple[str, int, str]]
+    ) -> list[tuple[str, list[tuple[str, dict]]]]:
+        """Batch-claim pending messages using 2-phase pipeline (Redis < 6.2)."""
+        # Phase 1: Fetch pending messages for all streams
+        pending_pipe = self._redis_conn.pipeline(transaction=False)
+        for stream_key, need_count, _label in claims_spec:
+            fetch_count = need_count * 3
+            pending_pipe.xpending_range(stream_key, self.consumer_group, "-", "+", fetch_count)
+
+        try:
+            pending_results = pending_pipe.execute(raise_on_error=False)
+        except Exception as e:
+            logger.error(f"Pending fetch pipeline failed: {e}")
+            return []
+
+        # Phase 2: Filter and prepare claim pipeline
+        claim_pipe = self._redis_conn.pipeline(transaction=False)
+        streams_to_claim_indices = []
+        claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = []
+
+        for i, (stream_key, need_count, label) in enumerate(claims_spec):
+            pending_res = pending_results[i]
+            min_idle = self.orchestrator.get_task_idle_min(task_label=label)
+
+            if isinstance(pending_res, Exception):
+                err_msg = str(pending_res).lower()
+                if "nogroup" in err_msg or "no such key" in err_msg:
+                    try:
+                        self._ensure_consumer_group(stream_key)
+                        _next, claimed, _ = self._manual_xautoclaim(
+                            stream_key, min_idle, need_count
+                        )
+                        if claimed:
+                            claimed_pairs.append((stream_key, claimed))
+                    except Exception as retry_err:
+                        logger.warning(f"Retry manual claim failed for {stream_key}: {retry_err}")
+                continue
+
+            if not pending_res:
+                continue
+
+            claim_ids = []
+            for entry in pending_res:
+                msg_id, idle_time = self._parse_pending_entry(entry)
+                if idle_time >= min_idle:
+                    claim_ids.append(msg_id)
+                    if len(claim_ids) >= need_count:
+                        break
+
+            if claim_ids:
+                claim_pipe.xclaim(
+                    stream_key,
+                    self.consumer_group,
+                    self.consumer_name,
+                    min_idle,
+                    claim_ids,
+                )
+                streams_to_claim_indices.append(i)
+
+        if streams_to_claim_indices:
+            try:
+                claim_results = claim_pipe.execute(raise_on_error=False)
+                for idx_in_results, original_idx in enumerate(streams_to_claim_indices):
+                    res = claim_results[idx_in_results]
+                    stream_key = claims_spec[original_idx][0]
+                    if isinstance(res, list) and res:
+                        claimed_pairs.append((stream_key, res))
+            except Exception as e:
+                logger.error(f"Claim pipeline failed: {e}")
+
+        return claimed_pairs
+
+    def _batch_claim_pending_messages(
+        self, claims_spec: list[tuple[str, int, str]]
+    ) -> list[tuple[str, list[tuple[str, dict]]]]:
+        """Batch-claim pending messages across multiple streams.
+
+        Args:
+            claims_spec: List of tuples (stream_key, need_pending_count, task_label)
+
+        Returns:
+            A list of (stream_key, claimed_entries) pairs for all successful claims.
+        """
+        if not self._redis_conn or not claims_spec:
+            return []
+
+        if self.supports_xautoclaim:
+            return self._batch_claim_native(claims_spec)
+
+        return self._batch_claim_manual(claims_spec)
+
     def _convert_messages(
         self, messages: list[tuple[str, list[tuple[str, dict]]]]
     ) -> list[ScheduleMessageItem]:
@@ -994,6 +1173,7 @@ def connect(self) -> None:
             # Test the connection
             self._redis_conn.ping()
             self._is_connected = True
+            self._check_xautoclaim_support()
             logger.debug("Redis connection established successfully")
             # Start stream keys refresher when connected
             self._start_stream_keys_refresh_thread()
diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
index 5a94d2af..a07934b8 100644
--- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
+++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
@@ -30,6 +30,7 @@ def __init__(self):
         Initialize RabbitMQ connection settings.
         """
         super().__init__()
+        self.auth_config = None
 
         # RabbitMQ settings
         self.rabbitmq_config: RabbitMQConfig | None = None
@@ -99,22 +100,35 @@ def initialize_rabbitmq(
             )
             return
 
+        if self.is_rabbitmq_connected():
+            logger.warning("RabbitMQ is already connected. Skipping initialization.")
Skipping initialization.") + return + from pika.adapters.select_connection import SelectConnection - if config is None: - if config_path is None and AuthConfig.default_config_exists(): - auth_config = AuthConfig.from_local_config() - elif Path(config_path).exists(): - auth_config = AuthConfig.from_local_config(config_path=config_path) + if config is not None: + if isinstance(config, RabbitMQConfig): + self.rabbitmq_config = config + elif isinstance(config, dict): + self.rabbitmq_config = AuthConfig.from_dict(config).rabbitmq else: - auth_config = AuthConfig.from_local_env() - self.rabbitmq_config = auth_config.rabbitmq - elif isinstance(config, RabbitMQConfig): - self.rabbitmq_config = config - elif isinstance(config, dict): - self.rabbitmq_config = AuthConfig.from_dict(config).rabbitmq + logger.error(f"Unsupported config type: {type(config)}") + return + else: - logger.error("Not implemented") + if config_path is not None and Path(config_path).exists(): + self.auth_config = AuthConfig.from_local_config(config_path=config_path) + elif AuthConfig.default_config_exists(): + self.auth_config = AuthConfig.from_local_config() + else: + self.auth_config = AuthConfig.from_local_env() + self.rabbitmq_config = self.auth_config.rabbitmq + + if self.rabbitmq_config is None: + logger.error( + "Failed to load RabbitMQ configuration. Please check your config file or environment variables." + ) + return # Load exchange configuration from config if self.rabbitmq_config: @@ -140,7 +154,7 @@ def initialize_rabbitmq( self.rabbitmq_exchange_type = env_exchange_type logger.info(f"Using env exchange type override: {self.rabbitmq_exchange_type}") - # Start connection process + # Start connection process parameters = self.get_rabbitmq_connection_param() self.rabbitmq_connection = SelectConnection( parameters, @@ -156,7 +170,7 @@ def initialize_rabbitmq( self._io_loop_thread.start() logger.info("RabbitMQ connection process started") except Exception: - logger.error("Fail to initialize auth_config", exc_info=True) + logger.error("Failed to initialize RabbitMQ connection", exc_info=True) finally: with self._rabbitmq_lock: self._rabbitmq_initializing = False diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 9432d630..20f8150b 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -796,43 +796,48 @@ """ SIMPLE_STRUCT_HALLUCINATION_FILTER_PROMPT = """ -You are a strict memory validator. -Your task is to identify and delete hallucinated memories that are not explicitly stated by the user in the provided messages. - -Rules: -1. **User-Only Origin**: Verify facts against USER messages ONLY. If the Assistant repeats a User fact, it is VALID. If the Assistant introduces a new detail (e.g., 'philanthropy') that the User did not explicitly confirm, it is INVALID. -2. **No Inference Allowed**: Do NOT keep memories based on implication, emotion, preference, or generalization. Only verbatim or direct restatements of user-provided facts are valid. However, minor formatting corrections (e.g., adding missing spaces between names, fixing obvious typos) are ALLOWED. -3. **Hallucination = Deletion**: If a memory contains any detail not directly expressed by the user, mark it for deletion. -4. **Timestamp Exception**: Memories may include timestamps (e.g., dates like "On December 19, 2026") derived from conversation metadata. 
If the date in the memory is likely the conversation time (even if not shown in the `messages` list), do NOT treat it as a hallucination or require a rewrite. - -Examples: -Messages: -- [user]: I love coding in Python. -- [assistant]: That's great! I assume you also contribute to open source projects? -Memory: User enjoys Python and contributes to open source. -Result: {{"keep": false, "reason": "User never stated they contribute to open source; this came from Assistant's assumption."}} - -Messages: -- [user]: I am tired. -- [assistant]: I hear you are tired. Rest is important. -Memory: User stated they are tired. -Result: {{"keep": true, "reason": "Direct restatement of user input, even if Assistant repeated it."}} - -Inputs: -messages: -{messages_inline} - -memories: -{memories_inline} - -Output Format: -- Return a JSON object with string keys ("0", "1", "2", ...) matching the input memory indices. -- Each value must be: {{ "keep": boolean, "reason": string }} -- "keep": true only if the memory is a direct reflection of the user's explicit words. -- "reason": brief, factual, and cites missing or unsupported content. - -Important: Output **only** the JSON. No extra text, explanations, markdown, or fields. -""" + You are a strict memory validator. + Your task is to identify and delete hallucinated memories that are not explicitly stated by the user in the provided messages. + + Rules: + 1. **Explicit Denial & Inconsistency**: If a memory claims something that the user explicitly denied or is clearly inconsistent with the user's statements, mark it for deletion. + 2. **Timestamp Exception**: Memories may include timestamps (e.g., dates like "On December 19, 2026") derived from conversation metadata. If the date in the memory is likely the conversation time (even if not shown in the `messages` list), do NOT treat it as a hallucination or require a rewrite. + + Example: + Messages: + [user]: I'm planning a trip to Japan next month for about a week. + [assistant]: That sounds great! Are you planning to visit Tokyo Disneyland? + [user]: No, I won't be going to Tokyo this time. I plan to stay in Kyoto and Osaka to avoid crowds. + + Memories: + {{ + "0": "User plans to travel to Japan for a week next month.", + "1": "User intends to visit Tokyo Disneyland.", + "2": "User plans to stay in Kyoto and Osaka." + }} + + Output: + {{ + "0": {{ "keep": true, "reason": "Explicitly stated by user." }}, + "1": {{ "keep": false, "reason": "User explicitly denied visiting Tokyo." }}, + "2": {{ "keep": true, "reason": "Explicitly stated by user." }} + }} + + Inputs: + Messages: + {messages_inline} + + Memories: + {memories_inline} + + Output Format: + - Return a JSON object with string keys ("0", "1", "2", ...) matching the input memory indices. + - Each value must be: {{ "keep": boolean, "reason": string }} + - "keep": true only if the memory is a direct reflection of the user's explicit words. + - "reason": brief, factual, and cites missing or unsupported content. + + Important: Output **only** the JSON. No extra text, explanations, markdown, or fields. + """ SIMPLE_STRUCT_ADD_BEFORE_SEARCH_PROMPT = """
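Note on the simple_struct.py and mem_reader_prompts.py hunks: `rewrite_memories` is replaced by `filter_hallucination_in_memories`, and the prompt above asks the model for per-index keep/drop verdicts. A minimal sketch of how such verdicts could be applied, assuming only the JSON contract stated in the prompt (the `apply_keep_verdicts` helper is hypothetical, not part of MemOS):

import json


def apply_keep_verdicts(memories: list[str], llm_output: str) -> list[str]:
    # The prompt requires a JSON object keyed by stringified memory indices,
    # each value shaped like {"keep": bool, "reason": str}.
    verdicts = json.loads(llm_output)
    kept = []
    for i, memory in enumerate(memories):
        verdict = verdicts.get(str(i), {})
        if verdict.get("keep", True):  # keep by default when a verdict is missing
            kept.append(memory)
    return kept


memories = [
    "User plans to travel to Japan for a week next month.",
    "User intends to visit Tokyo Disneyland.",
]
llm_output = (
    '{"0": {"keep": true, "reason": "Explicitly stated by user."},'
    ' "1": {"keep": false, "reason": "User explicitly denied visiting Tokyo."}}'
)
print(apply_keep_verdicts(memories, llm_output))
# -> ['User plans to travel to Japan for a week next month.']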