Merged

17 commits
e2db21b
- Use structured config object for app-malt main loop.
Kolleida Dec 23, 2025
e563ab6
Preliminary refactor to support querying various server endpoints. Move
Kolleida Dec 24, 2025
e6755fc
- Separate out code for querying and evaluating output code of agents
Kolleida Dec 25, 2025
10a48da
Change agent_utils to use modern version of A2AClient to ping A2A ser…
Kolleida Dec 25, 2025
aa8f407
Rename AgentServer (and related objects) to AgentClient.
Kolleida Dec 25, 2025
c6822fa
- Evaluation loop now skips agents if it cannot connect (before would
Kolleida Dec 25, 2025
e5ada58
Demo green agent based on Agentbeats tutorial.
Kolleida Dec 25, 2025
e141512
Shell script to illustrate how to run green agent demo. Fix issue where
Kolleida Dec 25, 2025
5f37e65
Dependencies with uv.
Kolleida Dec 26, 2025
8b0cd65
Command line args for malt_agent to specify host and port to expose on.
Kolleida Dec 26, 2025
b49e1b4
Fix typo when passing port in malt_agent.py example.
Kolleida Dec 26, 2025
66a1b1b
- Define netarena python package to put shared code across apps.
Kolleida Dec 26, 2025
94b7024
- Remove unused cmd line args related to specifying model type.
Kolleida Dec 26, 2025
9d0f540
Remove unused arg in sample scenario (green agent).
Kolleida Dec 27, 2025
99b958b
Fix issue where query latency was not being calculated correctly.
Kolleida Dec 27, 2025
0b72155
- Include final average correctness, safety, and latency
Kolleida Dec 27, 2025
227f13f
Enhance malt_env to use shared execution namespace for LLM and ground…
lesleychou Dec 27, 2025
16 changes: 8 additions & 8 deletions .gitignore
@@ -1,13 +1,13 @@
 .idea
-*/.env
+.env
 *.DS_Store
-*/logs/*
-*/__pycache__/
-*/.pytest_cache/*
-*/.vscode/*
-*/.mypy_cache/*
-*/.coverage
-*/.coverage.*
+logs/
+__pycache__/
+.pytest_cache/
+.vscode/
+.mypy_cache/
+.coverage
+.coverage.*
 
 # Config files with credentials (use *.template.toml as base)
 app-malt/config.toml
15 changes: 12 additions & 3 deletions app-malt/dy_query_generation.py
@@ -3,13 +3,22 @@
 import random
 import matplotlib.pyplot as plt
 import json
-from solid_step_helper import get_node_value_ranges, getGraphData, \
-    solid_step_add_node_to_graph, solid_step_counting_query, solid_step_remove_node_from_graph, solid_step_list_child_nodes, solid_step_update_node_value, solid_step_rank_child_nodes
+import os
+from solid_step_helper import get_node_value_ranges, getGraphData, GRAPH_TOPOLOGY_DIR
+from enum import Enum
+
+
+class ComplexityLevel(Enum):
+    LEVEL1 = 'level1'
+    LEVEL2 = 'level2'
+    LEVEL3 = 'level3'
 
 
 class QueryGenerator:
     def __init__(self,):
         _, self.malt_real_graph = getGraphData()
-        node_value_ranges_path = 'data/node_value_ranges.json'
+        data_path = os.path.join(GRAPH_TOPOLOGY_DIR, 'node_value_ranges.json')
+        node_value_ranges_path = data_path
         self.node_value_ranges = get_node_value_ranges(self.malt_real_graph, node_value_ranges_path)
         self.queries = []
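A quick sketch of how the two additions above fit together; ComplexityLevel and GRAPH_TOPOLOGY_DIR come from the diff, while the lookup below is hypothetical usage written for illustration, not code from this PR:

import os
from enum import Enum

GRAPH_TOPOLOGY_DIR = "data"  # stand-in for the constant exported by solid_step_helper

class ComplexityLevel(Enum):
    LEVEL1 = 'level1'
    LEVEL2 = 'level2'
    LEVEL3 = 'level3'

# The enum validates a complexity string coming from a CLI arg or config file.
level = ComplexityLevel('level2')
assert level is ComplexityLevel.LEVEL2

# Joining against a shared directory constant replaces the hard-coded
# 'data/node_value_ranges.json', so the lookup no longer depends on the CWD.
path = os.path.join(GRAPH_TOPOLOGY_DIR, 'node_value_ranges.json')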
227 changes: 227 additions & 0 deletions app-malt/green_agent/client_cli.py
@@ -0,0 +1,227 @@
import sys
import json
import asyncio
from pathlib import Path
from uuid import uuid4

import tomllib
import httpx

from malt_agent import EvalRequest
from a2a.types import (
    AgentCard,
    Message,
    TaskStatusUpdateEvent,
    TaskArtifactUpdateEvent,
    Artifact,
    TextPart,
    DataPart,
    Part,
    Role,
)
from a2a.client import (
    A2ACardResolver,
    ClientConfig,
    ClientFactory,
    Consumer,
)

DEFAULT_TIMEOUT = 300

def create_message(*, role: Role = Role.user, text: str, context_id: str | None = None) -> Message:
return Message(
kind="message",
role=role,
parts=[Part(TextPart(kind="text", text=text))],
message_id=uuid4().hex,
context_id=context_id
)

def merge_parts(parts: list[Part]) -> str:
chunks = []
for part in parts:
if isinstance(part.root, TextPart):
chunks.append(part.root.text)
elif isinstance(part.root, DataPart):
chunks.append(json.dumps(part.root.data, indent=2))
return "\n".join(chunks)

async def send_message(message: str, base_url: str, context_id: str | None = None, streaming=False, consumer: Consumer | None = None):
"""Returns dict with context_id, response and status (if exists)"""
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as httpx_client:
resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
agent_card = await resolver.get_agent_card()
config = ClientConfig(
httpx_client=httpx_client,
streaming=streaming,
)
factory = ClientFactory(config)
client = factory.create(agent_card)
if consumer:
await client.add_event_consumer(consumer)

outbound_msg = create_message(text=message, context_id=context_id)
last_event = None
outputs = {
"response": "",
"context_id": None
}

        # When streaming is False, the client yields exactly one event.
async for event in client.send_message(outbound_msg):
last_event = event

match last_event:
case Message() as msg:
outputs["context_id"] = msg.context_id
outputs["response"] += merge_parts(msg.parts)

case (task, update):
outputs["context_id"] = task.context_id
outputs["status"] = task.status.state.value
msg = task.status.message
if msg:
outputs["response"] += merge_parts(msg.parts)
if task.artifacts:
for artifact in task.artifacts:
outputs["response"] += merge_parts(artifact.parts)

case _:
pass

return outputs


def parse_toml(d: dict[str, object]) -> tuple[EvalRequest, str, dict[str, str]]:
green = d.get("green_agent")
    if not isinstance(green, dict) or "endpoint" not in green:
        raise ValueError("green_agent.endpoint is required in TOML")
green_endpoint: str = green["endpoint"]

parts: dict[str, str] = {}
role_to_id: dict[str, str] = {}

for p in d.get("participants", []):
if isinstance(p, dict):
role = p.get("role")
endpoint = p.get("endpoint")
agentbeats_id = p.get("agentbeats_id")
if role and endpoint:
parts[role] = endpoint
if role and agentbeats_id:
role_to_id[role] = agentbeats_id

eval_req = EvalRequest(
participants=parts,
config=d.get("config", {}) or {}
)
return eval_req, green_endpoint, role_to_id

def parse_parts(parts) -> tuple[list, list]:
text_parts = []
data_parts = []

for part in parts:
if isinstance(part.root, TextPart):
try:
data_item = json.loads(part.root.text)
data_parts.append(data_item)
except Exception:
text_parts.append(part.root.text.strip())
elif isinstance(part.root, DataPart):
data_parts.append(part.root.data)

return text_parts, data_parts

def print_parts(parts, task_state: str | None = None):
text_parts, data_parts = parse_parts(parts)

output = []
if task_state:
output.append(f"[Status: {task_state}]")
if text_parts:
output.append("\n".join(text_parts))
if data_parts:
output.extend(json.dumps(item, indent=2) for item in data_parts)

print("\n".join(output) + "\n")

async def main():
if len(sys.argv) < 2:
print("Usage: python client_cli.py <scenario.toml> [output.json]")
sys.exit(1)

scenario_path = Path(sys.argv[1])
output_path = Path(sys.argv[2]) if len(sys.argv) > 2 else None

if not scenario_path.exists():
print(f"File not found: {scenario_path}")
sys.exit(1)

toml_data = scenario_path.read_text()
data = tomllib.loads(toml_data)

req, green_url, role_to_id = parse_toml(data)

artifacts: list[Artifact] = []

async def event_consumer(event, card: AgentCard):
nonlocal artifacts
match event:
case Message() as msg:
print_parts(msg.parts)

case (task, TaskStatusUpdateEvent() as status_event):
status = status_event.status
parts = status.message.parts if status.message else []
print_parts(parts, status.state.value)
if status.state.value == "completed":
print(task.artifacts)
                    artifacts = task.artifacts or []  # task.artifacts may be None
elif status.state.value not in ["submitted", "working"]:
print(f"Agent returned status {status.state.value}. Exiting.")
                    sys.exit(1)

case (task, TaskArtifactUpdateEvent() as artifact_event):
print_parts(artifact_event.artifact.parts, "Artifact update")

case task, None:
status = task.status
parts = status.message.parts if status.message else []
print_parts(parts, task.status.state.value)
if status.state.value == "completed":
print(task.artifacts)
                    artifacts = task.artifacts or []  # task.artifacts may be None
elif status.state.value not in ["submitted", "working"]:
print(f"Agent returned status {status.state.value}. Exiting.")
                    sys.exit(1)

case _:
print("Unhandled event")

msg = req.model_dump_json()
await send_message(msg, green_url, streaming=True, consumer=event_consumer)

if output_path:
all_data_parts = []
for artifact in artifacts:
_, data_parts = parse_parts(artifact.parts)
all_data_parts.extend(data_parts)

output_data = {
"participants": role_to_id,
"results": all_data_parts
}

output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(output_data, f, indent=2)
print(f"Results written to {output_path}")


if __name__ == "__main__":
asyncio.run(main())
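For reference, a minimal scenario file that parse_toml accepts, runnable in the context of client_cli.py. The keys — green_agent.endpoint, per-participant role/endpoint/agentbeats_id, and the optional [config] table — are taken from the parser above; the URLs, the "solver" role, and num_queries are illustrative placeholders, not the scenario.toml shipped with the demo:

import tomllib

SAMPLE_SCENARIO = """
[green_agent]
endpoint = "http://localhost:9001"

[[participants]]
role = "solver"
endpoint = "http://localhost:8000"
agentbeats_id = "demo-llm"

[config]
num_queries = 5
"""

data = tomllib.loads(SAMPLE_SCENARIO)
eval_req, green_url, role_to_id = parse_toml(data)
# eval_req.participants == {"solver": "http://localhost:8000"}
# green_url == "http://localhost:9001"
# role_to_id == {"solver": "demo-llm"}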
36 changes: 36 additions & 0 deletions app-malt/green_agent/green_agent_demo.sh
@@ -0,0 +1,36 @@
#!/bin/bash

# Run this script from the green_agent folder.

trap cleanup EXIT

cleanup() {
echo "Cleaning up"
kill $server_pid1 $server_pid2 # Terminates both server processes
exit
}

# Choose the model to serve and export the matching secrets (see the LiteLLM docs for details on configuring these env variables).
export AZURE_API_KEY="<API_KEY>"
export AZURE_API_BASE="<API_BASE_URL>"
export AZURE_API_VERSION="<API_VERSION>"

MODEL_NAME="azure/<DEPLOYMENT_NAME>"

uv run ./litellm_a2a_server.py \
--model-name "${MODEL_NAME}" \
--port 8000 &
server_pid1=$! # Get the process ID of the last backgrounded command

sleep 3

# Serve the MALT evaluation agent.
uv run ./malt_agent.py &
server_pid2=$! # Get the process ID of the last backgrounded command

sleep 3

# Run the client code to send a mock evaluation request. Modify scenario.toml to point to existing benchmark data (or regenerate new ones).
uv run ./client_cli.py ./scenario.toml ./output.json

# No explicit call needed: the EXIT trap invokes cleanup when the script finishes.
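Once the client finishes, output.json (as written by client_cli.py above) contains a participants map of role to agentbeats_id and a results list assembled from the data parts of the green agent's final artifacts.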