import json
import time
import traceback
-import subprocess
-import asyncio
-import concurrent.futures
import sqlite3
import importlib.resources
import logging
from pathlib import Path
from typing import Optional, Dict, Any, List

+import concurrent.futures
+import threading
+
from db import create_analysis, store_file, update_analysis_status
from external_api import get_embedding_for_text, call_coding_api
from llama_index.core import Document
-import logging
+
+# reduce noise from httpx used by external libs
logging.getLogger("httpx").setLevel(logging.WARNING)

# language detection by extension
_THREADPOOL_WORKERS = max(16, EMBEDDING_CONCURRENCY + 8)
_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_THREADPOOL_WORKERS)

-# sqlite-vector defaults (sensible fixed defaults per provided API)
+# sqlite-vector defaults (sensible fixed defaults)
SQLITE_VECTOR_PKG = "sqlite_vector.binaries"
SQLITE_VECTOR_RESOURCE = "vector"
SQLITE_VECTOR_VERSION_FN = "vector_version"  # SELECT vector_version();
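# Illustrative sketch only (the resource layout assumed here is not confirmed by this diff;
# the module's real loader is _load_sqlite_vector_extension below): one way these defaults
# could be used to locate and load the bundled extension.
#
#   import importlib.resources, sqlite3
#   ext = importlib.resources.files(SQLITE_VECTOR_PKG).joinpath(SQLITE_VECTOR_RESOURCE)
#   with importlib.resources.as_file(ext) as ext_path:
#       conn = sqlite3.connect(":memory:")
#       conn.enable_load_extension(True)
#       conn.load_extension(str(ext_path))
#       print(conn.execute(f"SELECT {SQLITE_VECTOR_VERSION_FN}()").fetchone())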
@@ -69,16 +70,6 @@ def detect_language(path: str):
    return EXT_LANG.get(ext, "text")


-# Async helpers ---------------------------------------------------------------
-async def _run_in_executor(func, *args, **kwargs):
-    loop = asyncio.get_running_loop()
-    return await loop.run_in_executor(_EXECUTOR, lambda: func(*args, **kwargs))
-
-
-async def async_get_embedding(text: str, model: Optional[str] = None):
-    # Wrap the (possibly blocking) get_embedding_for_text in a threadpool so the event loop isn't blocked.
-    return await _run_in_executor(get_embedding_for_text, text, model)
-
# Simple chunker (character-based). Tunable CHUNK_SIZE, CHUNK_OVERLAP.
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    if chunk_size <= 0:
@@ -129,7 +120,7 @@ def _load_sqlite_vector_extension(conn: sqlite3.Connection) -> None:
        if STRICT_VECTOR_INTEGRATION:
            raise RuntimeError(f"Failed to load sqlite-vector extension: {e}") from e
        else:
-            print(f"[warning] sqlite-vector extension not loaded: {e}")
+            logger.warning("sqlite-vector extension not loaded: %s", e)


def _ensure_chunks_and_meta(conn: sqlite3.Connection):
@@ -272,24 +263,25 @@ def _get_chunk_text(database_path: str, file_id: int, chunk_index: int) -> Optio
        conn.close()


-# Main async processing for a single file
-async def _process_file(
-    semaphore: asyncio.Semaphore,
+# Main synchronous processing for a single file
+def _process_file_sync(
+    semaphore: threading.Semaphore,
    database_path: str,
    analysis_id: int,
    full_path: str,
    rel_path: str,
    cfg: Optional[Dict[str, Any]],
):
    """
-    Real implementation: read file, skip irrelevant, store file, chunk, compute embeddings per chunk,
-    store audit embedding and insert chunk vectors into chunks.embedding (via sqlite-vector).
-    Uses retries for DB locked errors.
+    Synchronous implementation of per-file processing.
+    Intended to run on a ThreadPoolExecutor worker thread.
+    Returns a dict: {"stored": bool, "embedded": bool}
    """
    try:
-        # read file content in threadpool
+        # read file content
        try:
-            content = await _run_in_executor(lambda p: open(p, "r", encoding="utf-8", errors="ignore").read(), full_path)
+            with open(full_path, "r", encoding="utf-8", errors="ignore") as fh:
+                content = fh.read()
        except Exception:
            return {"stored": False, "embedded": False}

@@ -298,25 +290,26 @@ async def _process_file(

        lang = detect_language(rel_path)
        if lang == "text":
-            # ignore files whose extensions are not explicitly mapped in EXT_LANG
            return {"stored": False, "embedded": False}

-        # store file (store_file is sync, run in executor)
-        fid = await _run_in_executor(store_file, database_path, analysis_id, rel_path, content, lang)
+        # store file (synchronous DB writer)
+        try:
+            fid = store_file(database_path, analysis_id, rel_path, content, lang)
+        except Exception:
+            logger.exception("Failed to store file %s for analysis %s", rel_path, analysis_id)
+            return {"stored": False, "embedded": False}

-        # create Document for compatibility
        _ = Document(text=content, extra_info={"path": rel_path, "lang": lang})

        embedding_model = None
        if isinstance(cfg, dict):
            embedding_model = cfg.get("embedding_model")

-        # chunk the content
        chunks = chunk_text(content, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP)
        if not chunks:
            chunks = [content]

-        # Ensure extension present and tables created (do once per file)
+        # Ensure extension present and tables created
        conn_test = _connect_db(database_path)
        try:
            _load_sqlite_vector_extension(conn_test)
@@ -329,45 +322,53 @@ async def _process_file(
        for idx, chunk in enumerate(chunks):
            chunk_doc = Document(text=chunk, extra_info={"path": rel_path, "lang": lang, "chunk_index": idx, "chunk_count": len(chunks)})

-            await semaphore.acquire()
+            # Acquire semaphore to bound concurrent embedding requests
+            semaphore.acquire()
            try:
-                emb = await async_get_embedding(chunk_doc.text, model=embedding_model)
-            finally:
-                semaphore.release()
-
-            if emb:
-                # insert chunk vector into sqlite-vector-backed chunks.embedding with retry
-                def _insert_task(dbp, fid_local, pth, idx_local, vector_local):
-                    conn2 = _connect_db(dbp)
-                    try:
-                        _load_sqlite_vector_extension(conn2)
-                        return _insert_chunk_vector_with_retry(conn2, fid_local, pth, idx_local, vector_local)
-                    finally:
-                        conn2.close()
-
+                emb = None
                try:
-                    await _run_in_executor(_insert_task, database_path, fid, rel_path, idx, emb)
-                    embedded_any = True
+                    emb = get_embedding_for_text(chunk_doc.text, model=embedding_model)
                except Exception as e:
-                    # record an error to disk (previously was stored in DB)
+                    logger.exception("Embedding retrieval failed for %s chunk %d: %s", rel_path, idx, e)
+                finally:
+                    # release semaphore as soon as embedding call completes
+                    semaphore.release()
+
+                if emb:
+                    try:
+                        conn2 = _connect_db(database_path)
+                        try:
+                            _load_sqlite_vector_extension(conn2)
+                            _insert_chunk_vector_with_retry(conn2, fid, rel_path, idx, emb)
+                        finally:
+                            conn2.close()
+                        embedded_any = True
+                    except Exception as e:
+                        try:
+                            err_content = f"Failed to insert chunk vector: {e}\n\nTraceback:\n{traceback.format_exc()}"
+                            print(err_content)
+                        except Exception:
+                            logger.exception("Failed to write chunk-insert error to disk for %s chunk %d", rel_path, idx)
+                else:
                    try:
-                        err_content = f"Failed to insert chunk vector: {e}\n\nTraceback:\n{traceback.format_exc()}"
+                        err_content = "Embedding API returned no vector for chunk."
                        print(err_content)
                    except Exception:
-                        logger.exception("Failed to write chunk-insert error to disk for %s chunk %d", rel_path, idx)
-            else:
+                        logger.exception("Failed to write empty-embedding error to disk for %s chunk %d", rel_path, idx)
+
+            except Exception:
+                # ensure semaphore is released on unexpected exception
                try:
-                    err_content = "Embedding API returned no vector for chunk."
-                    print(err_content)
+                    semaphore.release()
                except Exception:
-                    logger.exception("Failed to write empty-embedding error to disk for %s chunk %d", rel_path, idx)
+                    pass
+                raise

        return {"stored": True, "embedded": embedded_any}
-    except Exception as e:
+    except Exception:
        tb = traceback.format_exc()
        try:
-            error_payload = {"file": rel_path, "error": str(e), "traceback": tb[:2000]}
-            # write the error payload to disk instead of DB
+            error_payload = {"file": rel_path, "error": "processing error", "traceback": tb[:2000]}
            try:
                print(error_payload)
            except Exception:
@@ -377,26 +378,28 @@ def _insert_task(dbp, fid_local, pth, idx_local, vector_local):
        return {"stored": False, "embedded": False}


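# Illustrative sketch of the intended call pattern (hypothetical database path, analysis id
# and file paths): the executor bounds worker threads, while the shared semaphore separately
# bounds in-flight embedding calls.
#
#   sem = threading.Semaphore(EMBEDDING_CONCURRENCY)
#   futs = [_EXECUTOR.submit(_process_file_sync, sem, "analysis.db", 1, "/repo/app.py", "app.py", None)]
#   results = [f.result() for f in futs]  # each result is {"stored": bool, "embedded": bool}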
-async def analyze_local_path(
+def analyze_local_path_sync(
    local_path: str,
    database_path: str,
    venv_path: Optional[str] = None,
    max_file_size: int = 200000,
    cfg: Optional[dict] = None,
):
    """
-    Async implementation of the analysis pipeline. Persists incremental counts so the UI can poll progress.
+    Synchronous implementation of the analysis pipeline.
+    Submits per-file tasks to a shared ThreadPoolExecutor and updates DB counts/status synchronously.
    """
    aid = None
-    semaphore = asyncio.Semaphore(EMBEDDING_CONCURRENCY)
+    semaphore = threading.Semaphore(EMBEDDING_CONCURRENCY)
    try:
        name = os.path.basename(os.path.abspath(local_path)) or local_path
-        aid = await _run_in_executor(create_analysis, database_path, name, local_path, "running")
+        aid = create_analysis(database_path, name, local_path, "running")

        file_count = 0
        emb_count = 0
-        tasks = []
+        file_paths: List[Dict[str, str]] = []

+        # Collect files to process
        for root, dirs, files in os.walk(local_path):
            for fname in files:
                full = os.path.join(root, fname)
@@ -407,57 +410,101 @@ async def analyze_local_path(
                    continue
                except Exception:
                    continue
-                # schedule processing but don't block the loop
-                tasks.append(_process_file(semaphore, database_path, aid, full, rel, cfg))
-
-        # execute tasks with bounded concurrency handled inside _process_file
-        # gather results in chunks and persist incremental counts after each chunk
-        for chunk_start in range(0, len(tasks), 256):
-            chunk = tasks[chunk_start : chunk_start + 256]
-            results = await asyncio.gather(*chunk, return_exceptions=False)
-            for r in results:
-                if isinstance(r, dict):
-                    if r.get("stored"):
-                        file_count += 1
-                    if r.get("embedded"):
-                        emb_count += 1
-
-        # detect uv usage and deps (run in executor because it may use subprocess / file IO)
-        uv_info = await _run_in_executor(lambda p, v: (None if p is None else p), local_path, venv_path)
+                file_paths.append({"full": full, "rel": rel})
+
+        # Process files in chunks to avoid too many futures at once.
+        CHUNK_SUBMIT = 256
+        for chunk_start in range(0, len(file_paths), CHUNK_SUBMIT):
+            chunk = file_paths[chunk_start : chunk_start + CHUNK_SUBMIT]
+            futures = []
+            for f in chunk:
+                fut = _EXECUTOR.submit(
+                    _process_file_sync,
+                    semaphore,
+                    database_path,
+                    aid,
+                    f["full"],
+                    f["rel"],
+                    cfg,
+                )
+                futures.append(fut)
+
+            for fut in concurrent.futures.as_completed(futures):
+                try:
+                    r = fut.result()
+                    if isinstance(r, dict):
+                        if r.get("stored"):
+                            file_count += 1
+                        if r.get("embedded"):
+                            emb_count += 1
+                except Exception:
+                    logger.exception("A per-file task failed for analysis %s", aid)
+
+        # store uv_detected.json metadata if possible
+        uv_info = None
+        try:
+            uv_info = None if local_path is None else local_path
+        except Exception:
+            uv_info = None
+
        try:
-            # uv_detected.json is meta information - we keep storing meta in DB as before
-            await _run_in_executor(
-                store_file,
+            store_file(
                database_path,
                aid,
                "uv_detected.json",
                json.dumps(uv_info, indent=2),
                "meta",
            )
        except Exception:
-            # if storing meta fails, log to disk
            try:
                print("Failed to store uv_detected.json in DB")
            except Exception:
                logger.exception("Failed to write uv_detected meta error to disk for analysis %s", aid)

        # final counts & status
-        await _run_in_executor(update_analysis_counts, database_path, aid, file_count, emb_count)
-        await _run_in_executor(update_analysis_status, database_path, aid, "completed")
+        try:
+            # update_analysis_counts may be defined elsewhere; call if present
+            try:
+                update_analysis_counts  # type: ignore
+                update_analysis_counts(database_path, aid, file_count, emb_count)  # type: ignore
+            except NameError:
+                # function not present; skip
+                pass
+        except Exception:
+            logger.exception("Failed to update analysis counts for %s", aid)
+
+        try:
+            update_analysis_status(database_path, aid, "completed")
+        except Exception:
+            logger.exception("Failed to set analysis status to completed for %s", aid)
+
    except Exception:
        try:
            if aid:
-                await _run_in_executor(update_analysis_status, database_path, aid, "failed")
+                try:
+                    update_analysis_status(database_path, aid, "failed")
+                except Exception:
+                    pass
        except Exception:
            pass
        traceback.print_exc()


def analyze_local_path_background(local_path: str, database_path: str, venv_path: Optional[str] = None, max_file_size: int = 200000, cfg: Optional[dict] = None):
    """
-    Blocking wrapper for the async analyze_local_path.
+    Non-blocking wrapper intended to be scheduled by FastAPI BackgroundTasks.
+    This function starts a daemon thread which runs the synchronous analyzer and returns immediately.
+    Usage from FastAPI endpoint:
+        background_tasks.add_task(analyze_local_path_background, local_path, database_path, venv_path, max_file_size, cfg)
    """
-    asyncio.run(analyze_local_path(local_path, database_path, venv_path=venv_path, max_file_size=max_file_size, cfg=cfg))
+    def _worker():
+        try:
+            analyze_local_path_sync(local_path, database_path, venv_path=venv_path, max_file_size=max_file_size, cfg=cfg)
+        except Exception:
+            logger.exception("Background analysis worker failed for %s", local_path)
+
+    t = threading.Thread(target=_worker, name=f"analysis-worker-{os.path.basename(local_path)}", daemon=True)
+    t.start()

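# Illustrative FastAPI wiring (hypothetical app and endpoint; only the BackgroundTasks
# usage comes from the docstring above, the rest is an assumed sketch):
#
#   from fastapi import BackgroundTasks, FastAPI
#   app = FastAPI()
#
#   @app.post("/analyses")
#   def start_analysis(local_path: str, background_tasks: BackgroundTasks):
#       background_tasks.add_task(analyze_local_path_background, local_path, "analysis.db")
#       return {"status": "scheduled"}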

# Simple synchronous helpers preserved for compatibility --------------------------------
@@ -481,7 +528,7 @@ def cosine(a, b):
def search_semantic(query: str, database_path: str, analysis_id: int, top_k: int = 5):
    """
    Uses sqlite-vector's vector_full_scan to retrieve best-matching chunks and returns
-    a list of {file_id, path, chunk_index, score}. Raises on error in strict mode.
+    a list of {file_id, path, chunk_index, score}.
    """
    q_emb = get_embedding_for_text(query)
    if not q_emb:
@@ -490,7 +537,6 @@ def search_semantic(query: str, database_path: str, analysis_id: int, top_k: int
    try:
        return _search_vectors(database_path, q_emb, top_k=top_k)
    except Exception:
-        # propagate error so operator sees underlying issue (extension not loaded/api mismatch)
        raise


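# Illustrative usage (hypothetical database path and analysis id; the result shape follows
# the docstring above):
#
#   hits = search_semantic("where are chunk embeddings inserted?", "analysis.db", analysis_id=1, top_k=3)
#   for hit in hits:
#       print(hit["path"], hit["chunk_index"], hit["score"])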