Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
83d1576
fix bugs: try to fix bugs in _submit_web_logs
tangg555 Dec 18, 2025
e50c56c
fix bugs: try to address bugs
tangg555 Dec 18, 2025
74f1da0
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 18, 2025
58eb6b8
fix bugs
tangg555 Dec 18, 2025
392b6df
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 18, 2025
0d72ce7
refactor: modify examples
tangg555 Dec 18, 2025
2fe965b
revise add operation and fix an unbelievable bug
tangg555 Dec 18, 2025
26267f4
Merge branch 'dev' into scheduler
tangg555 Dec 18, 2025
eecfa51
address the bug issues
tangg555 Dec 22, 2025
7c6b7da
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 22, 2025
f2da3a7
the doc file has a format problem which has been fixed in this commit
tangg555 Dec 24, 2025
a6881b4
add a range of new feats for the add operation
tangg555 Dec 24, 2025
7f39e7e
address the incompatible issue of local scheduler
tangg555 Dec 24, 2025
6778cc4
address the conflicts
tangg555 Dec 24, 2025
3fe9cb0
feat(scheduler): optimize redis queue consumer group management
tangg555 Dec 24, 2025
b35096f
fix(tests): resolve AttributeError in SimpleStructMemReader tests
tangg555 Dec 24, 2025
a7f5b77
Merge branch 'dev' into dev
tangg555 Dec 24, 2025
ded7ac6
Merge branch 'dev' into dev
tangg555 Dec 24, 2025
8943ba8
fix(mem_reader): pass info dict to add_before_search for correct user…
tangg555 Dec 24, 2025
78a4327
refactor add_before_search from mem_reader to SingleCubeView
tangg555 Dec 24, 2025
a5fc4c0
address bugs
tangg555 Dec 24, 2025
45224dd
fix: fix the qsize bug of task queue, and accept change from hotfix/s…
tangg555 Dec 25, 2025
f3c4f6c
fix: address some issues to run old scheduler example and kv cache ex…
tangg555 Dec 26, 2025
d634851
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 26, 2025
e9b60db
fix: address the issue of Top-level import of unavailable module 'torch'
tangg555 Dec 26, 2025
c6bdb22
fix: resolve linting errors and make optional dependencies lazy loaded
tangg555 Dec 26, 2025
077f529
Merge branch 'dev' into scheduler
tangg555 Dec 29, 2025
5abbe23
Merge branch 'dev' into scheduler
tangg555 Dec 29, 2025
ad3620a
refactor: revise the rewrite prompt to make it better
tangg555 Dec 29, 2025
2475286
refactor: update examples
tangg555 Dec 30, 2025
24c9b18
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 30, 2025
0ecee35
Merge branch 'dev' into scheduler
tangg555 Dec 30, 2025
a196dcb
refactor: update examples for scheduler
tangg555 Dec 30, 2025
af39bfc
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Dec 30, 2025
6139af8
fix bugs: address the unsupported xautoclaim command when redis versi…
tangg555 Jan 6, 2026
686d7c1
refactor: review settings
tangg555 Jan 7, 2026
3dd5068
refactor: adjust examples to make it run better for code debugging
tangg555 Jan 7, 2026
211b4dc
refactor: review slow add apis to get a better performance on Halumen
tangg555 Jan 7, 2026
f0a9c13
fix bugs: address the issue when set user_redis_queue to false, the s…
tangg555 Jan 7, 2026
079a3e3
refactor: allow the code to run without rabbitmq
tangg555 Jan 8, 2026
e01863d
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Jan 8, 2026
3a75d3a
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Jan 8, 2026
d2e6c68
refactor: create a _parse_pending_entry for redis queue
tangg555 Jan 8, 2026
412b05a
refactor: add a try/catch for status_tracker
tangg555 Jan 8, 2026
55dc088
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Jan 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.trae

# VSCode
.vscode*
Expand Down
1 change: 1 addition & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ API_SCHEDULER_ON=true
API_SEARCH_WINDOW_SIZE=5
# Specify how many rounds of previous conversations (history) to retrieve and consider during the 'hybrid search' (fast search+asynchronous fine search). This helps provide context aware search results
API_SEARCH_HISTORY_TURNS=5
MEMSCHEDULER_USE_REDIS_QUEUE=false

## Graph / vector stores
# Neo4j database selection mode
Expand Down
8 changes: 5 additions & 3 deletions examples/mem_scheduler/quick_start_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def kv_cache_only():

def run_scheduler_example():
# 使用 MemScheduler 加载主 MOS(Memory-Oriented System)配置文件
config = parse_yaml("./examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml")
config = parse_yaml(
f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
)
# 将解析出的配置字典传入 MOSConfig 构造器, 构建配置对象
mos_config = MOSConfig(**config)
# 使用配置对象初始化 MOS 系统实例
Expand All @@ -159,12 +161,12 @@ def run_scheduler_example():

# 从 YAML 文件加载 MemCube(记忆立方体)的通用配置
config = GeneralMemCubeConfig.from_yaml_file(
"./examples/data/config/mem_scheduler/mem_cube_config.yaml"
f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config.yaml"
)
# 定义 MemCube 的唯一标识符
mem_cube_id = "mem_cube_5"
# 定义 MemCube 的本地存储路径(路径中包含用户 ID 和 MemCube ID)
mem_cube_name_or_path = f"./outputs/mem_scheduler/{user_id}/{mem_cube_id}"
mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}"

# 如果该路径已存在, 则先删除旧目录
if Path(mem_cube_name_or_path).exists():
Expand Down
2 changes: 1 addition & 1 deletion examples/mem_scheduler/scheduler_for_async_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def submit_tasks():
TEST_HANDLER_LABEL = "test_handler"
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})

# 10s to restart
# 5s to restart
mem_scheduler.orchestrator.tasks_min_idle_ms[TEST_HANDLER_LABEL] = 5_000

tmp_dir = Path("./tmp")
Expand Down
4 changes: 1 addition & 3 deletions src/memos/mem_reader/simple_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,9 @@ def _read_memory(
serialized_origin_memories = json.dumps(
[one.memory for one in original_memory_group], indent=2
)
revised_memory_list = self.rewrite_memories(
revised_memory_list = self.filter_hallucination_in_memories(
messages=combined_messages,
memory_list=original_memory_group,
user_only=os.getenv("SIMPLE_STRUCT_REWRITE_USER_ONLY", "true").lower()
== "false",
)
serialized_revised_memories = json.dumps(
[one.memory for one in revised_memory_list], indent=2
Expand Down
9 changes: 6 additions & 3 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def initialize_modules(
process_llm = chat_llm

try:
if redis_client:
if redis_client and self.use_redis_queue:
self.status_tracker = TaskStatusTracker(redis_client)
if self.dispatcher:
self.dispatcher.status_tracker = self.status_tracker
Expand Down Expand Up @@ -305,7 +305,7 @@ def status_tracker(self) -> TaskStatusTracker | None:
available via RedisSchedulerModule. This mirrors the lazy pattern used
by `mem_cube` so downstream modules can safely access the tracker.
"""
if self._status_tracker is None:
if self._status_tracker is None and self.use_redis_queue:
try:
self._status_tracker = TaskStatusTracker(self.redis)
# Propagate to submodules when created lazily
Expand All @@ -314,7 +314,8 @@ def status_tracker(self) -> TaskStatusTracker | None:
if self.memos_message_queue:
self.memos_message_queue.set_status_tracker(self._status_tracker)
except Exception as e:
logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True)
logger.warning(f"Failed to lazy-initialize status_tracker: {e}", exc_info=True)

return self._status_tracker

@status_tracker.setter
Expand Down Expand Up @@ -869,6 +870,8 @@ def _submit_web_logs(
messages = [messages] # transform single message to list

for message in messages:
if self.rabbitmq_config is None:
return
try:
# Always call publish; the publisher now caches when offline and flushes after reconnect
logger.info(
Expand Down
31 changes: 0 additions & 31 deletions src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ def __init__(
)

self.metrics = metrics
self._status_tracker: TaskStatusTracker | None = None
# Use setter to allow propagation and keep a single source of truth
self.status_tracker = status_tracker
self.submit_web_logs = submit_web_logs # ADDED

Expand All @@ -118,35 +116,6 @@ def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None:
return
# This is handled in BaseScheduler now

@property
def status_tracker(self) -> TaskStatusTracker | None:
"""Lazy-initialized status tracker for the dispatcher.

If the tracker is None, attempt to initialize from the Redis-backed
components available to the dispatcher (queue or orchestrator).
"""
if self._status_tracker is None:
try:
self._status_tracker = TaskStatusTracker(self.redis)
# Propagate to submodules when created lazily
if self.memos_message_queue:
self.memos_message_queue.set_status_tracker(self._status_tracker)
except Exception as e:
logger.warning(f"Failed to lazily initialize status_tracker: {e}", exc_info=True)
return self._status_tracker

@status_tracker.setter
def status_tracker(self, value: TaskStatusTracker | None) -> None:
self._status_tracker = value
# Propagate to the queue if possible
try:
if self.memos_message_queue and hasattr(self.memos_message_queue, "status_tracker"):
self.memos_message_queue.status_tracker = value
except Exception as e:
logger.warning(
f"Failed to propagate dispatcher status_tracker to queue: {e}", exc_info=True
)

def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
"""
Create a wrapper around the handler to track task execution and capture results.
Expand Down
Loading