155 changes: 153 additions & 2 deletions nodescraper/base/inbanddataplugin.py
@@ -23,17 +23,168 @@
# SOFTWARE.
#
###############################################################################
import json
import os
from pathlib import Path
from typing import Any, Generic, Optional

from nodescraper.connection.inband import InBandConnectionManager, SSHConnectionParams
from nodescraper.generictypes import TAnalyzeArg, TCollectArg, TDataModel
from nodescraper.interfaces import DataPlugin
from nodescraper.models import DataModel
from nodescraper.utils import pascal_to_snake


class InBandDataPlugin(
DataPlugin[InBandConnectionManager, SSHConnectionParams, TDataModel, TCollectArg, TAnalyzeArg],
Generic[TDataModel, TCollectArg, TAnalyzeArg],
):
"""Base class for in band plugins"""
"""Base class for in band plugins.

Supports loading and comparing plugin data from scraper run directories
(e.g. for compare-runs). Subclasses get find_datamodel_path_in_run,
load_datamodel_from_path, get_extracted_errors, and load_run_data.
"""

CONNECTION_TYPE = InBandConnectionManager

@classmethod
def find_datamodel_path_in_run(cls, run_path: str) -> Optional[str]:
"""Find this plugin's collector datamodel file under a scraper run directory.

        Looks under <run_path>/<plugin_snake>/<collector_snake>/ for a result.json
        whose "parent" field matches this plugin, then for a datamodel file
        (datamodel.json, <data_model_name>.json, or a .log fallback).

Args:
run_path: Path to a scraper log run directory (e.g. scraper_logs_*).

Returns:
Absolute path to the datamodel file, or None if not found.
"""
run_path = os.path.abspath(run_path)
if not os.path.isdir(run_path):
return None
collector_cls = getattr(cls, "COLLECTOR", None)
data_model_cls = getattr(cls, "DATA_MODEL", None)
if not collector_cls or not data_model_cls:
return None
collector_dir = os.path.join(
run_path,
pascal_to_snake(cls.__name__),
pascal_to_snake(collector_cls.__name__),
)
if not os.path.isdir(collector_dir):
return None
result_path = os.path.join(collector_dir, "result.json")
if not os.path.isfile(result_path):
return None
try:
res_payload = json.loads(Path(result_path).read_text(encoding="utf-8"))
if res_payload.get("parent") != cls.__name__:
return None
except (json.JSONDecodeError, OSError):
return None
        want_json = data_model_cls.__name__.lower() + ".json"
        log_fallback: Optional[str] = None
        for fname in sorted(os.listdir(collector_dir)):
            low = fname.lower()
            if low.endswith("datamodel.json") or low == want_json:
                return os.path.join(collector_dir, fname)
            if low.endswith(".log") and log_fallback is None:
                log_fallback = os.path.join(collector_dir, fname)
        # os.listdir order is arbitrary, so only fall back to a .log file once no
        # JSON datamodel has been found anywhere in the directory.
        return log_fallback

@classmethod
def load_datamodel_from_path(cls, dm_path: str) -> Optional[TDataModel]:
"""Load this plugin's DATA_MODEL from a file path (JSON or .log).

Args:
dm_path: Path to datamodel JSON or to a .log file (if DATA_MODEL
implements import_model for that format).

Returns:
Instance of DATA_MODEL or None if load fails.
"""
dm_path = os.path.abspath(dm_path)
if not os.path.isfile(dm_path):
return None
data_model_cls = getattr(cls, "DATA_MODEL", None)
if not data_model_cls:
return None
try:
if dm_path.lower().endswith(".log"):
import_model = getattr(data_model_cls, "import_model", None)
if not callable(import_model):
return None
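                # Treat .log as unsupported unless the subclass actually overrides
                # DataModel.import_model; compare the underlying functions so the
                # classmethod binding does not mask an un-overridden base method.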
base_import = getattr(DataModel.import_model, "__func__", DataModel.import_model)
if getattr(import_model, "__func__", import_model) is base_import:
return None
return import_model(dm_path)
with open(dm_path, encoding="utf-8") as f:
data = json.load(f)
return data_model_cls.model_validate(data)
        except Exception:  # json.JSONDecodeError, OSError, pydantic validation, etc.
            return None

@classmethod
def get_extracted_errors(cls, data_model: DataModel) -> Optional[list[str]]:
"""Compute extracted errors from datamodel for compare-runs (in memory only).

Uses get_compare_content() on the datamodel and ANALYZER.get_error_matches
if this plugin has an ANALYZER; otherwise returns None.

Args:
data_model: Loaded DATA_MODEL instance.

Returns:
Sorted list of error match strings, or None if not applicable.
"""
get_content = getattr(data_model, "get_compare_content", None)
if not callable(get_content):
return None
try:
content = get_content()
except Exception:
return None
if not isinstance(content, str):
return None
analyzer_cls = getattr(cls, "ANALYZER", None)
if not analyzer_cls:
return None
get_matches = getattr(analyzer_cls, "get_error_matches", None)
if not callable(get_matches):
return None
try:
matches = get_matches(content)
return sorted(matches) if matches is not None else None
except Exception:
return None

@classmethod
def load_run_data(cls, run_path: str) -> Optional[dict[str, Any]]:
"""Load this plugin's run data from a scraper run directory for comparison.

Finds the datamodel file, loads it, and returns a JSON-serializable dict
(model_dump) with optional "extracted_errors" if the plugin supports
get_compare_content and ANALYZER.get_error_matches.

Args:
run_path: Path to a scraper log run directory or to a datamodel file.

Returns:
Dict suitable for diffing with another run, or None if not found.
"""
run_path = os.path.abspath(run_path)
if not os.path.exists(run_path):
return None
dm_path = run_path if os.path.isfile(run_path) else cls.find_datamodel_path_in_run(run_path)
if not dm_path:
return None
data_model = cls.load_datamodel_from_path(dm_path)
if data_model is None:
return None
out = data_model.model_dump(mode="json")
extracted = cls.get_extracted_errors(data_model)
if extracted is not None:
out["extracted_errors"] = extracted
return out
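
Taken together, these helpers let a caller compare one plugin's data across two
runs without re-collecting. A minimal sketch (the plugin class and run directory
names below are hypothetical; any InBandDataPlugin subclass with a COLLECTOR and
DATA_MODEL works the same way):

    from nodescraper.plugins import DmesgPlugin  # hypothetical plugin/import path

    old = DmesgPlugin.load_run_data("scraper_logs_node1_2025_01_01-01_00_00_AM")
    new = DmesgPlugin.load_run_data("scraper_logs_node1_2025_01_02-01_00_00_AM")
    if old is not None and new is not None:
        # keys whose values differ between the two runs
        changed = sorted(k for k in old.keys() & new.keys() if old[k] != new[k])
        print("changed fields:", changed)
        print("new errors:", set(new.get("extracted_errors", []))
              - set(old.get("extracted_errors", [])))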
23 changes: 22 additions & 1 deletion nodescraper/base/regexanalyzer.py
@@ -55,8 +55,29 @@ def count(self, val: int):
class RegexAnalyzer(DataAnalyzer[TDataModel, TAnalyzeArg]):
"""Parent class for all regex based data analyzers."""

# Class variable for timestamp pattern - can be overridden in subclasses
TIMESTAMP_PATTERN: re.Pattern = re.compile(r"(\d{4}-\d+-\d+T\d+:\d+:\d+,\d+[+-]\d+:\d+)")
ERROR_REGEX: list[ErrorRegex] = []

@classmethod
def get_error_matches(cls, content: str) -> set[str]:
"""Extract all error match strings from content using the analyzer's ERROR_REGEX.
Args:
content: Raw log text.
Returns:
Set of normalized error match strings.
"""
matches: set[str] = set()
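        # re.findall yields plain strings for zero- or one-group patterns and
        # tuples when a pattern has multiple capture groups; normalize both
        # shapes to a single string per match.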
for error_regex_obj in getattr(cls, "ERROR_REGEX", []):
for match in error_regex_obj.regex.findall(content):
if isinstance(match, str) and "\n" in match:
normalized = match.strip()
elif isinstance(match, (tuple, list)):
normalized = "\n".join(m for m in match if m)
else:
normalized = str(match).strip() if match else ""
if normalized:
matches.add(normalized)
return matches

def _extract_timestamp_from_match_position(
self, content: str, match_start: int
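For a concrete picture of how get_error_matches is driven, here is a minimal
sketch of a subclass. The ErrorRegex keyword argument is an assumption about its
constructor; the classmethod above relies only on its .regex attribute:

    import re

    from nodescraper.base.regexanalyzer import ErrorRegex, RegexAnalyzer  # path assumed

    class OomAnalyzer(RegexAnalyzer):
        # regex= kwarg is assumed; only .regex is read by get_error_matches
        ERROR_REGEX = [ErrorRegex(regex=re.compile(r"Out of memory: kill process \d+"))]

    log = "ok\nOut of memory: kill process 4242\nok\n"
    print(OomAnalyzer.get_error_matches(log))  # {'Out of memory: kill process 4242'}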
53 changes: 52 additions & 1 deletion nodescraper/cli/cli.py
@@ -33,6 +33,7 @@
from typing import Optional

import nodescraper
from nodescraper.cli.compare_runs import run_compare_runs
from nodescraper.cli.constants import DEFAULT_CONFIG, META_VAR_MAP
from nodescraper.cli.dynamicparserbuilder import DynamicParserBuilder
from nodescraper.cli.helper import (
@@ -224,6 +225,40 @@ def build_parser(
help="Generate reference config from previous run logfiles. Writes to --output-path/reference_config.json if provided, otherwise ./reference_config.json.",
)

compare_runs_parser = subparsers.add_parser(
"compare-runs",
help="Compare datamodels from two run log directories",
)
compare_runs_parser.add_argument(
"path1",
type=str,
help="Path to first run log directory",
)
compare_runs_parser.add_argument(
"path2",
type=str,
help="Path to second run log directory",
)
compare_runs_parser.add_argument(
"--skip-plugins",
nargs="*",
choices=list(plugin_reg.plugins.keys()),
metavar="PLUGIN",
help="Plugin names to exclude from comparison",
)
compare_runs_parser.add_argument(
"--include-plugins",
nargs="*",
choices=list(plugin_reg.plugins.keys()),
metavar="PLUGIN",
help="If set, only compare data for these plugins (default: compare all found)",
)
compare_runs_parser.add_argument(
"--dont-truncate",
action="store_true",
dest="dont_truncate",
help="Do not truncate the Message column; show full error text and all errors (not just first 3)",
)
config_builder_parser.add_argument(
"--plugins",
nargs="*",
@@ -331,7 +366,11 @@ def main(arg_input: Optional[list[str]] = None):
sname = system_info.name.lower().replace("-", "_").replace(".", "_")
timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%I_%M_%S_%p")

if parsed_args.log_path and parsed_args.subcmd not in ["gen-plugin-config", "describe"]:
if parsed_args.log_path and parsed_args.subcmd not in [
"gen-plugin-config",
"describe",
"compare-runs",
]:
log_path = os.path.join(
parsed_args.log_path,
f"scraper_logs_{sname}_{timestamp}",
@@ -358,6 +397,18 @@
if parsed_args.subcmd == "describe":
parse_describe(parsed_args, plugin_reg, config_reg, logger)

if parsed_args.subcmd == "compare-runs":
run_compare_runs(
parsed_args.path1,
parsed_args.path2,
plugin_reg,
logger,
skip_plugins=getattr(parsed_args, "skip_plugins", None) or [],
include_plugins=getattr(parsed_args, "include_plugins", None),
truncate_message=not getattr(parsed_args, "dont_truncate", False),
)
sys.exit(0)

if parsed_args.subcmd == "gen-plugin-config":

if parsed_args.reference_config_from_logs:
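Example invocation of the new subcommand (directory and plugin names are
illustrative, and the console entry point is assumed to be named nodescraper):

    nodescraper compare-runs \
        scraper_logs_node1_2025_01_01 scraper_logs_node1_2025_01_02 \
        --skip-plugins DmesgPlugin --dont-truncate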