diff --git a/README.md b/README.md index 96c0f62..3b0e932 100644 --- a/README.md +++ b/README.md @@ -3,13 +3,20 @@ [![PyPI downloads](https://img.shields.io/pypi/dm/uipath-runtime.svg)](https://pypi.org/project/uipath-runtime/) [![Python versions](https://img.shields.io/pypi/pyversions/uipath-runtime.svg)](https://pypi.org/project/uipath-runtime/) -Core runtime abstractions and contracts for the UiPath Python SDK. +Runtime abstractions and contracts for the UiPath Python SDK. ## Overview -`uipath-runtime` provides the foundational interfaces and base classes for building agent runtimes in the UiPath ecosystem. It defines the contracts that all runtime implementations must follow, handles execution context, event streaming, tracing, and error management. +`uipath-runtime` provides the foundational interfaces and base classes for building agent runtimes in the UiPath ecosystem. +It defines the contracts that all runtime implementations must follow and provides utilities for execution context, event streaming, tracing, and structured error handling. -This package is typically used as a dependency by higher-level packages like `uipath-langchain`, `uipath-llamaindex`, or the main `uipath` SDK. You would use this directly only if you're building custom runtime implementations. +This package is typically used as a dependency by higher-level SDKs such as: +- [`uipath-langchain`](https://pypi.org/project/uipath-langchain) +- [`uipath-llamaindex`](https://pypi.org/project/uipath-llamaindex) +- [`uipath-mcp`](https://pypi.org/project/uipath-mcp) +- the main [`uipath`](https://pypi.org/project/uipath) SDK. + +You would use this directly only if you're building custom runtime implementations. ## Installation @@ -17,146 +24,306 @@ This package is typically used as a dependency by higher-level packages like `ui uv add uipath-runtime ``` -## Core Concepts +## Runtime Base Class -### Runtime Base Class +All runtimes inherit from `UiPathBaseRuntime` and implement these core methods: -All runtimes extend `UiPathBaseRuntime` and implement these core methods: +- `get_schema()` — defines input and output JSON schemas. +- `execute(input, options)` — executes the runtime logic and returns a `UiPathRuntimeResult`. +- `stream(input, options)` — optionally streams runtime events for real-time monitoring. +- `cleanup()` — releases resources. ```python -from uipath.runtime import UiPathBaseRuntime, UiPathRuntimeContext, UiPathRuntimeResult, UiPathRuntimeEvent +from typing import Any, AsyncGenerator, Optional +from uipath.runtime import ( + UiPathBaseRuntime, + UiPathRuntimeContext, + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathRuntimeSchema, + UiPathRuntimeEvent, + UiPathExecuteOptions, + UiPathStreamOptions, +) +from uipath.runtime.events import UiPathRuntimeStateEvent +from uipath.runtime.errors import UiPathRuntimeError, UiPathErrorCode, UiPathErrorCategory + class MyCustomRuntime(UiPathBaseRuntime): - def __init__(self, context: UiPathRuntimeContext): - super().__init__(context) + """Example runtime demonstrating the base runtime interface.""" async def get_schema(self) -> UiPathRuntimeSchema: - # Returns the runtime's JSON schemas return UiPathRuntimeSchema( input={ "type": "object", - "properties": { - "message": { - "type": "string", - "description": "Input message" - } - }, - "required": ["message"] + "properties": {"message": {"type": "string"}}, + "required": ["message"], }, output={ "type": "object", - "properties": { - "result": { - "type": "string", - "description": "Execution result" - } - }, - "required": ["result"] - } + "properties": {"result": {"type": "string"}}, + "required": ["result"], + }, ) - async def execute(self) -> UiPathRuntimeResult: - # Execute framework-specific agent invoke logic + async def execute( + self, + input: Optional[dict[str, Any]] = None, + options: Optional[UiPathExecuteOptions] = None, + ) -> UiPathRuntimeResult: + message = (input or {}).get("message", "") return UiPathRuntimeResult( - output={"result": "success"}, - status=UiPathRuntimeStatus.SUCCESSFUL + output={"result": f"Echo: {message}"}, + status=UiPathRuntimeStatus.SUCCESSFUL, ) async def stream( self, + input: Optional[dict[str, Any]] = None, + options: Optional[UiPathStreamOptions] = None, ) -> AsyncGenerator[UiPathRuntimeEvent, None]: - # Stream events during execution for real-time monitoring - yield UiPathRuntimeStateEvent( - payload={"status": "starting"}, - execution_id=self.context.execution_id - ) - - # Yield final result + yield UiPathRuntimeStateEvent(payload={"status": "starting"}) yield UiPathRuntimeResult( output={"completed": True}, - status=UiPathRuntimeStatus.SUCCESSFUL + status=UiPathRuntimeStatus.SUCCESSFUL, ) - async def validate(self) -> None: - # Validate configuration before execution - if not self.context.entrypoint: - raise UiPathRuntimeError( - UiPathErrorCode.ENTRYPOINT_MISSING, - "Missing entrypoint", - "Detailed error message here", - UiPathErrorCategory.USER, - ) - async def cleanup(self) -> None: - # Clean up resources after execution pass ``` -### Runtime Factory -The factory pattern handles runtime instantiation, instrumentation, and tracing: +## Execution Context + +`UiPathRuntimeContext` manages configuration, file I/O, and logs across runtime execution. +It can read JSON input files, capture all stdout/stderr logs, and automatically write output and result files when execution completes. ```python -from uipath.runtime import UiPathRuntimeFactory, UiPathRuntimeContext, UiPathRuntimeExecutor +from uipath.runtime import UiPathRuntimeContext, UiPathRuntimeResult, UiPathRuntimeStatus + +with UiPathRuntimeContext.with_defaults() as ctx: + ctx.result = UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"message": "done"}, + ) +# On exit: result.json and logs are written automatically +``` -factory = UiPathRuntimeFactory(MyCustomRuntime) +When execution fails, the context: +- Writes a structured error contract to the result file. +- Re-raises the original exception. -executor = UiPathRuntimeExecutor() -# Add OpenTelemetry instrumentation -executor.add_instrumentor(MyInstrumentor, get_current_span) +## Execution Runtime -# Add span exporters for tracing -executor.add_span_exporter(JsonLinesFileExporter("trace.jsonl")) +`UiPathExecutionRuntime` wraps any runtime with tracing, telemetry, and log collection. + +```python +from uipath.core import UiPathTraceManager +from uipath.runtime import UiPathExecutionRuntime + +trace_manager = UiPathTraceManager() +runtime = MyCustomRuntime() +executor = UiPathExecutionRuntime( + runtime, + trace_manager, + root_span="my-runtime", + execution_id="exec-123", +) -# Execute -context = UiPathRuntimeContext(entrypoint="main.py", input='{"query": "hello"}') -async with factory.from_context(context): - result = await executor.execute(runtime) +result = await executor.execute({"message": "hello"}) +spans = trace_manager.get_execution_spans("exec-123") # captured spans +logs = executor.log_handler.buffer # captured logs +print(result.output) # {'result': 'Echo: hello'} ``` -### Event Streaming -Runtimes can stream events during execution for real-time monitoring: +## Event Streaming + +Runtimes can optionally emit real-time events during execution: ```python -async for event in executor.stream(runtime): +async for event in runtime.stream({"query": "hello"}): if isinstance(event, UiPathRuntimeStateEvent): print(f"State update: {event.payload}") elif isinstance(event, UiPathRuntimeMessageEvent): - print(f"Message: {event.payload}") + print(f"Message received: {event.payload}") elif isinstance(event, UiPathRuntimeResult): print(f"Completed: {event.output}") ``` -### Execution Context +If a runtime doesn’t support streaming, it raises a `UiPathStreamNotSupportedError`. + + +## Structured Error Handling -Runtime context carries configuration and state throughout execution: +Runtime errors use a consistent, structured model: ```python -context = UiPathRuntimeContext( - entrypoint="agent.py", - input='{"query": "hello"}', - job_id="job-123", - resume=False, +from uipath.runtime.errors import UiPathRuntimeError, UiPathErrorCode, UiPathErrorCategory + +raise UiPathRuntimeError( + UiPathErrorCode.EXECUTION_ERROR, + "Agent failed", + "Failed to call external service", + UiPathErrorCategory.USER, ) ``` -### Error Handling +Resulting JSON contract: -Structured error handling with categorization: +```json +{ + "code": "Python.EXECUTION_ERROR", + "title": "Agent failed", + "detail": "Failed to call external service", + "category": "User" +} +``` + + +## Example: Runtime Orchestration + +This example demonstrates an **orchestrator runtime** that receives a `UiPathRuntimeFactory`, creates child runtimes through it, and executes each one via `UiPathExecutionRuntime`, all within a single shared `UiPathTraceManager` and `UiPathRuntimeContext`. ```python -from uipath.runtime.error import ( - UiPathRuntimeError, - UiPathErrorCode, - UiPathErrorCategory +from typing import Any, Optional, List, TypeVar, Generic + +from uipath.core import UiPathTraceManager +from uipath.runtime import ( + UiPathRuntimeContext, + UiPathBaseRuntime, + UiPathExecutionRuntime, + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathExecuteOptions, + UiPathRuntimeFactory, ) -raise UiPathRuntimeError( - UiPathErrorCode.EXECUTION_ERROR, - "Failed to execute agent", - "Detailed error message here", - UiPathErrorCategory.USER, -) + +class ChildRuntime(UiPathBaseRuntime): + """A simple child runtime that echoes its name and input.""" + + def __init__( + self, + name: str, + ): + super().__init__() + self.name = name + + async def get_schema(self): + return None + + async def execute( + self, + input: Optional[dict[str, Any]] = None, + options: Optional[UiPathExecuteOptions] = None, + ) -> UiPathRuntimeResult: + payload = input or {} + return UiPathRuntimeResult( + output={ + "runtime": self.name, + "input": payload, + }, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + + async def cleanup(self) -> None: + pass + + +class ChildRuntimeFactory(UiPathRuntimeFactory[ChildRuntime]): + """Factory that creates ChildRuntime instances.""" + + def new_runtime(self, entrypoint: str) -> ChildRuntime: + return ChildRuntime(name=entrypoint) + + def discover_runtimes(self) -> List[ChildRuntime]: + return [] + + +class OrchestratorRuntime(UiPathBaseRuntime): + """A runtime that orchestrates multiple child runtimes via a factory.""" + + def __init__( + self, + factory: UiPathRuntimeFactory[ChildRuntime], + trace_manager: UiPathTraceManager, + ): + super().__init__() + self.factory = factory + self.trace_manager = trace_manager + + async def get_schema(self): + return None + + async def execute( + self, + input: Optional[dict[str, Any]] = None, + options: Optional[UiPathExecuteOptions] = None, + ) -> UiPathRuntimeResult: + payload = input or {} + child_inputs: List[dict[str, Any]] = payload.get("children", []) + child_results: List[dict[str, Any]] = [] + + for i, child_input in enumerate(child_inputs): + # Use the factory to create a new child runtime + child_runtime = self.factory.new_runtime(entrypoint=f"child-{i}") + + # Wrap child runtime with tracing + logs + execution_id = f"{self.context.job_id}-child-{i}" if self.context else f"child-{i}" + executor = UiPathExecutionRuntime( + delegate=child_runtime, + trace_manager=self.trace_manager, + root_span=f"child-span-{i}", + execution_id=execution_id, + ) + + # Execute child runtime + result = await executor.execute(child_input, options=options) + + child_results.append(result.output or {}) + child_spans = trace_manager.get_execution_spans(execution_id) + + await child_runtime.cleanup() + + return UiPathRuntimeResult( + output={ + "main": True, + "children": child_results, + }, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + + async def cleanup(self) -> None: + pass + + +# Example usage +async def main() -> None: + trace_manager = UiPathTraceManager() + factory = ChildRuntimeFactory() + options = UiPathExecuteOptions() + + with UiPathRuntimeContext(job_id="main-job-001") as ctx: + runtime = OrchestratorRuntime(factory=factory, trace_manager=trace_manager, context=ctx) + + input_data = { + "children": [ + {"message": "hello from child 1"}, + {"message": "hello from child 2"}, + ] + } + + ctx.result = await runtime.execute(input=input_data, options=options) + print(ctx.result.output) + +# Output: +# { +# "main": True, +# "children": [ +# {"runtime": "child-0", "input": {"message": "hello from child 1"}}, +# {"runtime": "child-1", "input": {"message": "hello from child 2"}} +# ] +# } ``` diff --git a/tests/test_context.py b/tests/test_context.py index 512283f..f6ecd46 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -137,7 +137,9 @@ def test_result_file_written_on_fault_contains_error_contract(tmp_path: Path) -> # We always have an output key, even if it's an empty dict assert "output" in content - + # Status should be FAULTED + assert "status" in content + assert content["status"] == UiPathRuntimeStatus.FAULTED.value # Error contract should be present and structured assert "error" in content error = content["error"] diff --git a/tests/test_executor.py b/tests/test_executor.py index d18b89e..5aba3e3 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -117,16 +117,14 @@ def discover_runtimes(self) -> List[T]: @pytest.mark.asyncio async def test_multiple_factories_same_executor(): - """Test two factories using same executor, verify spans are captured correctly.""" + """Test factories using same trace manager, verify spans are captured correctly.""" trace_manager = UiPathTraceManager() - # Create two factories for different runtimes + # Create factories for different runtimes factory_a = UiPathTestRuntimeFactory(MockRuntimeA) factory_b = UiPathTestRuntimeFactory(MockRuntimeB) factory_c = UiPathTestRuntimeFactory(MockRuntimeC) - # Create single executor - # Execute runtime A runtime_a = factory_a.new_runtime(entrypoint="") execution_runtime_a = UiPathExecutionRuntime( @@ -142,7 +140,6 @@ async def test_multiple_factories_same_executor(): result_b = await execution_runtime_b.execute({"input": "b"}) # Execute runtime C with custom spans - runtime_c = factory_c.new_runtime(entrypoint="") execution_runtime_c = UiPathExecutionRuntime( runtime_c, trace_manager, "runtime-c-span", "exec-c"