|
62 | 62 | # Initialize EmbeddingClient for structured logging and retry logic |
63 | 63 | _embedding_client = EmbeddingClient() |
64 | 64 |
|
| 65 | +# Thread-local storage recording each worker thread's current execution stage (values are visible only to the thread that set them) |
| 66 | +_thread_state = threading.local() |
| 67 | + |
65 | 68 |
|
def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "<unknown>", chunk_index: int = 0, model: Optional[str] = None):
    """
    Embed ``text`` while holding ``semaphore``, recording progress in thread-local state.

    Wrapper to acquire the semaphore inside the executor task to avoid deadlock:
    the semaphore is acquired in the worker thread, not the main thread.
    Execution state (stage, file path, chunk index, start time, last exception
    message) is written to ``_thread_state`` for debugging timeout issues.

    Args:
        semaphore: Semaphore acquired before the embedding call and always
            released afterwards.
        text: Text passed to ``_embedding_client.embed_text``.
        file_path: Forwarded to ``embed_text`` and used in log messages.
        chunk_index: Forwarded to ``embed_text`` and used in log messages.
        model: Accepted for interface compatibility; it is not forwarded to
            ``embed_text`` by this wrapper.

    Returns:
        Whatever ``_embedding_client.embed_text`` returns.

    Raises:
        Exception: Any exception raised inside the guarded section is logged
            and re-raised unchanged.
    """
    # Thread-local attributes persist after this call returns, so every field
    # is reset up front. Without clearing ``exception``, a worker thread that
    # is reused for a later task would still expose the previous task's error,
    # and the attribute would not exist at all on a thread that never failed.
    _thread_state.stage = "acquiring_semaphore"
    _thread_state.file_path = file_path
    _thread_state.chunk_index = chunk_index
    _thread_state.start_time = time.time()
    _thread_state.exception = None

    semaphore.acquire()
    try:
        _thread_state.stage = "calling_embed_text"
        logger.debug("Worker thread starting embed_text for %s chunk %s", file_path, chunk_index)
        result = _embedding_client.embed_text(text, file_path=file_path, chunk_index=chunk_index)
        _thread_state.stage = "completed"
        logger.debug("Worker thread completed embed_text for %s chunk %s", file_path, chunk_index)
        return result
    except Exception as e:
        _thread_state.stage = f"exception: {type(e).__name__}"
        _thread_state.exception = str(e)
        logger.error("Worker thread exception in embed_text for %s chunk %s: %s", file_path, chunk_index, e)
        raise
    finally:
        # These assignments overwrite any "exception: ..." stage set above;
        # the error text itself remains available in _thread_state.exception.
        _thread_state.stage = "releasing_semaphore"
        semaphore.release()
        _thread_state.stage = "finished"
76 | 98 |
|
77 | 99 |
|
78 | 100 | def detect_language(path: str): |
@@ -215,16 +237,44 @@ def _process_file_sync( |
215 | 237 | logger.warning(f"Slow embedding API response for {rel_path} chunk {idx}: {embedding_duration:.2f}s total") |
216 | 238 | except concurrent.futures.TimeoutError: |
217 | 239 | elapsed = time.time() - embedding_start_time |
218 | | - logger.error( |
219 | | - f"Future timeout ({EMBEDDING_TIMEOUT}s) for {rel_path} chunk {idx}:\n" |
220 | | - f" - Elapsed time: {elapsed:.2f}s\n" |
221 | | - f" - Chunk size: {chunk_size} characters\n" |
222 | | - f" - Chunk preview: {chunk_preview!r}\n" |
223 | | - f" - Future state: {future._state if hasattr(future, '_state') else 'unknown'}\n" |
224 | | - f" - The future.result() call timed out waiting for the embedding API.\n" |
225 | | - f" - The embedding request may still be running in the background thread.\n" |
226 | | - f" - Check logs above for 'Embedding API Timeout' messages from the worker thread." |
227 | | - ) |
| 240 | + |
| 241 | + # Try to get exception info from the future if available |
| 242 | + future_exception = None |
| 243 | + try: |
| 244 | + future_exception = future.exception(timeout=0.1) |
| 245 | + except concurrent.futures.TimeoutError: |
| 246 | + future_exception = None # Still running |
| 247 | + except Exception as e: |
| 248 | + future_exception = e |
| 249 | + |
| 250 | + # Build diagnostic information |
| 251 | + diagnostic_info = [ |
| 252 | + f"Future timeout ({EMBEDDING_TIMEOUT}s) for {rel_path} chunk {idx}:", |
| 253 | + f" - Elapsed time: {elapsed:.2f}s", |
| 254 | + f" - Future state: {future._state if hasattr(future, '_state') else 'unknown'}", |
| 255 | + ] |
| 256 | + |
| 257 | + if future_exception: |
| 258 | + diagnostic_info.append(f" - Future exception: {type(future_exception).__name__}: {future_exception}") |
| 259 | + else: |
| 260 | + diagnostic_info.append(f" - Future exception: None (still running or completed)") |
| 261 | + |
| 262 | + # Add information about running status |
| 263 | + if future.running(): |
| 264 | + diagnostic_info.append(f" - Future.running(): True - worker thread is still executing") |
| 265 | + elif future.done(): |
| 266 | + diagnostic_info.append(f" - Future.done(): True - worker thread completed but future.result() timed out retrieving result") |
| 267 | + else: |
| 268 | + diagnostic_info.append(f" - Future status: Pending/Unknown") |
| 269 | + |
| 270 | + diagnostic_info.extend([ |
| 271 | + f" - The future.result() call timed out after {EMBEDDING_TIMEOUT}s", |
| 272 | + f" - This means the worker thread did not complete the embedding request in time", |
| 273 | + f" - Check logs above for messages from the worker thread (search for 'Worker thread')", |
| 274 | + f" - The embedding API logs will show the actual HTTP request state" |
| 275 | + ]) |
| 276 | + |
| 277 | + logger.error("\n".join(diagnostic_info)) |
228 | 278 | emb = None |
229 | 279 | failed_count += 1 |
230 | 280 | except Exception as e: |
|
0 commit comments