|
52 | 52 | # Increase batch size for parallel processing |
53 | 53 | EMBEDDING_BATCH_SIZE = 16 # Process embeddings in batches for better throughput |
54 | 54 | PROGRESS_LOG_INTERVAL = 10 # Log progress every N completed files |
55 | | -EMBEDDING_TIMEOUT = 30 # Timeout in seconds for each embedding API call |
| 55 | +# Timeout for future.result() must account for retries: (max_retries + 1) × SDK_timeout + buffer |
| 56 | +# With SDK timeout of 15s and max_retries=2, this allows 3 × 15s = 45s + 15s buffer = 60s |
| 57 | +EMBEDDING_TIMEOUT = 60 # Timeout in seconds for each embedding API call (including retries) |
56 | 58 | FILE_PROCESSING_TIMEOUT = 300 # Timeout in seconds for processing a single file (5 minutes) |
57 | | -_THREADPOOL_WORKERS = max(16, EMBEDDING_CONCURRENCY + 8) |
58 | | -_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_THREADPOOL_WORKERS) |
| 59 | + |
| 60 | +_FILE_EXECUTOR_WORKERS = 4 |
| 61 | +_EMBEDDING_EXECUTOR_WORKERS = 4 |
| 62 | +_FILE_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_FILE_EXECUTOR_WORKERS) |
| 63 | +_EMBEDDING_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_EMBEDDING_EXECUTOR_WORKERS) |
59 | 64 |
|
60 | 65 | logger = get_logger(__name__) |
61 | 66 |
|
@@ -217,7 +222,7 @@ def _process_file_sync( |
217 | 222 | for idx, chunk_doc in batch: |
218 | 223 | # Submit task to executor; semaphore will be acquired inside the worker |
219 | 224 | embedding_start_time = time.time() |
220 | | - future = _EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model) |
| 225 | + future = _EMBEDDING_EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model) |
221 | 226 | embedding_futures.append((idx, chunk_doc, future, embedding_start_time)) |
222 | 227 |
|
223 | 228 | # Wait for batch to complete and store results |
@@ -397,7 +402,7 @@ def analyze_local_path_sync( |
397 | 402 | counters[0] += 1 |
398 | 403 | file_num = counters[0] |
399 | 404 |
|
400 | | - fut = _EXECUTOR.submit( |
| 405 | + fut = _FILE_EXECUTOR.submit( |
401 | 406 | _process_file_sync, |
402 | 407 | semaphore, |
403 | 408 | database_path, |
|
0 commit comments