From 448ba8c0c9a3eac0088b3ccfccf535d402934fa5 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Mon, 6 Oct 2025 20:28:50 +0530
Subject: [PATCH 01/11] Adding metrics logger

---
 web-agent/app/worker.py | 283 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 279 insertions(+), 4 deletions(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index 2c1d9b3..f290511 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -5,6 +5,7 @@ monkey.patch_all()
 
 import argparse
+import atexit
 import base64
 import gzip
 import json
@@ -12,6 +13,7 @@ import os
 import shutil
 import secrets
+import signal
 import string
 import tempfile
 import time
@@ -19,7 +21,8 @@ from collections import deque
 from logging.handlers import TimedRotatingFileHandler
 from pathlib import Path
-from typing import Optional, Tuple, Any, Dict, Union
+from typing import Optional, Tuple, Any, Dict, Union, List
+from urllib.parse import urlparse, urlunparse
 
 import requests
 from gevent.pool import Pool
@@ -45,17 +48,34 @@
 # throttling to 25 requests per seconds to avoid rate limit errors
 rate_limiter = None
 config_dict: dict = None
+metrics_logger = None
 
 
 def main() -> None:
-    global config_dict, logger, rate_limiter
+    global config_dict, logger, rate_limiter, metrics_logger
 
-    # Instantiate RateLimiter for 25 requests per 15 seconds window
     rate_limiter = RateLimiter(request_limit=25, time_window=15)
 
     parser = argparse.ArgumentParser()
    config_dict, agent_index, debug_mode = get_initial_config(parser)
 
     logger = setup_logger(agent_index, debug_mode)
+
+    # Initialize metrics logger
+    metrics_folder = os.path.join(armorcode_folder, 'metrics')
+    _createFolder(metrics_folder)
+    metrics_file = os.path.join(metrics_folder, f'metrics{agent_index}.json')
+    metrics_logger = BufferedMetricsLogger(metrics_file, flush_interval=10, buffer_size=1000)
+
+    # Register shutdown handlers to flush metrics
+    def shutdown_handler(signum=None, frame=None):
+        logger.info("Shutting down, flushing remaining metrics...")
+        metrics_logger.flush_now()
+        logger.info("Metrics flushed. Exiting.")
+
+    atexit.register(shutdown_handler)
+    signal.signal(signal.SIGTERM, shutdown_handler)
+    signal.signal(signal.SIGINT, shutdown_handler)
+
     logger.info("Agent Started for url %s, verify %s, timeout %s, outgoing proxy %s, inward %s, uploadToAc %s",
                 config_dict.get('server_url'), config_dict.get('verify_cert', False),
                 config_dict.get('timeout', 10), config_dict['outgoing_proxy'],
@@ -86,6 +106,7 @@ def process() -> None:
             params['envName'] = config_dict['env_name']
 
         logger.info("Requesting task from %s", get_task_server_url)
+        get_task_start_time = time.time()
         get_task_response: requests.Response = requests.get(
             get_task_server_url,
             headers=headers,
@@ -93,13 +114,30 @@ def process() -> None:
             proxies=config_dict['outgoing_proxy'],
             params=params
         )
+        get_task_duration_ms = (time.time() - get_task_start_time) * 1000
 
         if get_task_response.status_code == 200:
             thread_backoff_time = min_backoff_time
             task: Optional[Dict[str, Any]] = get_task_response.json().get('data', None)
+
+            # Track get-task metric
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                get_task_duration_ms,
+                tags={
+                    "task_id": task.get('taskId', 'none') if task else "none",
+                    "operation": "get_task",
+                    "url": _get_url_without_params(get_task_server_url),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "GET",
+                    "status_code": "200",
+                    "has_task": str(task is not None).lower()
+                }
+            )
+
             if task is None:
                 logger.info("Received empty task")
-                time.sleep(5)  # Wait before requesting next task
+                time.sleep(5)
                 continue
 
             logger.info("Received task: %s", task['taskId'])
@@ -112,13 +150,50 @@ def process() -> None:
             thread_pool.wait_available()  # Wait if the thread_pool is full
             thread_pool.spawn(process_task_async, task)  # Submit the task when free
         elif get_task_response.status_code == 204:
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                get_task_duration_ms,
+                tags={
+                    "task_id": "none",
+                    "operation": "get_task",
+                    "url": _get_url_without_params(get_task_server_url),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "GET",
+                    "status_code": "204",
+                    "has_task": "false"
+                }
+            )
             logger.info("No task available. Waiting...")
             time.sleep(5)
         elif get_task_response.status_code > 500:
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                get_task_duration_ms,
+                tags={
+                    "task_id": "none",
+                    "operation": "get_task",
+                    "url": _get_url_without_params(get_task_server_url),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "GET",
+                    "status_code": str(get_task_response.status_code)
+                }
+            )
             logger.error("Getting 5XX error %d, increasing backoff time", get_task_response.status_code)
             time.sleep(thread_backoff_time)
             thread_backoff_time = min(max_backoff_time, thread_backoff_time * 2)
         else:
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                get_task_duration_ms,
+                tags={
+                    "task_id": "none",
+                    "operation": "get_task",
+                    "url": _get_url_without_params(get_task_server_url),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "GET",
+                    "status_code": str(get_task_response.status_code)
+                }
+            )
             logger.error("Unexpected response: %d", get_task_response.status_code)
             time.sleep(5)
 
@@ -154,17 +229,50 @@ def update_task(task: Optional[Dict[str, Any]], count: int = 0) -> None:
         return
     try:
         rate_limiter.throttle()
+        update_start_time = time.time()
         update_task_response: requests.Response = requests.post(
             f"{config_dict.get('server_url')}/api/http-teleport/put-result",
             headers=_get_headers(),
             json=task,
             timeout=30, verify=config_dict.get('verify_cert'), proxies=config_dict['outgoing_proxy']
         )
+        update_duration_ms = (time.time() - update_start_time) * 1000
 
         if update_task_response.status_code == 200:
             logger.info("Task %s updated successfully. Response: %s", task['taskId'],
                         update_task_response.text)
+
+            # Track successful update
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                update_duration_ms,
+                tags={
+                    "task_id": task['taskId'],
+                    "operation": "upload_result",
+                    "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "POST",
+                    "status_code": "200",
+                    "success": "true"
+                }
+            )
         elif update_task_response.status_code == 429 or update_task_response.status_code == 504:
+            # Track rate limit / timeout
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                update_duration_ms,
+                tags={
+                    "task_id": task['taskId'],
+                    "operation": "upload_result",
+                    "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "POST",
+                    "status_code": str(update_task_response.status_code),
+                    "success": "false",
+                    "error_type": "rate_limit" if update_task_response.status_code == 429 else "timeout"
+                }
+            )
+
             time.sleep(2)
             logger.warning("Rate limit hit while updating the task output, retrying again for task %s", task['taskId'])
             count = count + 1
@@ -172,9 +280,42 @@ def update_task(task: Optional[Dict[str, Any]], count: int = 0) -> None:
         else:
             logger.warning("Failed to update task %s: %s", task['taskId'], update_task_response.text)
 
+            # Track failed update
+            metrics_logger.write_metric(
+                "http.request.duration_ms",
+                update_duration_ms,
+                tags={
+                    "task_id": task['taskId'],
+                    "operation": "upload_result",
+                    "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"),
+                    "domain": urlparse(config_dict.get('server_url')).netloc,
+                    "method": "POST",
+                    "status_code": str(update_task_response.status_code),
+                    "success": "false",
+                    "error_type": "server_error"
+                }
+            )
+
     except requests.exceptions.RequestException as e:
logger.error("Network error processing task %s: %s", task['taskId'], e) + + # Track network error + metrics_logger.write_metric( + "http.request.duration_ms", + 0, + tags={ + "task_id": task['taskId'], + "operation": "upload_result", + "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"), + "domain": urlparse(config_dict.get('server_url')).netloc, + "method": "POST", + "status_code": "network_error", + "success": "false", + "error_type": "network" + } + ) + count = count + 1 update_task(task, count) @@ -236,6 +377,8 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: expiryTime: int = task.get('expiryTsMs', round((time.time() + 300) * 1000)) logger.info("Processing task %s: %s %s", taskId, method, url) + task_start_time = time.time() + # creating temp file to store outputs _createFolder(log_folder) # create folder to store log files _createFolder(output_file_folder) # create folder to store output files @@ -277,11 +420,28 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: logger.debug("Input data is not str or bytes %s", input_data) + http_start_time = time.time() response: requests.Response = requests.request(method, url, headers=headers, data=encoded_input_data, stream=True, timeout=(15, timeout), verify=config_dict.get('verify_cert'), proxies=config_dict['inward_proxy']) + http_duration_ms = (time.time() - http_start_time) * 1000 logger.info("Response: %d", response.status_code) + # Track HTTP request to target URL + metrics_logger.write_metric( + "http.request.duration_ms", + http_duration_ms, + tags={ + "task_id": taskId, + "operation": "target_request", + "url": _get_url_without_params(url), + "domain": urlparse(url).netloc, + "method": method, + "status_code": str(response.status_code), + "success": str(response.status_code < 400).lower() + } + ) + data: Any = None if response.status_code == 200: # Check if the response is chunked @@ -322,6 +482,17 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: base64_string = base64.b64encode(file_data).decode('utf-8') task['responseBase64'] = True task['output'] = base64_string + + # Track inline upload size + metrics_logger.write_metric( + "upload.size_bytes", + file_size, + tags={ + "task_id": taskId, + "upload_type": "inline" + } + ) + return task return upload_response(temp_output_file.name, temp_output_file_zip.name, taskId, task) @@ -334,6 +505,19 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: task['statusCode'] = 500 task['output'] = f"Agent Side Error: Error: {str(e)}" finally: + # Track overall task processing duration + task_total_duration_ms = (time.time() - task_start_time) * 1000 + metrics_logger.write_metric( + "task.processing_duration_ms", + task_total_duration_ms, + tags={ + "task_id": taskId, + "method": method, + "domain": urlparse(url).netloc, + "http_status": str(task.get('statusCode', 'unknown')) + } + ) + temp_output_file.close() temp_output_file_zip.close() os.unlink(temp_output_file.name) @@ -369,6 +553,8 @@ def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any]) file_path = temp_file_zip if success else temp_file task['responseZipped'] = success file_name = f"{taskId}_{uuid.uuid4().hex}.{'zip' if success else 'txt'}" + file_size = os.path.getsize(file_path) + headers: Dict[str, str] = { "Authorization": f"Bearer {config_dict['api_key']}", } @@ -380,13 +566,41 @@ def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any]) # If you have multiple 
files, you can add them here as more entries } rate_limiter.throttle() + upload_start_time = time.time() upload_result: requests.Response = requests.post( f"{config_dict.get('server_url')}/api/http-teleport/upload-result", headers=headers, timeout=300, verify=config_dict.get('verify_cert', False), proxies=config_dict['outgoing_proxy'], files=files ) + upload_duration_ms = (time.time() - upload_start_time) * 1000 logger.info("Upload result response: %s, code: %d", upload_result.text, upload_result.status_code) + + # Track file upload metrics + metrics_logger.write_metric( + "http.request.duration_ms", + upload_duration_ms, + tags={ + "task_id": taskId, + "operation": "upload_file", + "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/upload-result"), + "domain": urlparse(config_dict.get('server_url')).netloc, + "method": "POST", + "status_code": str(upload_result.status_code), + "success": str(upload_result.status_code < 400).lower() + } + ) + + # Track upload size + metrics_logger.write_metric( + "upload.size_bytes", + file_size, + tags={ + "task_id": taskId, + "upload_type": "s3" + } + ) + upload_result.raise_for_status() return None except Exception as e: @@ -441,6 +655,67 @@ def throttle(self) -> None: time.sleep(0.5) +class BufferedMetricsLogger: + """Buffered metrics logger for DataDog. Flushes periodically to preserve timestamps.""" + + def __init__(self, metrics_file: str, flush_interval: int = 10, buffer_size: int = 1000): + Path(metrics_file).parent.mkdir(parents=True, exist_ok=True) + self.flush_interval = flush_interval + self.buffer_size = buffer_size + self.buffer: List[Dict] = [] + self.buffer_lock = threading.Lock() + self.last_flush_time = time.time() + + self.file_logger = logging.getLogger('metrics_file') + self.file_logger.setLevel(logging.INFO) + self.file_logger.propagate = False + + handler = TimedRotatingFileHandler(metrics_file, when="midnight", interval=1, backupCount=7) + handler.setFormatter(logging.Formatter('%(message)s')) + self.file_logger.addHandler(handler) + + self.flush_thread = threading.Thread(target=self._auto_flush_loop, daemon=True) + self.flush_thread.start() + + def write_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None): + timestamp_ms = int(time.time() * 1000) + metric_event = { + "@timestamp": timestamp_ms, + "metric_name": metric_name, + "value": value, + "tags": tags or {} + } + with self.buffer_lock: + self.buffer.append(metric_event) + if len(self.buffer) >= self.buffer_size: + self._flush() + + def _flush(self): + if not self.buffer: + return + for event in self.buffer: + self.file_logger.info(json.dumps(event)) + self.buffer.clear() + self.last_flush_time = time.time() + + def _auto_flush_loop(self): + while True: + time.sleep(self.flush_interval) + with self.buffer_lock: + if self.buffer and (time.time() - self.last_flush_time) >= self.flush_interval: + self._flush() + + def flush_now(self): + with self.buffer_lock: + self._flush() + + +def _get_url_without_params(url: str) -> str: + """Remove query parameters from URL.""" + parsed = urlparse(url) + return urlunparse((parsed.scheme, parsed.netloc, parsed.path, '', '', '')) + + def upload_s3(temp_file, preSignedUrl: str, headers: Dict[str, Any]) -> bool: headersForS3: Dict[str, str] = {} if 'Content-Encoding' in headers and headers['Content-Encoding'] is not None: From 87abbd708a63c1ecc0a0b690d1bc8c9de0ddbf51 Mon Sep 17 00:00:00 2001 From: dmeenaarmorcode Date: Wed, 8 Oct 2025 15:36:52 +0530 Subject: [PATCH 02/11] Adding metrics 
logger --- web-agent/app/worker.py | 91 +++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 30 deletions(-) diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py index f290511..18d1755 100644 --- a/web-agent/app/worker.py +++ b/web-agent/app/worker.py @@ -1,6 +1,4 @@ #!/usr/bin/env python3 -import threading - from gevent import monkey; monkey.patch_all() @@ -24,7 +22,9 @@ from typing import Optional, Tuple, Any, Dict, Union, List from urllib.parse import urlparse, urlunparse +import gevent import requests +from gevent.lock import Semaphore from gevent.pool import Pool # Global variables @@ -69,8 +69,8 @@ def main() -> None: # Register shutdown handlers to flush metrics def shutdown_handler(signum=None, frame=None): logger.info("Shutting down, flushing remaining metrics...") - metrics_logger.flush_now() - logger.info("Metrics flushed. Exiting.") + metrics_logger.shutdown() + logger.info("Metrics flushed and greenlet stopped. Exiting.") atexit.register(shutdown_handler) signal.signal(signal.SIGTERM, shutdown_handler) @@ -88,9 +88,29 @@ def shutdown_handler(signum=None, frame=None): process() +def _get_task_from_server(headers: Dict[str, str], params: Dict[str, str], get_task_server_url: str) -> Tuple[requests.Response, float]: + """Execute get-task request in a separate greenlet to prevent LoopExit.""" + get_task_start_time = time.time() + get_task_response: requests.Response = requests.get( + get_task_server_url, + headers=headers, + timeout=25, verify=config_dict.get('verify_cert', False), + proxies=config_dict['outgoing_proxy'], + params=params + ) + get_task_duration_ms = (time.time() - get_task_start_time) * 1000 + return get_task_response, get_task_duration_ms + + def process() -> None: headers: Dict[str, str] = _get_headers() thread_backoff_time: int = min_backoff_time + + # Note: Keepalive greenlet not needed because: + # 1. Main loop waits with .get(timeout=30) which registers a timer + # 2. Flush greenlet has gevent.sleep(10) which registers a timer + # 3. 
These ensure hub always has pending > 0 + # thread_pool = Pool(config_dict['thread_pool_size']) while True: try: @@ -106,15 +126,16 @@ def process() -> None: params['envName'] = config_dict['env_name'] logger.info("Requesting task from %s", get_task_server_url) - get_task_start_time = time.time() - get_task_response: requests.Response = requests.get( - get_task_server_url, - headers=headers, - timeout=25, verify=config_dict.get('verify_cert', False), - proxies=config_dict['outgoing_proxy'], - params=params - ) - get_task_duration_ms = (time.time() - get_task_start_time) * 1000 + + # Spawn get-task in separate greenlet to keep main loop active (prevents LoopExit) + get_task_greenlet = gevent.spawn(_get_task_from_server, headers, params, get_task_server_url) + + try: + get_task_response, get_task_duration_ms = get_task_greenlet.get(timeout=30) + except gevent.Timeout: + logger.error("Get-task request timed out after 30 seconds") + gevent.sleep(5) + continue if get_task_response.status_code == 200: thread_backoff_time = min_backoff_time @@ -137,7 +158,7 @@ def process() -> None: if task is None: logger.info("Received empty task") - time.sleep(5) + gevent.sleep(5) continue logger.info("Received task: %s", task['taskId']) @@ -147,8 +168,12 @@ def process() -> None: if thread_pool is None: process_task_async(task) else: - thread_pool.wait_available() # Wait if the thread_pool is full - thread_pool.spawn(process_task_async, task) # Submit the task when free + # Use helper greenlet to avoid blocking main loop (prevents LoopExit deadlock) + def spawn_when_available(pool, task_to_process): + pool.wait_available() + pool.spawn(process_task_async, task_to_process) + + gevent.spawn(spawn_when_available, thread_pool, task) elif get_task_response.status_code == 204: metrics_logger.write_metric( "http.request.duration_ms", @@ -164,7 +189,7 @@ def process() -> None: } ) logger.info("No task available. 
Waiting...") - time.sleep(5) + gevent.sleep(5) elif get_task_response.status_code > 500: metrics_logger.write_metric( "http.request.duration_ms", @@ -179,7 +204,7 @@ def process() -> None: } ) logger.error("Getting 5XX error %d, increasing backoff time", get_task_response.status_code) - time.sleep(thread_backoff_time) + gevent.sleep(thread_backoff_time) thread_backoff_time = min(max_backoff_time, thread_backoff_time * 2) else: metrics_logger.write_metric( @@ -195,14 +220,14 @@ def process() -> None: } ) logger.error("Unexpected response: %d", get_task_response.status_code) - time.sleep(5) + gevent.sleep(5) except requests.exceptions.RequestException as e: logger.error("Network error: %s", e) - time.sleep(10) # Wait longer on network errors + gevent.sleep(10) # Wait longer on network errors except Exception as e: logger.error("Unexpected error while processing: %s", e, exc_info=True) - time.sleep(5) + gevent.sleep(5) def process_task_async(task: Dict[str, Any]) -> None: @@ -217,7 +242,7 @@ def process_task_async(task: Dict[str, Any]) -> None: except Exception as e: logger.info("Unexpected error while processing task id: %s, method: %s url: %s, error: %s", taskId, method, url, e) - time.sleep(5) + gevent.sleep(5) def update_task(task: Optional[Dict[str, Any]], count: int = 0) -> None: @@ -273,7 +298,7 @@ def update_task(task: Optional[Dict[str, Any]], count: int = 0) -> None: } ) - time.sleep(2) + gevent.sleep(2) logger.warning("Rate limit hit while updating the task output, retrying again for task %s", task['taskId']) count = count + 1 update_task(task, count) @@ -630,7 +655,7 @@ def __init__(self, request_limit: int, time_window: int) -> None: self.request_limit = request_limit self.time_window = time_window self.timestamps = deque() - self.lock = threading.Lock() + self.lock = Semaphore() def set_limits(self, request_limit: int, time_window: int): self.request_limit = request_limit @@ -652,18 +677,18 @@ def allow_request(self) -> bool: def throttle(self) -> None: while not self.allow_request(): - time.sleep(0.5) + gevent.sleep(0.5) class BufferedMetricsLogger: - """Buffered metrics logger for DataDog. Flushes periodically to preserve timestamps.""" + """Buffered metrics logger for DataDog. Flushes periodically to preserve timestamps. 
Uses gevent primitives.""" def __init__(self, metrics_file: str, flush_interval: int = 10, buffer_size: int = 1000): Path(metrics_file).parent.mkdir(parents=True, exist_ok=True) self.flush_interval = flush_interval self.buffer_size = buffer_size self.buffer: List[Dict] = [] - self.buffer_lock = threading.Lock() + self.buffer_lock = Semaphore() self.last_flush_time = time.time() self.file_logger = logging.getLogger('metrics_file') @@ -674,8 +699,7 @@ def __init__(self, metrics_file: str, flush_interval: int = 10, buffer_size: int handler.setFormatter(logging.Formatter('%(message)s')) self.file_logger.addHandler(handler) - self.flush_thread = threading.Thread(target=self._auto_flush_loop, daemon=True) - self.flush_thread.start() + self.flush_greenlet = gevent.spawn(self._auto_flush_loop) def write_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None): timestamp_ms = int(time.time() * 1000) @@ -700,15 +724,22 @@ def _flush(self): def _auto_flush_loop(self): while True: - time.sleep(self.flush_interval) + gevent.sleep(self.flush_interval) with self.buffer_lock: if self.buffer and (time.time() - self.last_flush_time) >= self.flush_interval: self._flush() def flush_now(self): + """Flush all buffered metrics immediately.""" with self.buffer_lock: self._flush() + def shutdown(self): + """Flush remaining metrics and stop the flush greenlet.""" + self.flush_now() + if self.flush_greenlet and not self.flush_greenlet.dead: + self.flush_greenlet.kill() + def _get_url_without_params(url: str) -> str: """Remove query parameters from URL.""" From eab55d767c6f1459aebc619ef1f0459e9a30dd30 Mon Sep 17 00:00:00 2001 From: dmeenaarmorcode Date: Fri, 24 Oct 2025 14:39:46 +0530 Subject: [PATCH 03/11] Adding retry for concurrent request exception --- web-agent/app/worker.py | 583 +++++++++++++++++++++++++++++----------- 1 file changed, 424 insertions(+), 159 deletions(-) diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py index 18d1755..551e2a4 100644 --- a/web-agent/app/worker.py +++ b/web-agent/app/worker.py @@ -2,6 +2,7 @@ from gevent import monkey; monkey.patch_all() +import gevent import argparse import atexit import base64 @@ -9,6 +10,7 @@ import json import logging import os +import random import shutil import secrets import signal @@ -19,7 +21,7 @@ from collections import deque from logging.handlers import TimedRotatingFileHandler from pathlib import Path -from typing import Optional, Tuple, Any, Dict, Union, List +from typing import Optional, Tuple, Any, Dict, Union, List, Callable from urllib.parse import urlparse, urlunparse import gevent @@ -50,11 +52,18 @@ config_dict: dict = None metrics_logger = None +# CRITICAL: Semaphore to limit concurrent teleport endpoint calls +# Shared across all greenlets in the worker to prevent "Too many concurrent requests" errors +# Applies to ALL teleport endpoints: get-task, put-result, upload-logs, upload-result, upload-url +teleport_semaphore: Optional[Semaphore] = None + def main() -> None: - global config_dict, logger, rate_limiter, metrics_logger + global config_dict, logger, rate_limiter, metrics_logger, teleport_semaphore rate_limiter = RateLimiter(request_limit=25, time_window=15) + # Initialize semaphore to limit concurrent teleport endpoint calls (max 2) + teleport_semaphore = Semaphore(2) parser = argparse.ArgumentParser() config_dict, agent_index, debug_mode = get_initial_config(parser) @@ -91,17 +100,163 @@ def shutdown_handler(signum=None, frame=None): def _get_task_from_server(headers: Dict[str, str], params: Dict[str, 
str], get_task_server_url: str) -> Tuple[requests.Response, float]: """Execute get-task request in a separate greenlet to prevent LoopExit.""" get_task_start_time = time.time() - get_task_response: requests.Response = requests.get( - get_task_server_url, - headers=headers, - timeout=25, verify=config_dict.get('verify_cert', False), - proxies=config_dict['outgoing_proxy'], - params=params - ) + + # Acquire semaphore for teleport endpoint call + with teleport_semaphore: + get_task_response: requests.Response = requests.get( + get_task_server_url, + headers=headers, + timeout=25, verify=config_dict.get('verify_cert', False), + proxies=config_dict['outgoing_proxy'], + params=params + ) + get_task_duration_ms = (time.time() - get_task_start_time) * 1000 return get_task_response, get_task_duration_ms +# ============================================================================ +# Retry Infrastructure - Smart 429 Handling with Semaphore +# ============================================================================ + +def is_concurrent_limit_error(response: requests.Response) -> bool: + """ + Check if 429 response is due to concurrent request limit. + + This distinguishes between: + - Standard rate limiting (use header delay) + - Concurrent request limit (use random delay) + + Args: + response: HTTP response object + + Returns: + True if response indicates concurrent limit error, False otherwise + """ + if response.status_code == 429: + try: + return "Too many concurrent requests" in response.text + except Exception: + # If response.text fails (rare), assume not concurrent error + return False + return False + + +def get_retry_delay(response: requests.Response, default_delay: int = 2) -> float: + """ + Extract retry delay from response, detecting concurrent errors. + + Priority order: + 1. Check for concurrent error → random delay (0-10s) + 2. Check retry-after header → use header value + 3. Use default delay + + Args: + response: HTTP response object + default_delay: Fallback delay in seconds (default: 2) + + Returns: + Delay in seconds (validated, bounded) + """ + # Priority 1: Check for concurrent error + if is_concurrent_limit_error(response): + delay = random.uniform(0, 10) + logger.info(f"Concurrent limit error detected, using random delay: {delay:.2f}s") + return delay + + # Priority 2: Check retry-after header + retry_after = response.headers.get('X-Rate-Limit-Retry-After-Seconds') + + if retry_after: + try: + delay = int(retry_after) + + # Validate: must be positive + if delay < 0: + logger.warning( + f"Negative retry delay {delay}s in header, using default {default_delay}s" + ) + return default_delay + + # Validate: cap at 5 minutes + if delay > 300: + logger.warning( + f"Excessive retry delay {delay}s in header, capping at 300s" + ) + return 300 + + logger.info(f"Using retry-after header delay: {delay}s") + return delay + + except ValueError: + logger.warning( + f"Invalid retry delay '{retry_after}' in header, using default {default_delay}s" + ) + + return default_delay + + +def retry_on_429( + func: Callable[[], requests.Response], + max_retries: int = 5, + operation_name: str = "request" +) -> Optional[requests.Response]: + """ + Retry a function on 429 rate limit errors. + + Uses X-Rate-Limit-Retry-After-Seconds header if available, + or random delay (0-10s) for concurrent errors, + otherwise uses default 2-second delay. + + CRITICAL: Uses gevent.sleep() not time.sleep() to allow other greenlets + to run during retry delays, preventing hub from becoming empty. 
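+
+    Example (editor's illustrative sketch, not part of the original patch;
+    ``fetch_status`` is a hypothetical callable):
+
+        def fetch_status() -> requests.Response:
+            return requests.get("https://example.com/status", timeout=10)
+
+        response = retry_on_429(fetch_status, max_retries=3,
+                                operation_name="fetch_status")
+        if response is not None and response.ok:
+            ...  # proceed with the successful response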
+
+    Args:
+        func: Function to call (must return requests.Response)
+        max_retries: Maximum retry attempts (default: 5)
+        operation_name: Name for logging
+
+    Returns:
+        Response object, or None if unexpected error
+
+    Raises:
+        requests.exceptions.RequestException: On network errors
+    """
+    for attempt in range(max_retries + 1):
+        try:
+            response = func()
+
+            # Success or non-429 error → return immediately
+            if response.status_code != 429:
+                return response
+
+            # 429 but retries exhausted → return last response
+            if attempt >= max_retries:
+                logger.error(
+                    f"{operation_name} failed after {max_retries} retries due to rate limiting"
+                )
+                return response
+
+            # 429 with retries remaining → sleep and retry
+            delay = get_retry_delay(response)
+            error_type = "concurrent limit" if is_concurrent_limit_error(response) else "rate limit"
+            logger.warning(
+                f"{operation_name} {error_type} hit "
+                f"(attempt {attempt + 1}/{max_retries + 1}), "
+                f"retrying in {delay:.2f}s"
+            )
+            gevent.sleep(delay)
+            continue
+
+        except requests.exceptions.RequestException as e:
+            logger.error(f"{operation_name} request error: {e}")
+            raise
+
+    # Should never reach here
+    logger.error(f"{operation_name} unexpected loop exit")
+    return None
+
+
 def process() -> None:
     headers: Dict[str, str] = _get_headers()
     thread_backoff_time: int = min_backoff_time
@@ -174,7 +329,7 @@ def spawn_when_available(pool, task_to_process):
                     pool.spawn(process_task_async, task_to_process)
 
                 gevent.spawn(spawn_when_available, thread_pool, task)
-        elif get_task_response.status_code == 204:
+        elif get_task_response.status_code == 429:
             metrics_logger.write_metric(
                 "http.request.duration_ms",
                 get_task_duration_ms,
                 tags={
                     "task_id": "none",
                     "operation": "get_task",
                     "url": _get_url_without_params(get_task_server_url),
                     "domain": urlparse(config_dict.get('server_url')).netloc,
                     "method": "GET",
                     "status_code": "204",
                     "has_task": "false"
                 }
             )
@@ -225,6 +380,9 @@ def spawn_when_available(pool, task_to_process):
         except requests.exceptions.RequestException as e:
             logger.error("Network error: %s", e)
             gevent.sleep(10)  # Wait longer on network errors
+        except gevent.hub.LoopExit as e:
+            logger.error("Getting LoopExit Error, resetting the thread pool")
+            config_dict['thread_pool'] = Pool(config_dict['thread_pool_size'])
         except Exception as e:
             logger.error("Unexpected error while processing: %s", e, exc_info=True)
             gevent.sleep(5)
@@ -245,104 +403,109 @@ def process_task_async(task: Dict[str, Any]) -> None:
         gevent.sleep(5)
 
 
-def update_task(task: Optional[Dict[str, Any]], count: int = 0) -> None:
+def _log_update_metrics(
+    task: Dict[str, Any],
+    response: requests.Response,
+    duration_ms: float
+) -> None:
+    """
+    Log metrics for update_task operation.
+
+    Separated for testability and reuse across refactored functions.
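+
+    Example of one JSON line this helper emits through
+    metrics_logger.write_metric (editor's illustrative sketch; the
+    timestamp and values are invented, and the tags are abridged):
+
+        {"@timestamp": 1761300000000,
+         "metric_name": "http.request.duration_ms",
+         "value": 182.4,
+         "tags": {"task_id": "t-123", "operation": "upload_result",
+                  "method": "POST", "status_code": "429",
+                  "success": "false", "error_type": "rate_limit"}}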
+ + Args: + task: Task dictionary with taskId + response: HTTP response + duration_ms: Request duration in milliseconds + """ + tags = { + "task_id": task['taskId'], + "operation": "upload_result", + "url": _get_url_without_params( + f"{config_dict.get('server_url')}/api/http-teleport/put-result" + ), + "domain": urlparse(config_dict.get('server_url')).netloc, + "method": "POST", + "status_code": str(response.status_code), + "success": str(response.status_code == 200).lower() + } + + # Add error type for failed requests + if response.status_code == 429: + tags["error_type"] = "rate_limit" + elif response.status_code == 504: + tags["error_type"] = "timeout" + elif response.status_code >= 500: + tags["error_type"] = "server_error" + elif response.status_code >= 400: + tags["error_type"] = "client_error" + + metrics_logger.write_metric( + "http.request.duration_ms", + duration_ms, + tags=tags + ) + + +def update_task(task: Optional[Dict[str, Any]]) -> None: + """ + Update task result to ArmorCode server. + + Retries on 429 rate limit errors up to 5 times, + respecting X-Rate-Limit-Retry-After-Seconds header or using + random delay (0-10s) for concurrent errors. + + Uses global teleport_semaphore to limit concurrent calls. + + Args: + task: Task dictionary with result data + """ if task is None: return - # Update the task status - if count > max_retry: - logger.error("Retry count exceeds for task %s", task['taskId']) - return - try: + + def _make_update_request() -> requests.Response: + """Inner function for HTTP request with semaphore protection.""" rate_limiter.throttle() update_start_time = time.time() - update_task_response: requests.Response = requests.post( - f"{config_dict.get('server_url')}/api/http-teleport/put-result", - headers=_get_headers(), - json=task, - timeout=30, verify=config_dict.get('verify_cert'), proxies=config_dict['outgoing_proxy'] - ) - update_duration_ms = (time.time() - update_start_time) * 1000 - - if update_task_response.status_code == 200: - logger.info("Task %s updated successfully. 
Response: %s", task['taskId'], - update_task_response.text) - # Track successful update - metrics_logger.write_metric( - "http.request.duration_ms", - update_duration_ms, - tags={ - "task_id": task['taskId'], - "operation": "upload_result", - "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": "200", - "success": "true" - } - ) - elif update_task_response.status_code == 429 or update_task_response.status_code == 504: - # Track rate limit / timeout - metrics_logger.write_metric( - "http.request.duration_ms", - update_duration_ms, - tags={ - "task_id": task['taskId'], - "operation": "upload_result", - "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": str(update_task_response.status_code), - "success": "false", - "error_type": "rate_limit" if update_task_response.status_code == 429 else "timeout" - } + # Acquire semaphore for the HTTP call + with teleport_semaphore: + response = requests.post( + f"{config_dict.get('server_url')}/api/http-teleport/put-result", + headers=_get_headers(), + json=task, + timeout=30, + verify=config_dict.get('verify_cert'), + proxies=config_dict['outgoing_proxy'] ) - gevent.sleep(2) - logger.warning("Rate limit hit while updating the task output, retrying again for task %s", task['taskId']) - count = count + 1 - update_task(task, count) - else: - logger.warning("Failed to update task %s: %s", task['taskId'], update_task_response.text) - - # Track failed update - metrics_logger.write_metric( - "http.request.duration_ms", - update_duration_ms, - tags={ - "task_id": task['taskId'], - "operation": "upload_result", - "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": str(update_task_response.status_code), - "success": "false", - "error_type": "server_error" - } - ) + # Metrics logging happens OUTSIDE semaphore (don't hold it longer than needed) + update_duration_ms = (time.time() - update_start_time) * 1000 + _log_update_metrics(task, response, update_duration_ms) + return response - except requests.exceptions.RequestException as e: - logger.error("Network error processing task %s: %s", task['taskId'], e) - - # Track network error - metrics_logger.write_metric( - "http.request.duration_ms", - 0, - tags={ - "task_id": task['taskId'], - "operation": "upload_result", - "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/put-result"), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": "network_error", - "success": "false", - "error_type": "network" - } + # Use retry wrapper + try: + response = retry_on_429( + _make_update_request, + max_retries=5, + operation_name=f"update_task[{task['taskId']}]" ) - count = count + 1 - update_task(task, count) + # Handle response + if response and response.status_code == 200: + logger.info(f"Task {task['taskId']} updated successfully. 
Response: {response.text}") + elif response and response.status_code == 504: + logger.warning(f"Timeout updating task {task['taskId']}: {response.text}") + elif response and response.status_code == 429: + logger.warning(f"Rate limit updating task {task['taskId']} after all retries") + elif response: + logger.warning(f"Failed to update task {task['taskId']}: {response.text}") + + except requests.exceptions.RequestException as e: + logger.error(f"Network error updating task {task['taskId']}: {e}") + # Note: Network errors are propagated from retry_on_429, no need to retry again here def _get_headers() -> Dict[str, str]: @@ -354,39 +517,75 @@ def _get_headers() -> Dict[str, str]: def check_for_logs_fetch(url, task, temp_output_file_zip): + """ + Check if this is a logs fetch request and upload logs if so. + + Includes retry on 429 rate limit errors with semaphore protection. + + Args: + url: Request URL + task: Task dictionary + temp_output_file_zip: Temporary file for zipped logs + + Returns: + True if logs were uploaded, False otherwise + """ if 'agent/fetch-logs' in url and 'fetchLogs' in task.get('taskId'): try: - # Zip the logs_folder shutil.make_archive(temp_output_file_zip.name[:-4], 'zip', log_folder) # Update the task with the zip file information task['responseZipped'] = True + logger.info(f"Logs zipped successfully: {temp_output_file_zip.name}") + + # Prepare upload data headers: Dict[str, str] = { "Authorization": f"Bearer {config_dict['api_key']}", } - logger.info(f"Logs zipped successfully: {temp_output_file_zip.name}") task_json = json.dumps(task) files = { - # 'fileFieldName' is the name of the form field expected by the server - "file": (temp_output_file_zip.name, open(temp_output_file_zip.name, "rb"), f"{'application/zip'}"), + "file": (temp_output_file_zip.name, open(temp_output_file_zip.name, "rb"), "application/zip"), "task": (None, task_json, "application/json") } - rate_limiter.throttle() + upload_logs_url = f"{config_dict.get('server_url')}/api/http-teleport/upload-logs" if len(config_dict.get('env_name', '')) > 0: - upload_logs_url = f"{config_dict.get('server_url')}/api/http-teleport/upload-logs?envName={config_dict.get('env_name')}" - upload_result: requests.Response = requests.post( - upload_logs_url, - headers=headers, - timeout=300, verify=config_dict.get('verify_cert', False), proxies=config_dict['outgoing_proxy'], - files=files + upload_logs_url += f"?envName={config_dict.get('env_name')}" + + # Inner function for HTTP call with semaphore protection + def _upload_logs() -> requests.Response: + """Inner function for logs upload request.""" + rate_limiter.throttle() + + # Acquire semaphore for teleport endpoint call + with teleport_semaphore: + return requests.post( + upload_logs_url, + headers=headers, + timeout=300, + verify=config_dict.get('verify_cert', False), + proxies=config_dict['outgoing_proxy'], + files=files + ) + + # Use retry wrapper + response = retry_on_429( + _upload_logs, + max_retries=5, + operation_name="upload_logs" ) - if upload_result.status_code == 200: + + if response and response.status_code == 200: + logger.info("Logs uploaded successfully") return True else: - logger.error("Response code while uploading is not 200 , response code {} and error {} ", upload_result.status_code, upload_result.content) - return True + logger.error( + f"Failed to upload logs: code={response.status_code if response else 'None'}, " + f"error={response.content if response else 'None'}" + ) + return True # Still return True to maintain existing behavior + except 
Exception as e: logger.error(f"Error zipping logs: {str(e)}") raise e @@ -572,6 +771,20 @@ def zip_response(temp_file, temp_file_zip) -> bool: def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Upload task response to ArmorCode server. + + Includes retry on 429 rate limit errors with semaphore protection. + + Args: + temp_file: Temporary file with response + temp_file_zip: Temporary file for zipped response + taskId: Task ID + task: Task dictionary + + Returns: + None if uploaded to ArmorCode, task dict if using S3 + """ if config_dict.get('upload_to_ac', True): try: success = zip_response(temp_file, temp_file_zip) @@ -585,48 +798,68 @@ def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any]) } task_json = json.dumps(task) files = { - # 'fileFieldName' is the name of the form field expected by the server "file": (file_name, open(file_path, "rb"), f"{'application/zip' if success else 'text/plain'}"), "task": (None, task_json, "application/json") - # If you have multiple files, you can add them here as more entries } - rate_limiter.throttle() - upload_start_time = time.time() - upload_result: requests.Response = requests.post( - f"{config_dict.get('server_url')}/api/http-teleport/upload-result", - headers=headers, - timeout=300, verify=config_dict.get('verify_cert', False), proxies=config_dict['outgoing_proxy'], - files=files - ) - upload_duration_ms = (time.time() - upload_start_time) * 1000 - logger.info("Upload result response: %s, code: %d", upload_result.text, upload_result.status_code) - # Track file upload metrics - metrics_logger.write_metric( - "http.request.duration_ms", - upload_duration_ms, - tags={ - "task_id": taskId, - "operation": "upload_file", - "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/upload-result"), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": str(upload_result.status_code), - "success": str(upload_result.status_code < 400).lower() - } - ) + # Inner function for HTTP call with semaphore protection + def _upload_result_file() -> requests.Response: + """Inner function for result file upload.""" + rate_limiter.throttle() + upload_start_time = time.time() + + # Acquire semaphore for teleport endpoint call + with teleport_semaphore: + response = requests.post( + f"{config_dict.get('server_url')}/api/http-teleport/upload-result", + headers=headers, + timeout=300, + verify=config_dict.get('verify_cert', False), + proxies=config_dict['outgoing_proxy'], + files=files + ) + + # Metrics logging happens OUTSIDE semaphore + upload_duration_ms = (time.time() - upload_start_time) * 1000 + + # Track file upload metrics + metrics_logger.write_metric( + "http.request.duration_ms", + upload_duration_ms, + tags={ + "task_id": taskId, + "operation": "upload_file", + "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/upload-result"), + "domain": urlparse(config_dict.get('server_url')).netloc, + "method": "POST", + "status_code": str(response.status_code), + "success": str(response.status_code < 400).lower() + } + ) - # Track upload size - metrics_logger.write_metric( - "upload.size_bytes", - file_size, - tags={ - "task_id": taskId, - "upload_type": "s3" - } + # Track upload size + metrics_logger.write_metric( + "upload.size_bytes", + file_size, + tags={ + "task_id": taskId, + "upload_type": "direct" + } + ) + + return response + + # Use retry wrapper + upload_result = 
retry_on_429( + _upload_result_file, + max_retries=5, + operation_name=f"upload_result[{taskId}]" ) - upload_result.raise_for_status() + if upload_result: + logger.info("Upload result response: %s, code: %d", upload_result.text, upload_result.status_code) + upload_result.raise_for_status() + return None except Exception as e: logger.error("Unable to upload file to armorcode: %s", e) @@ -782,26 +1015,58 @@ def _createFolder(folder_path: str) -> None: def get_s3_upload_url(taskId: str) -> Tuple[Optional[str], Optional[str]]: + """ + Get S3 upload URL from ArmorCode server. + + Retries on 429 rate limit errors up to 5 times with semaphore protection. + + Args: + taskId: Task ID for filename generation + + Returns: + Tuple of (putUrl, getUrl) or (None, None) on error + """ params: Dict[str, str] = {'fileName': f"{taskId}{uuid.uuid4().hex}"} - try: + + def _request_upload_url() -> requests.Response: + """Inner function for S3 URL request with semaphore protection.""" rate_limiter.throttle() - get_s3_url: requests.Response = requests.get( - f"{config_dict.get('server_url')}/api/http-teleport/upload-url", - params=params, - headers=_get_headers(), - timeout=25, verify=config_dict.get('verify_cert', False), proxies=config_dict['outgoing_proxy'] + + # Acquire semaphore for teleport endpoint call + with teleport_semaphore: + return requests.get( + f"{config_dict.get('server_url')}/api/http-teleport/upload-url", + params=params, + headers=_get_headers(), + timeout=25, + verify=config_dict.get('verify_cert', False), + proxies=config_dict['outgoing_proxy'] + ) + + try: + response = retry_on_429( + _request_upload_url, + max_retries=5, + operation_name="get_s3_upload_url" ) - get_s3_url.raise_for_status() - data: Optional[Dict[str, str]] = get_s3_url.json().get('data', None) - if data is not None: - return data.get('putUrl'), data.get('getUrl') - logger.warning("No data returned when requesting S3 upload URL") + if response and response.status_code == 200: + data: Optional[Dict[str, str]] = response.json().get('data') + if data: + return data.get('putUrl'), data.get('getUrl') + logger.warning("No data in S3 upload URL response") + else: + logger.warning( + f"Failed to get S3 URL: {response.status_code if response else 'None'}" + ) + return None, None + except requests.exceptions.RequestException as e: - logger.error("Network error getting S3 upload URL: %s", e) + logger.error(f"Network error getting S3 upload URL: {e}") except Exception as e: - logger.exception("Unexpected error getting S3 upload URL: %s", e) + logger.exception(f"Unexpected error getting S3 upload URL: {e}") + return None, None From 0a7d4957cd4820d708381205619b10c1736d45b6 Mon Sep 17 00:00:00 2001 From: dmeenaarmorcode Date: Fri, 24 Oct 2025 16:58:20 +0530 Subject: [PATCH 04/11] Adding fix for latch timeout error --- web-agent/app/worker.py | 251 +--------------------------------------- 1 file changed, 3 insertions(+), 248 deletions(-) diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py index 551e2a4..db25ad2 100644 --- a/web-agent/app/worker.py +++ b/web-agent/app/worker.py @@ -50,7 +50,6 @@ # throttling to 25 requests per seconds to avoid rate limit errors rate_limiter = None config_dict: dict = None -metrics_logger = None # CRITICAL: Semaphore to limit concurrent teleport endpoint calls # Shared across all greenlets in the worker to prevent "Too many concurrent requests" errors @@ -59,7 +58,7 @@ def main() -> None: - global config_dict, logger, rate_limiter, metrics_logger, teleport_semaphore + global config_dict, logger, 
rate_limiter, teleport_semaphore rate_limiter = RateLimiter(request_limit=25, time_window=15) # Initialize semaphore to limit concurrent teleport endpoint calls (max 2) @@ -69,22 +68,6 @@ def main() -> None: logger = setup_logger(agent_index, debug_mode) - # Initialize metrics logger - metrics_folder = os.path.join(armorcode_folder, 'metrics') - _createFolder(metrics_folder) - metrics_file = os.path.join(metrics_folder, f'metrics{agent_index}.json') - metrics_logger = BufferedMetricsLogger(metrics_file, flush_interval=10, buffer_size=1000) - - # Register shutdown handlers to flush metrics - def shutdown_handler(signum=None, frame=None): - logger.info("Shutting down, flushing remaining metrics...") - metrics_logger.shutdown() - logger.info("Metrics flushed and greenlet stopped. Exiting.") - - atexit.register(shutdown_handler) - signal.signal(signal.SIGTERM, shutdown_handler) - signal.signal(signal.SIGINT, shutdown_handler) - logger.info("Agent Started for url %s, verify %s, timeout %s, outgoing proxy %s, inward %s, uploadToAc %s", config_dict.get('server_url'), config_dict.get('verify_cert', False), config_dict.get('timeout', 10), config_dict['outgoing_proxy'], @@ -274,7 +257,8 @@ def process() -> None: rate_limiter.throttle() params = { - 'agentId' : config_dict['agent_id'] + 'agentId': config_dict['agent_id'], + 'agentVersion': __version__ } get_task_server_url = f"{config_dict.get('server_url')}/api/http-teleport/get-task" if len(config_dict.get('env_name', '')) > 0: @@ -296,21 +280,6 @@ def process() -> None: thread_backoff_time = min_backoff_time task: Optional[Dict[str, Any]] = get_task_response.json().get('data', None) - # Track get-task metric - metrics_logger.write_metric( - "http.request.duration_ms", - get_task_duration_ms, - tags={ - "task_id": task.get('taskId', 'none') if task else "none", - "operation": "get_task", - "url": _get_url_without_params(get_task_server_url), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "GET", - "status_code": "200", - "has_task": str(task is not None).lower() - } - ) - if task is None: logger.info("Received empty task") gevent.sleep(5) @@ -330,50 +299,13 @@ def spawn_when_available(pool, task_to_process): gevent.spawn(spawn_when_available, thread_pool, task) elif get_task_response.status_code == 429: - metrics_logger.write_metric( - "http.request.duration_ms", - get_task_duration_ms, - tags={ - "task_id": "none", - "operation": "get_task", - "url": _get_url_without_params(get_task_server_url), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "GET", - "status_code": "204", - "has_task": "false" - } - ) logger.info("No task available. 
Waiting...") gevent.sleep(5) elif get_task_response.status_code > 500: - metrics_logger.write_metric( - "http.request.duration_ms", - get_task_duration_ms, - tags={ - "task_id": "none", - "operation": "get_task", - "url": _get_url_without_params(get_task_server_url), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "GET", - "status_code": str(get_task_response.status_code) - } - ) logger.error("Getting 5XX error %d, increasing backoff time", get_task_response.status_code) gevent.sleep(thread_backoff_time) thread_backoff_time = min(max_backoff_time, thread_backoff_time * 2) else: - metrics_logger.write_metric( - "http.request.duration_ms", - get_task_duration_ms, - tags={ - "task_id": "none", - "operation": "get_task", - "url": _get_url_without_params(get_task_server_url), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "GET", - "status_code": str(get_task_response.status_code) - } - ) logger.error("Unexpected response: %d", get_task_response.status_code) gevent.sleep(5) @@ -403,50 +335,6 @@ def process_task_async(task: Dict[str, Any]) -> None: gevent.sleep(5) -def _log_update_metrics( - task: Dict[str, Any], - response: requests.Response, - duration_ms: float -) -> None: - """ - Log metrics for update_task operation. - - Separated for testability and reuse across refactored functions. - - Args: - task: Task dictionary with taskId - response: HTTP response - duration_ms: Request duration in milliseconds - """ - tags = { - "task_id": task['taskId'], - "operation": "upload_result", - "url": _get_url_without_params( - f"{config_dict.get('server_url')}/api/http-teleport/put-result" - ), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": str(response.status_code), - "success": str(response.status_code == 200).lower() - } - - # Add error type for failed requests - if response.status_code == 429: - tags["error_type"] = "rate_limit" - elif response.status_code == 504: - tags["error_type"] = "timeout" - elif response.status_code >= 500: - tags["error_type"] = "server_error" - elif response.status_code >= 400: - tags["error_type"] = "client_error" - - metrics_logger.write_metric( - "http.request.duration_ms", - duration_ms, - tags=tags - ) - - def update_task(task: Optional[Dict[str, Any]]) -> None: """ Update task result to ArmorCode server. 
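Patches 03-05 converge on one layered throttling pattern: a client-side RateLimiter, a two-slot gevent Semaphore around every teleport call, and a 429-aware retry loop. A minimal, self-contained sketch of that composition (editor's illustration only; `post_result`, `send_with_retry`, and the URL are assumptions, not code from this series):

    import gevent
    import requests
    from gevent.lock import Semaphore

    teleport_semaphore = Semaphore(2)  # at most two concurrent teleport calls

    def post_result(payload: dict) -> requests.Response:
        # hypothetical endpoint; stands in for the put-result/upload-result calls
        return requests.post("https://server.example/api/http-teleport/put-result",
                             json=payload, timeout=30)

    def send_with_retry(payload: dict, max_retries: int = 5) -> requests.Response:
        for attempt in range(max_retries + 1):
            with teleport_semaphore:          # cap concurrency only around the call
                response = post_result(payload)
            if response.status_code != 429 or attempt >= max_retries:
                return response
            # simple exponential backoff stands in for the header-driven delay
            # that get_retry_delay() implements in the actual worker
            gevent.sleep(min(2 ** attempt, 30))
        return response

The ordering matters: the semaphore is held only for the duration of the HTTP request, never across the retry sleep, so a stalled upload cannot starve other greenlets.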
@@ -466,7 +354,6 @@ def update_task(task: Optional[Dict[str, Any]]) -> None: def _make_update_request() -> requests.Response: """Inner function for HTTP request with semaphore protection.""" rate_limiter.throttle() - update_start_time = time.time() # Acquire semaphore for the HTTP call with teleport_semaphore: @@ -479,10 +366,6 @@ def _make_update_request() -> requests.Response: proxies=config_dict['outgoing_proxy'] ) - # Metrics logging happens OUTSIDE semaphore (don't hold it longer than needed) - update_duration_ms = (time.time() - update_start_time) * 1000 - _log_update_metrics(task, response, update_duration_ms) - return response # Use retry wrapper @@ -651,21 +534,6 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: http_duration_ms = (time.time() - http_start_time) * 1000 logger.info("Response: %d", response.status_code) - # Track HTTP request to target URL - metrics_logger.write_metric( - "http.request.duration_ms", - http_duration_ms, - tags={ - "task_id": taskId, - "operation": "target_request", - "url": _get_url_without_params(url), - "domain": urlparse(url).netloc, - "method": method, - "status_code": str(response.status_code), - "success": str(response.status_code < 400).lower() - } - ) - data: Any = None if response.status_code == 200: # Check if the response is chunked @@ -707,16 +575,6 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: task['responseBase64'] = True task['output'] = base64_string - # Track inline upload size - metrics_logger.write_metric( - "upload.size_bytes", - file_size, - tags={ - "task_id": taskId, - "upload_type": "inline" - } - ) - return task return upload_response(temp_output_file.name, temp_output_file_zip.name, taskId, task) @@ -729,19 +587,6 @@ def process_task(task: Dict[str, Any]) -> Optional[dict[str, Any]]: task['statusCode'] = 500 task['output'] = f"Agent Side Error: Error: {str(e)}" finally: - # Track overall task processing duration - task_total_duration_ms = (time.time() - task_start_time) * 1000 - metrics_logger.write_metric( - "task.processing_duration_ms", - task_total_duration_ms, - tags={ - "task_id": taskId, - "method": method, - "domain": urlparse(url).netloc, - "http_status": str(task.get('statusCode', 'unknown')) - } - ) - temp_output_file.close() temp_output_file_zip.close() os.unlink(temp_output_file.name) @@ -806,7 +651,6 @@ def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any]) def _upload_result_file() -> requests.Response: """Inner function for result file upload.""" rate_limiter.throttle() - upload_start_time = time.time() # Acquire semaphore for teleport endpoint call with teleport_semaphore: @@ -819,34 +663,6 @@ def _upload_result_file() -> requests.Response: files=files ) - # Metrics logging happens OUTSIDE semaphore - upload_duration_ms = (time.time() - upload_start_time) * 1000 - - # Track file upload metrics - metrics_logger.write_metric( - "http.request.duration_ms", - upload_duration_ms, - tags={ - "task_id": taskId, - "operation": "upload_file", - "url": _get_url_without_params(f"{config_dict.get('server_url')}/api/http-teleport/upload-result"), - "domain": urlparse(config_dict.get('server_url')).netloc, - "method": "POST", - "status_code": str(response.status_code), - "success": str(response.status_code < 400).lower() - } - ) - - # Track upload size - metrics_logger.write_metric( - "upload.size_bytes", - file_size, - tags={ - "task_id": taskId, - "upload_type": "direct" - } - ) - return response # Use retry wrapper @@ -913,67 +729,6 @@ def 
throttle(self) -> None: gevent.sleep(0.5) -class BufferedMetricsLogger: - """Buffered metrics logger for DataDog. Flushes periodically to preserve timestamps. Uses gevent primitives.""" - - def __init__(self, metrics_file: str, flush_interval: int = 10, buffer_size: int = 1000): - Path(metrics_file).parent.mkdir(parents=True, exist_ok=True) - self.flush_interval = flush_interval - self.buffer_size = buffer_size - self.buffer: List[Dict] = [] - self.buffer_lock = Semaphore() - self.last_flush_time = time.time() - - self.file_logger = logging.getLogger('metrics_file') - self.file_logger.setLevel(logging.INFO) - self.file_logger.propagate = False - - handler = TimedRotatingFileHandler(metrics_file, when="midnight", interval=1, backupCount=7) - handler.setFormatter(logging.Formatter('%(message)s')) - self.file_logger.addHandler(handler) - - self.flush_greenlet = gevent.spawn(self._auto_flush_loop) - - def write_metric(self, metric_name: str, value: float, tags: Dict[str, str] = None): - timestamp_ms = int(time.time() * 1000) - metric_event = { - "@timestamp": timestamp_ms, - "metric_name": metric_name, - "value": value, - "tags": tags or {} - } - with self.buffer_lock: - self.buffer.append(metric_event) - if len(self.buffer) >= self.buffer_size: - self._flush() - - def _flush(self): - if not self.buffer: - return - for event in self.buffer: - self.file_logger.info(json.dumps(event)) - self.buffer.clear() - self.last_flush_time = time.time() - - def _auto_flush_loop(self): - while True: - gevent.sleep(self.flush_interval) - with self.buffer_lock: - if self.buffer and (time.time() - self.last_flush_time) >= self.flush_interval: - self._flush() - - def flush_now(self): - """Flush all buffered metrics immediately.""" - with self.buffer_lock: - self._flush() - - def shutdown(self): - """Flush remaining metrics and stop the flush greenlet.""" - self.flush_now() - if self.flush_greenlet and not self.flush_greenlet.dead: - self.flush_greenlet.kill() - - def _get_url_without_params(url: str) -> str: """Remove query parameters from URL.""" parsed = urlparse(url) From 4083ef449e1b25a1580606520b8b9030fc2e7dac Mon Sep 17 00:00:00 2001 From: dmeenaarmorcode Date: Fri, 24 Oct 2025 17:26:09 +0530 Subject: [PATCH 05/11] Adding fix for latch timeout error --- web-agent/app/worker.py | 234 +++++----------------------------------- 1 file changed, 27 insertions(+), 207 deletions(-) diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py index db25ad2..595aaff 100644 --- a/web-agent/app/worker.py +++ b/web-agent/app/worker.py @@ -51,9 +51,7 @@ rate_limiter = None config_dict: dict = None -# CRITICAL: Semaphore to limit concurrent teleport endpoint calls -# Shared across all greenlets in the worker to prevent "Too many concurrent requests" errors -# Applies to ALL teleport endpoints: get-task, put-result, upload-logs, upload-result, upload-url +# Limit concurrent teleport calls (max 2 per worker) teleport_semaphore: Optional[Semaphore] = None @@ -61,8 +59,7 @@ def main() -> None: global config_dict, logger, rate_limiter, teleport_semaphore rate_limiter = RateLimiter(request_limit=25, time_window=15) - # Initialize semaphore to limit concurrent teleport endpoint calls (max 2) - teleport_semaphore = Semaphore(2) + teleport_semaphore = Semaphore(2) # Max 2 concurrent teleport calls parser = argparse.ArgumentParser() config_dict, agent_index, debug_mode = get_initial_config(parser) @@ -84,7 +81,6 @@ def _get_task_from_server(headers: Dict[str, str], params: Dict[str, str], get_t """Execute get-task request in a 
From 4083ef449e1b25a1580606520b8b9030fc2e7dac Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Fri, 24 Oct 2025 17:26:09 +0530
Subject: [PATCH 05/11] Adding fix for latch timeout error

---
 web-agent/app/worker.py | 234 +++++-----------------------------------
 1 file changed, 27 insertions(+), 207 deletions(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index db25ad2..595aaff 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -51,9 +51,7 @@
 rate_limiter = None
 config_dict: dict = None
 
-# CRITICAL: Semaphore to limit concurrent teleport endpoint calls
-# Shared across all greenlets in the worker to prevent "Too many concurrent requests" errors
-# Applies to ALL teleport endpoints: get-task, put-result, upload-logs, upload-result, upload-url
+# Limit concurrent teleport calls (max 2 per worker)
 teleport_semaphore: Optional[Semaphore] = None
@@ -61,8 +59,7 @@ def main() -> None:
     global config_dict, logger, rate_limiter, teleport_semaphore
 
     rate_limiter = RateLimiter(request_limit=25, time_window=15)
-    # Initialize semaphore to limit concurrent teleport endpoint calls (max 2)
-    teleport_semaphore = Semaphore(2)
+    teleport_semaphore = Semaphore(2)  # Max 2 concurrent teleport calls
 
     parser = argparse.ArgumentParser()
     config_dict, agent_index, debug_mode = get_initial_config(parser)
@@ -84,7 +81,6 @@ def _get_task_from_server(headers: Dict[str, str], params: Dict[str, str], get_t
     """Execute get-task request in a separate greenlet to prevent LoopExit."""
     get_task_start_time = time.time()
 
-    # Acquire semaphore for teleport endpoint call
     with teleport_semaphore:
         get_task_response: requests.Response = requests.get(
             get_task_server_url,
@@ -98,24 +94,8 @@ def _get_task_from_server(headers: Dict[str, str], params: Dict[str, str], get_t
     return get_task_response, get_task_duration_ms
 
 
-# ============================================================================
-# Retry Infrastructure - Smart 429 Handling with Semaphore
-# ============================================================================
-
 def is_concurrent_limit_error(response: requests.Response) -> bool:
-    """
-    Check if 429 response is due to concurrent request limit.
-
-    This distinguishes between:
-    - Standard rate limiting (use header delay)
-    - Concurrent request limit (use random delay)
-
-    Args:
-        response: HTTP response object
-
-    Returns:
-        True if response indicates concurrent limit error, False otherwise
-    """
+    """Check if 429 is due to concurrent request limit vs rate limit."""
     if response.status_code == 429:
         try:
             return "Too many concurrent requests" in response.text
@@ -126,55 +106,26 @@ def get_retry_delay(response: requests.Response, default_delay: int = 2) -> floa
-    """
-    Extract retry delay from response, detecting concurrent errors.
-
-    Priority order:
-    1. Check for concurrent error → random delay (0-10s)
-    2. Check retry-after header → use header value
-    3. Use default delay
-
-    Args:
-        response: HTTP response object
-        default_delay: Fallback delay in seconds (default: 2)
-
-    Returns:
-        Delay in seconds (validated, bounded)
-    """
-    # Priority 1: Check for concurrent error
+    """Get retry delay: concurrent error (0-10s) > header > default."""
     if is_concurrent_limit_error(response):
         delay = random.uniform(0, 10)
-        logger.info(f"Concurrent limit error detected, using random delay: {delay:.2f}s")
+        logger.info(f"Concurrent limit error, random delay: {delay:.2f}s")
         return delay
 
-    # Priority 2: Check retry-after header
     retry_after = response.headers.get('X-Rate-Limit-Retry-After-Seconds')
-
     if retry_after:
         try:
             delay = int(retry_after)
-
-            # Validate: must be positive
             if delay < 0:
-                logger.warning(
-                    f"Negative retry delay {delay}s in header, using default {default_delay}s"
-                )
+                logger.warning(f"Negative retry delay {delay}s, using default {default_delay}s")
                 return default_delay
-
-            # Validate: cap at 5 minutes
             if delay > 300:
-                logger.warning(
-                    f"Excessive retry delay {delay}s in header, capping at 300s"
-                )
+                logger.warning(f"Excessive retry delay {delay}s, capping at 300s")
                 return 300
-
-            logger.info(f"Using retry-after header delay: {delay}s")
+            logger.info(f"Using header delay: {delay}s")
             return delay
-
         except ValueError:
-            logger.warning(
-                f"Invalid retry delay '{retry_after}' in header, using default {default_delay}s"
-            )
+            logger.warning(f"Invalid retry delay '{retry_after}', using default {default_delay}s")
             return default_delay
@@ -184,58 +135,27 @@ def retry_on_429(
     func: Callable[[], requests.Response],
     max_retries: int = 5,
     operation_name: str = "request"
 ) -> Optional[requests.Response]:
-    """
-    Retry a function on 429 rate limit errors.
-
-    Uses X-Rate-Limit-Retry-After-Seconds header if available,
-    or random delay (0-10s) for concurrent errors,
-    otherwise uses default 2-second delay.
-
-    CRITICAL: Uses gevent.sleep() not time.sleep() to allow other greenlets
-    to run during retry delays, preventing hub from becoming empty.
-
-    Args:
-        func: Function to call (must return requests.Response)
-        max_retries: Maximum retry attempts (default: 5)
-        operation_name: Name for logging
-
-    Returns:
-        Response object, or None if unexpected error
-
-    Raises:
-        requests.exceptions.RequestException: On network errors
-    """
+    """Retry function on 429 errors. Uses gevent.sleep() for greenlet safety."""
     for attempt in range(max_retries + 1):
         try:
             response = func()
 
-            # Success or non-429 error → return immediately
             if response.status_code != 429:
                 return response
 
-            # 429 but retries exhausted → return last response
             if attempt >= max_retries:
-                logger.error(
-                    f"{operation_name} failed after {max_retries} retries due to rate limiting"
-                )
+                logger.error(f"{operation_name} failed after {max_retries} retries due to rate limiting")
                 return response
 
-            # 429 with retries remaining → sleep and retry
             delay = get_retry_delay(response)
             error_type = "concurrent limit" if is_concurrent_limit_error(response) else "rate limit"
-            logger.warning(
-                f"{operation_name} {error_type} hit "
-                f"(attempt {attempt + 1}/{max_retries + 1}), "
-                f"retrying in {delay:.2f}s"
-            )
+            logger.warning(f"{operation_name} {error_type} (attempt {attempt + 1}/{max_retries + 1}), retry in {delay:.2f}s")
             gevent.sleep(delay)
-            continue
 
         except requests.exceptions.RequestException as e:
             logger.error(f"{operation_name} request error: {e}")
             raise
 
-    # Should never reach here
     logger.error(f"{operation_name} unexpected loop exit")
     return None
@@ -292,12 +212,8 @@ def process() -> None:
             if thread_pool is None:
                 process_task_async(task)
             else:
-                # Use helper greenlet to avoid blocking main loop (prevents LoopExit deadlock)
-                def spawn_when_available(pool, task_to_process):
-                    pool.wait_available()
-                    pool.spawn(process_task_async, task_to_process)
-
-                gevent.spawn(spawn_when_available, thread_pool, task)
+                thread_pool.wait_available()
+                thread_pool.spawn(process_task_async, task)
         elif get_task_response.status_code == 429:
             logger.info("No task available. Waiting...")
             gevent.sleep(5)
@@ -336,26 +252,12 @@
 def update_task(task: Optional[Dict[str, Any]]) -> None:
-    """
-    Update task result to ArmorCode server.
-
-    Retries on 429 rate limit errors up to 5 times,
-    respecting X-Rate-Limit-Retry-After-Seconds header or using
-    random delay (0-10s) for concurrent errors.
-
-    Uses global teleport_semaphore to limit concurrent calls.
-
-    Args:
-        task: Task dictionary with result data
-    """
+    """Update task result with 429 retry and semaphore protection."""
     if task is None:
         return
 
     def _make_update_request() -> requests.Response:
-        """Inner function for HTTP request with semaphore protection."""
         rate_limiter.throttle()
-
-        # Acquire semaphore for the HTTP call
         with teleport_semaphore:
             response = requests.post(
                 f"{config_dict.get('server_url')}/api/http-teleport/put-result",
@@ -368,15 +270,9 @@ def _make_update_request() -> requests.Response:
 
         return response
 
-    # Use retry wrapper
     try:
-        response = retry_on_429(
-            _make_update_request,
-            max_retries=5,
-            operation_name=f"update_task[{task['taskId']}]"
-        )
+        response = retry_on_429(_make_update_request, max_retries=5, operation_name=f"update_task[{task['taskId']}]")
 
-        # Handle response
         if response and response.status_code == 200:
             logger.info(f"Task {task['taskId']} updated successfully. Response: {response.text}")
         elif response and response.status_code == 504:
@@ -388,7 +284,6 @@ def _make_update_request() -> requests.Response:
 
     except requests.exceptions.RequestException as e:
         logger.error(f"Network error updating task {task['taskId']}: {e}")
-        # Note: Network errors are propagated from retry_on_429, no need to retry again here
 
 
 def _get_headers() -> Dict[str, str]:
@@ -400,32 +295,14 @@ def _get_headers() -> Dict[str, str]:
 
 def check_for_logs_fetch(url, task, temp_output_file_zip):
-    """
-    Check if this is a logs fetch request and upload logs if so.
-
-    Includes retry on 429 rate limit errors with semaphore protection.
-
-    Args:
-        url: Request URL
-        task: Task dictionary
-        temp_output_file_zip: Temporary file for zipped logs
-
-    Returns:
-        True if logs were uploaded, False otherwise
-    """
+    """Upload agent logs if this is a fetch-logs request."""
     if 'agent/fetch-logs' in url and 'fetchLogs' in task.get('taskId'):
         try:
-            # Zip the logs_folder
             shutil.make_archive(temp_output_file_zip.name[:-4], 'zip', log_folder)
-
-            # Update the task with the zip file information
             task['responseZipped'] = True
             logger.info(f"Logs zipped successfully: {temp_output_file_zip.name}")
 
-            # Prepare upload data
-            headers: Dict[str, str] = {
-                "Authorization": f"Bearer {config_dict['api_key']}",
-            }
+            headers: Dict[str, str] = {"Authorization": f"Bearer {config_dict['api_key']}"}
             task_json = json.dumps(task)
             files = {
                 "file": (temp_output_file_zip.name, open(temp_output_file_zip.name, "rb"), "application/zip"),
@@ -436,12 +313,8 @@ def check_for_logs_fetch(url, task, temp_output_file_zip):
             if len(config_dict.get('env_name', '')) > 0:
                 upload_logs_url += f"?envName={config_dict.get('env_name')}"
 
-            # Inner function for HTTP call with semaphore protection
             def _upload_logs() -> requests.Response:
-                """Inner function for logs upload request."""
                 rate_limiter.throttle()
-
-                # Acquire semaphore for teleport endpoint call
                 with teleport_semaphore:
                     return requests.post(
                         upload_logs_url,
@@ -452,22 +325,14 @@
                         files=files
                     )
 
-            # Use retry wrapper
-            response = retry_on_429(
-                _upload_logs,
-                max_retries=5,
-                operation_name="upload_logs"
-            )
+            response = retry_on_429(_upload_logs, max_retries=5, operation_name="upload_logs")
 
             if response and response.status_code == 200:
                 logger.info("Logs uploaded successfully")
                 return True
             else:
-                logger.error(
-                    f"Failed to upload logs: code={response.status_code if response else 'None'}, "
-                    f"error={response.content if response else 'None'}"
-                )
-                return True  # Still return True to maintain existing behavior
+                logger.error(f"Failed to upload logs: code={response.status_code if response else 'None'}, error={response.content if response else 'None'}")
+                return True
 
         except Exception as e:
             logger.error(f"Error zipping logs: {str(e)}")
@@ -616,43 +481,23 @@ def zip_response(temp_file, temp_file_zip) -> bool:
 
 def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
-    """
-    Upload task response to ArmorCode server.
-
-    Includes retry on 429 rate limit errors with semaphore protection.
-
-    Args:
-        temp_file: Temporary file with response
-        temp_file_zip: Temporary file for zipped response
-        taskId: Task ID
-        task: Task dictionary
-
-    Returns:
-        None if uploaded to ArmorCode, task dict if using S3
-    """
+    """Upload task response with 429 retry and semaphore protection."""
     if config_dict.get('upload_to_ac', True):
         try:
             success = zip_response(temp_file, temp_file_zip)
             file_path = temp_file_zip if success else temp_file
             task['responseZipped'] = success
             file_name = f"{taskId}_{uuid.uuid4().hex}.{'zip' if success else 'txt'}"
-            file_size = os.path.getsize(file_path)
 
-            headers: Dict[str, str] = {
-                "Authorization": f"Bearer {config_dict['api_key']}",
-            }
+            headers: Dict[str, str] = {"Authorization": f"Bearer {config_dict['api_key']}"}
             task_json = json.dumps(task)
             files = {
                 "file": (file_name, open(file_path, "rb"), f"{'application/zip' if success else 'text/plain'}"),
                 "task": (None, task_json, "application/json")
             }
 
-            # Inner function for HTTP call with semaphore protection
             def _upload_result_file() -> requests.Response:
-                """Inner function for result file upload."""
                 rate_limiter.throttle()
-
-                # Acquire semaphore for teleport endpoint call
                 with teleport_semaphore:
                     response = requests.post(
                         f"{config_dict.get('server_url')}/api/http-teleport/upload-result",
@@ -662,15 +507,9 @@ def _upload_result_file() -> requests.Response:
                         proxies=config_dict['outgoing_proxy'],
                         files=files
                     )
-
                     return response
 
-            # Use retry wrapper
-            upload_result = retry_on_429(
-                _upload_result_file,
-                max_retries=5,
-                operation_name=f"upload_result[{taskId}]"
-            )
+            upload_result = retry_on_429(_upload_result_file, max_retries=5, operation_name=f"upload_result[{taskId}]")
 
             if upload_result:
                 logger.info("Upload result response: %s, code: %d", upload_result.text, upload_result.status_code)
@@ -770,24 +609,11 @@ def _createFolder(folder_path: str) -> None:
 
 def get_s3_upload_url(taskId: str) -> Tuple[Optional[str], Optional[str]]:
-    """
-    Get S3 upload URL from ArmorCode server.
-
-    Retries on 429 rate limit errors up to 5 times with semaphore protection.
-
-    Args:
-        taskId: Task ID for filename generation
-
-    Returns:
-        Tuple of (putUrl, getUrl) or (None, None) on error
-    """
+    """Get S3 upload URL with 429 retry and semaphore protection."""
     params: Dict[str, str] = {'fileName': f"{taskId}{uuid.uuid4().hex}"}
 
     def _request_upload_url() -> requests.Response:
-        """Inner function for S3 URL request with semaphore protection."""
         rate_limiter.throttle()
-
-        # Acquire semaphore for teleport endpoint call
        with teleport_semaphore:
             return requests.get(
                 f"{config_dict.get('server_url')}/api/http-teleport/upload-url",
@@ -799,11 +625,7 @@ def _request_upload_url() -> requests.Response:
             )
 
     try:
-        response = retry_on_429(
-            _request_upload_url,
-            max_retries=5,
-            operation_name="get_s3_upload_url"
-        )
+        response = retry_on_429(_request_upload_url, max_retries=5, operation_name="get_s3_upload_url")
 
        if response and response.status_code == 200:
             data: Optional[Dict[str, str]] = response.json().get('data')
@@ -811,9 +633,7 @@ def _request_upload_url() -> requests.Response:
                 return data.get('putUrl'), data.get('getUrl')
             logger.warning("No data in S3 upload URL response")
         else:
-            logger.warning(
-                f"Failed to get S3 URL: {response.status_code if response else 'None'}"
-            )
+            logger.warning(f"Failed to get S3 URL: {response.status_code if response else 'None'}")
 
     return None, None
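
The 429 branching above can be exercised without a live server. A condensed sketch of a throwaway test harness, not part of the agent: fake_response is a made-up helper that builds requests.Response objects by hand (it pokes the private _content attribute, which is acceptable for this kind of local experiment) to drive both 429 flavors.

import requests

def fake_response(status: int, body: bytes = b"", headers: dict = None) -> requests.Response:
    # Hand-built Response; fine for exercising branching logic locally.
    response = requests.Response()
    response.status_code = status
    response._content = body
    response.headers.update(headers or {})
    return response

# Concurrent-limit 429: the body text selects the random 0-10s delay branch
concurrent = fake_response(429, b"Too many concurrent requests")

# Plain rate-limit 429: the header selects the fixed server-supplied delay
rate_limited = fake_response(429, b"", {"X-Rate-Limit-Retry-After-Seconds": "7"})

print("Too many concurrent requests" in concurrent.text)             # True
print(rate_limited.headers.get("X-Rate-Limit-Retry-After-Seconds"))  # 7
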
From 1cf3f3cd1e38392a508e9a2913cc95ef37015259 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Fri, 24 Oct 2025 17:26:28 +0530
Subject: [PATCH 06/11] Adding fix for latch timeout error

---
 web-agent/entrypoint.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/web-agent/entrypoint.sh b/web-agent/entrypoint.sh
index 35ede4c..6d48219 100644
--- a/web-agent/entrypoint.sh
+++ b/web-agent/entrypoint.sh
@@ -1,3 +1,3 @@
 #!/bin/sh
 # Pass all arguments to the Python script and redirect output
-python /usr/src/venv/bin/python3 worker.py "$@" > /tmp/armorcode/console.log 2>&1
\ No newline at end of file
+/usr/src/venv/bin/python3 -W ignore worker.py "$@" > /dev/null 2> /tmp/armorcode/console.log
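
Context for the comment block the next patch deletes: gevent raises LoopExit when a greenlet blocks on the hub while no other greenlet or timer is pending. The idea those comments referred to, blocking on a spawned timer instead of sleeping directly so the hub always has registered work, returns later as delayed_retry in patch 09. A hedged sketch of the idea (wait_with_keepalive is an assumed name, not agent code):

from gevent import monkey; monkey.patch_all()

import gevent

def wait_with_keepalive(seconds: float) -> None:
    # Spawning the sleep as its own greenlet registers a timer with the
    # hub, so the hub has pending work while we block on join().
    timer = gevent.spawn(gevent.sleep, seconds)
    timer.join()

wait_with_keepalive(0.2)
print("hub stayed alive for the whole wait")
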
From a0b685cd58451d351048010fe8b7a8b330916027 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Fri, 24 Oct 2025 17:38:29 +0530
Subject: [PATCH 07/11] Adding fix for latch timeout error

---
 web-agent/app/worker.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index 595aaff..19e5abf 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -164,12 +164,6 @@ def process() -> None:
     headers: Dict[str, str] = _get_headers()
     thread_backoff_time: int = min_backoff_time
 
-    # Note: Keepalive greenlet not needed because:
-    # 1. Main loop waits with .get(timeout=30) which registers a timer
-    # 2. Flush greenlet has gevent.sleep(10) which registers a timer
-    # 3. These ensure hub always has pending > 0
-
-    # thread_pool = Pool(config_dict['thread_pool_size'])
     while True:
         try:
             # Get the next task for the agent

From e0368b4514f3a3e32b3f6efded49a93b954c1d14 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Fri, 24 Oct 2025 17:58:00 +0530
Subject: [PATCH 08/11] Increased default pool size

---
 web-agent/app/worker.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index 19e5abf..c885e1e 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -701,8 +701,8 @@ def update_agent_config(global_config: dict[str, Any]) -> None:
     if global_config.get("verifyCert", False):
         config_dict['verify_cert'] = global_config.get("verifyCert", False)
 
-    if global_config.get("threadPoolSize", 5):
-        config_dict['thread_pool_size'] = global_config.get("poolSize", 5)
+    if global_config.get("threadPoolSize", 25):
+        config_dict['thread_pool_size'] = global_config.get("poolSize", 25)
         config_dict['thread_pool'] = Pool(config_dict['thread_pool_size'])
     if global_config.get("uploadToAC") is not None:
         config_dict['upload_to_ac'] = global_config.get("uploadToAC", True)
@@ -735,7 +735,7 @@ def get_initial_config(parser) -> tuple[dict[str, Union[Union[bool, None, str, i
         "outgoing_proxy": None,  # Proxy for outgoing requests (e.g., to ArmorCode)
         "upload_to_ac": False,  # Whether to upload to ArmorCode
         "env_name": None,  # Environment name (Optional[str])
-        "thread_pool_size": 5  # Connection thread_pool size
+        "thread_pool_size": 25  # Connection thread_pool size
     }
     parser.add_argument("--serverUrl", required=False, help="Server Url")
     parser.add_argument("--apiKey", required=False, help="Api Key")
@@ -750,7 +750,7 @@ def get_initial_config(parser) -> tuple[dict[str, Union[Union[bool, None, str, i
     parser.add_argument("--outgoingProxyHttps", required=False, help="Pass outgoing Https proxy", default=None)
     parser.add_argument("--outgoingProxyHttp", required=False, help="Pass outgoing Http proxy", default=None)
 
-    parser.add_argument("--poolSize", required=False, help="Multi threading thread_pool size", default=5)
+    parser.add_argument("--poolSize", required=False, help="Multi threading thread_pool size", default=25)
     parser.add_argument("--rateLimitPerMin", required=False, help="Rate limit per min", default=250)
 
     parser.add_argument(
@@ -828,7 +828,7 @@ def get_initial_config(parser) -> tuple[dict[str, Union[Union[bool, None, str, i
         config['server_url'] = os.getenv('server_url')
     if config.get('api_key', None) is None:
         config['api_key'] = os.getenv("api_key")
-    config['thread_pool'] = Pool(config.get('thread_pool_size', 5))
+    config['thread_pool'] = Pool(config.get('thread_pool_size', 25))
 
     return config, agent_index, debug_mode
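
The next patch wraps every semaphore-guarded call in gevent.Timeout so that a hung request cannot hold one of the two permits indefinitely. A minimal sketch of the Timeout semantics it relies on: a with-block Timeout raises in the current greenlet when it fires, the nested with-blocks unwind, and the semaphore permit is released. Illustrative only, with gevent.sleep standing in for a hung HTTP call.

from gevent import monkey; monkey.patch_all()

import gevent
from gevent import Timeout
from gevent.lock import Semaphore

semaphore = Semaphore(2)

try:
    with Timeout(0.5):       # raises Timeout in this greenlet after 0.5s
        with semaphore:      # permit is released when the block unwinds
            gevent.sleep(2)  # stand-in for a request that never returns
except Timeout:
    print("timed out; the permit was released on unwind")

print(semaphore.counter)  # back to 2: no permit leaked
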
From 5f3e10cfeca5726f38a91640fb4455a6a8e6a572 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Mon, 27 Oct 2025 09:40:17 +0530
Subject: [PATCH 09/11] Increased default pool size

---
 web-agent/app/worker.py | 184 ++++++++++++++++++++++++----------------
 1 file changed, 111 insertions(+), 73 deletions(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index c885e1e..28b95f4 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -2,7 +2,7 @@
 from gevent import monkey;
 monkey.patch_all()
 
-import gevent
+from gevent import Timeout
 import argparse
 import atexit
 import base64
@@ -30,7 +30,7 @@
 from gevent.pool import Pool
 
 # Global variables
-__version__ = "1.1.8"
+__version__ = "1.1.10"
 letters: str = string.ascii_letters
 rand_string: str = ''.join(secrets.choice(letters) for _ in range(10))
@@ -54,6 +54,9 @@
 # Limit concurrent teleport calls (max 2 per worker)
 teleport_semaphore: Optional[Semaphore] = None
 
+# Timeout for teleport operations (prevents semaphore deadlock if operation hangs)
+TELEPORT_TIMEOUT = int(os.getenv('TELEPORT_TIMEOUT_SECONDS', '60'))
+
 
 def main() -> None:
     global config_dict, logger, rate_limiter, teleport_semaphore
@@ -81,14 +84,19 @@ def _get_task_from_server(headers: Dict[str, str], params: Dict[str, str], get_t
     """Execute get-task request in a separate greenlet to prevent LoopExit."""
     get_task_start_time = time.time()
 
-    with teleport_semaphore:
-        get_task_response: requests.Response = requests.get(
-            get_task_server_url,
-            headers=headers,
-            timeout=25, verify=config_dict.get('verify_cert', False),
-            proxies=config_dict['outgoing_proxy'],
-            params=params
-        )
+    try:
+        with Timeout(TELEPORT_TIMEOUT):
+            with teleport_semaphore:
+                get_task_response: requests.Response = requests.get(
+                    get_task_server_url,
+                    headers=headers,
+                    timeout=25, verify=config_dict.get('verify_cert', False),
+                    proxies=config_dict['outgoing_proxy'],
+                    params=params
+                )
+    except Timeout:
+        logger.error(f"Get-task timed out after {TELEPORT_TIMEOUT}s, semaphore released")
+        raise
 
     get_task_duration_ms = (time.time() - get_task_start_time) * 1000
     return get_task_response, get_task_duration_ms
@@ -119,9 +127,9 @@ def get_retry_delay(response: requests.Response, default_delay: int = 2) -> floa
             if delay < 0:
                 logger.warning(f"Negative retry delay {delay}s, using default {default_delay}s")
                 return default_delay
-            if delay > 300:
-                logger.warning(f"Excessive retry delay {delay}s, capping at 300s")
-                return 300
+            if delay > 60:
+                logger.warning(f"Excessive retry delay {delay}s, capping at 60s")
+                return 60
             logger.info(f"Using header delay: {delay}s")
             return delay
         except ValueError:
@@ -130,25 +138,31 @@
 
-def retry_on_429(
+def retry_request(
     func: Callable[[], requests.Response],
     max_retries: int = 5,
     operation_name: str = "request"
 ) -> Optional[requests.Response]:
-    """Retry function on 429 errors. Uses gevent.sleep() for greenlet safety."""
+    """Retry on 429 (rate limit) or 504 (timeout). 429 uses smart delay, 504 uses exponential backoff."""
     for attempt in range(max_retries + 1):
         try:
             response = func()
 
-            if response.status_code != 429:
+            if response.status_code not in (429, 504):
                 return response
 
             if attempt >= max_retries:
-                logger.error(f"{operation_name} failed after {max_retries} retries due to rate limiting")
+                logger.error(f"{operation_name} failed after {max_retries} retries")
                 return response
 
-            delay = get_retry_delay(response)
-            error_type = "concurrent limit" if is_concurrent_limit_error(response) else "rate limit"
+            # Calculate delay based on error type
+            if response.status_code == 429:
+                delay = get_retry_delay(response)
+                error_type = "concurrent limit" if is_concurrent_limit_error(response) else "rate limit"
+            else:  # 504
+                delay = min(1 * (2 ** attempt), 30)  # Exponential: 1s, 2s, 4s, 8s, 16s, 30s
+                error_type = "gateway timeout"
 
             logger.warning(f"{operation_name} {error_type} (attempt {attempt + 1}/{max_retries + 1}), retry in {delay:.2f}s")
             gevent.sleep(delay)
@@ -160,6 +174,12 @@
     logger.error(f"{operation_name} unexpected loop exit")
     return None
 
 
+def delayed_retry(delay_seconds: int) -> None:
+    """Wait by spawning timer greenlet. Keeps hub alive during main loop delays."""
+    timer = gevent.spawn(lambda: gevent.sleep(delay_seconds))
+    timer.join()  # Wait for timer, but timer greenlet keeps hub active
+
+
 def process() -> None:
     headers: Dict[str, str] = _get_headers()
     thread_backoff_time: int = min_backoff_time
@@ -187,7 +207,7 @@ def process() -> None:
                 get_task_response, get_task_duration_ms = get_task_greenlet.get(timeout=30)
             except gevent.Timeout:
                 logger.error("Get-task request timed out after 30 seconds")
-                gevent.sleep(5)
+                delayed_retry(5)
                 continue
 
             if get_task_response.status_code == 200:
@@ -196,7 +216,7 @@ def process() -> None:
 
             if task is None:
                 logger.info("Received empty task")
-                gevent.sleep(5)
+                delayed_retry(5)
                 continue
 
             logger.info("Received task: %s", task['taskId'])
@@ -210,24 +230,24 @@ def process() -> None:
                 thread_pool.wait_available()
                 thread_pool.spawn(process_task_async, task)
         elif get_task_response.status_code == 429:
             logger.info("No task available. Waiting...")
-            gevent.sleep(5)
+            delayed_retry(5)
         elif get_task_response.status_code > 500:
             logger.error("Getting 5XX error %d, increasing backoff time", get_task_response.status_code)
-            gevent.sleep(thread_backoff_time)
+            delayed_retry(thread_backoff_time)
             thread_backoff_time = min(max_backoff_time, thread_backoff_time * 2)
         else:
             logger.error("Unexpected response: %d", get_task_response.status_code)
-            gevent.sleep(5)
+            delayed_retry(5)
 
         except requests.exceptions.RequestException as e:
             logger.error("Network error: %s", e)
-            gevent.sleep(10)  # Wait longer on network errors
+            delayed_retry(10)  # Wait longer on network errors
         except gevent.hub.LoopExit as e:
             logger.error("Getting LoopExit Error, resetting the thread pool")
             config_dict['thread_pool'] = Pool(config_dict['thread_pool_size'])
         except Exception as e:
             logger.error("Unexpected error while processing: %s", e, exc_info=True)
-            gevent.sleep(5)
+            delayed_retry(5)
@@ -242,7 +262,6 @@ def process_task_async(task: Dict[str, Any]) -> None:
     except Exception as e:
         logger.info("Unexpected error while processing task id: %s, method: %s url: %s, error: %s", taskId, method,
                     url, e)
-        gevent.sleep(5)
 
 
 def update_task(task: Optional[Dict[str, Any]]) -> None:
@@ -252,20 +271,24 @@ def update_task(task: Optional[Dict[str, Any]]) -> None:
 
     def _make_update_request() -> requests.Response:
         rate_limiter.throttle()
-        with teleport_semaphore:
-            response = requests.post(
-                f"{config_dict.get('server_url')}/api/http-teleport/put-result",
-                headers=_get_headers(),
-                json=task,
-                timeout=30,
-                verify=config_dict.get('verify_cert'),
-                proxies=config_dict['outgoing_proxy']
-            )
-
-        return response
+        try:
+            with Timeout(TELEPORT_TIMEOUT):
+                with teleport_semaphore:
+                    response = requests.post(
+                        f"{config_dict.get('server_url')}/api/http-teleport/put-result",
+                        headers=_get_headers(),
+                        json=task,
+                        timeout=30,
+                        verify=config_dict.get('verify_cert'),
+                        proxies=config_dict['outgoing_proxy']
+                    )
+            return response
+        except Timeout:
+            logger.error(f"Put-result timed out after {TELEPORT_TIMEOUT}s, semaphore released")
+            raise
 
     try:
-        response = retry_on_429(_make_update_request, max_retries=5, operation_name=f"update_task[{task['taskId']}]")
+        response = retry_request(_make_update_request, max_retries=5, operation_name=f"update_task[{task['taskId']}]")
 
         if response and response.status_code == 200:
             logger.info(f"Task {task['taskId']} updated successfully. Response: {response.text}")
@@ -309,17 +332,22 @@ def check_for_logs_fetch(url, task, temp_output_file_zip):
 
             def _upload_logs() -> requests.Response:
                 rate_limiter.throttle()
-                with teleport_semaphore:
-                    return requests.post(
-                        upload_logs_url,
-                        headers=headers,
-                        timeout=300,
-                        verify=config_dict.get('verify_cert', False),
-                        proxies=config_dict['outgoing_proxy'],
-                        files=files
-                    )
-
-            response = retry_on_429(_upload_logs, max_retries=5, operation_name="upload_logs")
+                try:
+                    with Timeout(TELEPORT_TIMEOUT):
+                        with teleport_semaphore:
+                            return requests.post(
+                                upload_logs_url,
+                                headers=headers,
+                                timeout=300,
+                                verify=config_dict.get('verify_cert', False),
+                                proxies=config_dict['outgoing_proxy'],
+                                files=files
+                            )
+                except Timeout:
+                    logger.error(f"Upload-logs timed out after {TELEPORT_TIMEOUT}s, semaphore released")
+                    raise
+
+            response = retry_request(_upload_logs, max_retries=5, operation_name="upload_logs")
 
             if response and response.status_code == 200:
                 logger.info("Logs uploaded successfully")
@@ -492,18 +520,23 @@ def upload_response(temp_file, temp_file_zip, taskId: str, task: Dict[str, Any])
 
             def _upload_result_file() -> requests.Response:
                 rate_limiter.throttle()
-                with teleport_semaphore:
-                    response = requests.post(
-                        f"{config_dict.get('server_url')}/api/http-teleport/upload-result",
-                        headers=headers,
-                        timeout=300,
-                        verify=config_dict.get('verify_cert', False),
-                        proxies=config_dict['outgoing_proxy'],
-                        files=files
-                    )
-                    return response
-
-            upload_result = retry_on_429(_upload_result_file, max_retries=5, operation_name=f"upload_result[{taskId}]")
+                try:
+                    with Timeout(TELEPORT_TIMEOUT):
+                        with teleport_semaphore:
+                            response = requests.post(
+                                f"{config_dict.get('server_url')}/api/http-teleport/upload-result",
+                                headers=headers,
+                                timeout=300,
+                                verify=config_dict.get('verify_cert', False),
+                                proxies=config_dict['outgoing_proxy'],
+                                files=files
+                            )
+                            return response
+                except Timeout:
+                    logger.error(f"Upload-result timed out after {TELEPORT_TIMEOUT}s, semaphore released")
+                    raise
+
+            upload_result = retry_request(_upload_result_file, max_retries=5, operation_name=f"upload_result[{taskId}]")
 
             if upload_result:
                 logger.info("Upload result response: %s, code: %d", upload_result.text, upload_result.status_code)
@@ -608,18 +641,23 @@ def get_s3_upload_url(taskId: str) -> Tuple[Optional[str], Optional[str]]:
 
     def _request_upload_url() -> requests.Response:
         rate_limiter.throttle()
-        with teleport_semaphore:
-            return requests.get(
-                f"{config_dict.get('server_url')}/api/http-teleport/upload-url",
-                params=params,
-                headers=_get_headers(),
-                timeout=25,
-                verify=config_dict.get('verify_cert', False),
-                proxies=config_dict['outgoing_proxy']
-            )
+        try:
+            with Timeout(TELEPORT_TIMEOUT):
+                with teleport_semaphore:
+                    return requests.get(
+                        f"{config_dict.get('server_url')}/api/http-teleport/upload-url",
+                        params=params,
+                        headers=_get_headers(),
+                        timeout=25,
+                        verify=config_dict.get('verify_cert', False),
+                        proxies=config_dict['outgoing_proxy']
+                    )
+        except Timeout:
+            logger.error(f"Get-s3-upload-url timed out after {TELEPORT_TIMEOUT}s, semaphore released")
+            raise
 
     try:
-        response = retry_on_429(_request_upload_url, max_retries=5, operation_name="get_s3_upload_url")
+        response = retry_request(_request_upload_url, max_retries=5, operation_name="get_s3_upload_url")
 
         if response and response.status_code == 200:
             data: Optional[Dict[str, str]] = response.json().get('data')
@@ -702,7 +740,7 @@ def update_agent_config(global_config: dict[str, Any]) -> None:
     if global_config.get("verifyCert", False):
         config_dict['verify_cert'] = global_config.get("verifyCert", False)
 
     if global_config.get("threadPoolSize", 25):
-        config_dict['thread_pool_size'] = global_config.get("poolSize", 25)
+        config_dict['thread_pool_size'] = global_config.get("threadPoolSize", 25)
         config_dict['thread_pool'] = Pool(config_dict['thread_pool_size'])
     if global_config.get("uploadToAC") is not None:
         config_dict['upload_to_ac'] = global_config.get("uploadToAC", True)

From 395442c25f9d240579170f81c8ab535fc1d8c6e5 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Mon, 27 Oct 2025 09:50:19 +0530
Subject: [PATCH 10/11] removing wait if no tasks came

---
 web-agent/app/worker.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index 28b95f4..5c9bf89 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -216,7 +216,6 @@ def process() -> None:
 
             if task is None:
                 logger.info("Received empty task")
-                delayed_retry(5)
                 continue
 
             logger.info("Received task: %s", task['taskId'])

From 203599b8ba04455abfae81ab0294a9abf41763b2 Mon Sep 17 00:00:00 2001
From: dmeenaarmorcode
Date: Mon, 27 Oct 2025 10:57:32 +0530
Subject: [PATCH 11/11] Correcting timeout value

---
 web-agent/app/worker.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/web-agent/app/worker.py b/web-agent/app/worker.py
index 5c9bf89..672bbcb 100644
--- a/web-agent/app/worker.py
+++ b/web-agent/app/worker.py
@@ -57,6 +57,9 @@
 # Timeout for teleport operations (prevents semaphore deadlock if operation hangs)
 TELEPORT_TIMEOUT = int(os.getenv('TELEPORT_TIMEOUT_SECONDS', '60'))
 
+# HTTP request timeout for teleport endpoints
+TELEPORT_REQUEST_TIMEOUT = 30
+
 
 def main() -> None:
     global config_dict, logger, rate_limiter, teleport_semaphore
@@ -204,7 +207,7 @@ def process() -> None:
 
             get_task_greenlet = gevent.spawn(_get_task_from_server, headers, params, get_task_server_url)
             try:
-                get_task_response, get_task_duration_ms = get_task_greenlet.get(timeout=30)
+                get_task_response, get_task_duration_ms = get_task_greenlet.get(timeout=TELEPORT_REQUEST_TIMEOUT)
             except gevent.Timeout:
                 logger.error("Get-task request timed out after 30 seconds")
                 delayed_retry(5)
@@ -277,7 +280,7 @@ def _make_update_request() -> requests.Response:
                         f"{config_dict.get('server_url')}/api/http-teleport/put-result",
                         headers=_get_headers(),
                         json=task,
-                        timeout=30,
+                        timeout=TELEPORT_REQUEST_TIMEOUT,
                         verify=config_dict.get('verify_cert'),
                         proxies=config_dict['outgoing_proxy']
                     )
@@ -337,7 +340,7 @@ def _upload_logs() -> requests.Response:
                     return requests.post(
                         upload_logs_url,
                         headers=headers,
-                        timeout=300,
+                        timeout=TELEPORT_REQUEST_TIMEOUT,
                         verify=config_dict.get('verify_cert', False),
                         proxies=config_dict['outgoing_proxy'],
                         files=files
@@ -525,7 +528,7 @@ def _upload_result_file() -> requests.Response:
                     response = requests.post(
                         f"{config_dict.get('server_url')}/api/http-teleport/upload-result",
                         headers=headers,
-                        timeout=300,
+                        timeout=TELEPORT_REQUEST_TIMEOUT,
                         verify=config_dict.get('verify_cert', False),
                         proxies=config_dict['outgoing_proxy'],
                         files=files