Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[project]
name = "uipath-runtime"
version = "0.0.3"
version = "0.0.4"
description = "UiPath Runtime abstractions"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
dependencies = [
"opentelemetry-sdk>=1.38.0",
"opentelemetry-instrumentation>=0.59b0",
"pydantic>=2.12.3",
"uipath-core>=0.0.1",
"uipath-core>=0.0.3",
]
classifiers = [
"Intended Audience :: Developers",
Expand Down
14 changes: 11 additions & 3 deletions src/uipath/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
"""UiPath Runtime Package."""

from uipath.runtime.base import UiPathBaseRuntime, UiPathStreamNotSupportedError
from uipath.runtime.base import (
UiPathBaseRuntime,
UiPathExecuteOptions,
UiPathExecutionRuntime,
UiPathStreamNotSupportedError,
UiPathStreamOptions,
)
from uipath.runtime.context import UiPathRuntimeContext
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.factory import UiPathRuntimeExecutor, UiPathRuntimeFactory
from uipath.runtime.factory import UiPathRuntimeFactory
from uipath.runtime.result import (
UiPathApiTrigger,
UiPathBreakpointResult,
Expand All @@ -14,10 +20,12 @@
)

__all__ = [
"UiPathExecuteOptions",
"UiPathStreamOptions",
"UiPathRuntimeContext",
"UiPathBaseRuntime",
"UiPathExecutionRuntime",
"UiPathRuntimeFactory",
"UiPathRuntimeExecutor",
"UiPathRuntimeResult",
"UiPathRuntimeStatus",
"UiPathRuntimeEvent",
Expand Down
287 changes: 126 additions & 161 deletions src/uipath/runtime/base.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
"""Base runtime class and async context manager implementation."""

import json
import logging
import os
from abc import ABC, abstractmethod
from typing import AsyncGenerator
from typing import (
Any,
AsyncGenerator,
Generic,
List,
Literal,
Optional,
TypeVar,
)

from pydantic import BaseModel, Field
from typing_extensions import override
from uipath.core import UiPathTraceManager

from uipath.runtime.context import UiPathRuntimeContext
from uipath.runtime.errors import (
UiPathErrorCategory,
UiPathErrorCode,
UiPathErrorContract,
UiPathRuntimeError,
)
from uipath.runtime.events import (
UiPathRuntimeEvent,
)
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.result import UiPathRuntimeResult
from uipath.runtime.schema import (
UiPathRuntimeSchema,
)
Expand All @@ -31,13 +36,34 @@ class UiPathStreamNotSupportedError(NotImplementedError):
pass


class UiPathExecuteOptions(BaseModel):
"""Execution-time options controlling runtime behavior."""

resume: bool = Field(
default=False,
description="Indicates whether to resume a suspended execution.",
)
breakpoints: Optional[List[str] | Literal["*"]] = Field(
default=None,
description="List of nodes or '*' to break on all steps.",
)

model_config = {"arbitrary_types_allowed": True, "extra": "allow"}


class UiPathStreamOptions(UiPathExecuteOptions):
"""Streaming-specific execution options."""

pass


class UiPathBaseRuntime(ABC):
"""Base runtime class implementing the async context manager protocol.

This allows using the class with 'async with' statements.
"""

def __init__(self, context: UiPathRuntimeContext):
def __init__(self, context: Optional[UiPathRuntimeContext] = None):
"""Initialize the runtime with the provided context."""
self.context = context

Expand All @@ -48,73 +74,19 @@ async def get_schema(self) -> UiPathRuntimeSchema:
"""
raise NotImplementedError()

async def __aenter__(self):
"""Async enter method called when entering the 'async with' block.

Initializes and prepares the runtime environment.

Returns:
The runtime instance
"""
# Read the input from file if provided
if self.context.input_file:
_, file_extension = os.path.splitext(self.context.input_file)
if file_extension != ".json":
raise UiPathRuntimeError(
code=UiPathErrorCode.INVALID_INPUT_FILE_EXTENSION,
title="Invalid Input File Extension",
detail="The provided input file must be in JSON format.",
)
with open(self.context.input_file) as f:
self.context.input = f.read()

try:
if isinstance(self.context.input, str):
if self.context.input.strip():
self.context.input = json.loads(self.context.input)
else:
self.context.input = {}
elif self.context.input is None:
self.context.input = {}
# else: leave it as-is (already a dict, list, bool, etc.)
except json.JSONDecodeError as e:
raise UiPathRuntimeError(
UiPathErrorCode.INPUT_INVALID_JSON,
"Invalid JSON input",
f"The input data is not valid JSON: {str(e)}",
UiPathErrorCategory.USER,
) from e

await self.validate()

# Intercept all stdout/stderr/logs
# Write to file (runtime), stdout (debug) or log handler (if provided)
self.logs_interceptor = UiPathRuntimeLogsInterceptor(
min_level=self.context.logs_min_level,
dir=self.context.runtime_dir,
file=self.context.logs_file,
job_id=self.context.job_id,
execution_id=self.context.execution_id,
log_handler=self.context.log_handler,
)
self.logs_interceptor.setup()

return self

@abstractmethod
async def execute(self) -> UiPathRuntimeResult:
"""Execute with the provided context.

Returns:
Dictionary with execution results

Raises:
RuntimeError: If execution fails
"""
pass
async def execute(
self,
input: Optional[dict[str, Any]] = None,
options: Optional[UiPathExecuteOptions] = None,
) -> UiPathRuntimeResult:
"""Produce the agent output."""
raise NotImplementedError()

async def stream(
self,
input: Optional[dict[str, Any]] = None,
options: Optional[UiPathStreamOptions] = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream execution events in real-time.

Expand Down Expand Up @@ -154,101 +126,94 @@ async def stream(
# Without it, the function wouldn't match the AsyncGenerator return type
yield

@abstractmethod
async def validate(self):
"""Validate runtime inputs."""
pass

@abstractmethod
async def cleanup(self):
"""Cleaup runtime resources."""
pass

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async exit method called when exiting the 'async with' block.

Cleans up resources and handles any exceptions.
T = TypeVar("T", bound=UiPathBaseRuntime)


class UiPathExecutionRuntime(UiPathBaseRuntime, Generic[T]):
"""Handles runtime execution with tracing/telemetry."""

def __init__(
self,
delegate: T,
trace_manager: UiPathTraceManager,
root_span: str = "root",
execution_id: Optional[str] = None,
):
"""Initialize the executor."""
self.delegate = delegate
self.trace_manager = trace_manager
self.root_span = root_span
self.execution_id = execution_id
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler] = None
if execution_id is not None:
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)

async def execute(
self,
input: Optional[dict[str, Any]] = None,
options: Optional[UiPathExecuteOptions] = None,
) -> UiPathRuntimeResult:
"""Execute runtime with context."""
if self.log_handler:
log_interceptor = UiPathRuntimeLogsInterceptor(
execution_id=self.execution_id, log_handler=self.log_handler
)
log_interceptor.setup()

Always writes output file regardless of whether execution was successful,
suspended, or encountered an error.
"""
try:
if self.context.result is None:
execution_result = UiPathRuntimeResult()
if self.execution_id:
with self.trace_manager.start_execution_span(
self.root_span, execution_id=self.execution_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to be able to populate the parent span with extra attributes (eval item id etc)

):
return await self.delegate.execute(input, options=options)
else:
execution_result = self.context.result

if exc_type:
# Create error info from exception
if isinstance(exc_val, UiPathRuntimeError):
error_info = exc_val.error_info
else:
# Generic error
error_info = UiPathErrorContract(
code=f"ERROR_{exc_type.__name__}",
title=f"Runtime error: {exc_type.__name__}",
detail=str(exc_val),
category=UiPathErrorCategory.UNKNOWN,
)

execution_result.status = UiPathRuntimeStatus.FAULTED
execution_result.error = error_info

content = execution_result.to_dict()

# Always write output file at runtime, except for inner runtimes
# Inner runtimes have execution_id
if self.context.job_id and not self.context.execution_id:
with open(self.context.result_file_path, "w") as f:
json.dump(content, f, indent=2, default=str)

# Write the execution output to file if requested
if self.context.output_file:
with open(self.context.output_file, "w") as f:
f.write(content.get("output", "{}"))

# Don't suppress exceptions
return False

except Exception as e:
logger.error(f"Error during runtime shutdown: {str(e)}")

# Create a fallback error result if we fail during cleanup
if not isinstance(e, UiPathRuntimeError):
error_info = UiPathErrorContract(
code="RUNTIME_SHUTDOWN_ERROR",
title="Runtime shutdown failed",
detail=f"Error: {str(e)}",
category=UiPathErrorCategory.SYSTEM,
)
else:
error_info = e.error_info

# Last-ditch effort to write error output
try:
error_result = UiPathRuntimeResult(
status=UiPathRuntimeStatus.FAULTED, error=error_info
)
error_result_content = error_result.to_dict()
if self.context.job_id:
with open(self.context.result_file_path, "w") as f:
json.dump(error_result_content, f, indent=2, default=str)
except Exception as write_error:
logger.error(f"Failed to write error output file: {str(write_error)}")
raise

# Re-raise as RuntimeError if it's not already a UiPathRuntimeError
if not isinstance(e, UiPathRuntimeError):
raise RuntimeError(
error_info.code,
error_info.title,
error_info.detail,
error_info.category,
) from e
raise
return await self.delegate.execute(input, options=options)
finally:
# Restore original logging
if hasattr(self, "logs_interceptor"):
self.logs_interceptor.teardown()
self.trace_manager.flush_spans()
if self.log_handler:
log_interceptor.teardown()

await self.cleanup()
@override
async def stream(
self,
input: Optional[dict[str, Any]] = None,
options: Optional[UiPathStreamOptions] = None,
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream runtime execution with context.

Args:
runtime: The runtime instance
context: The runtime context

Yields:
UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult

Raises:
UiPathStreamNotSupportedError: If the runtime doesn't support streaming
"""
if self.log_handler:
log_interceptor = UiPathRuntimeLogsInterceptor(
execution_id=self.execution_id, log_handler=self.log_handler
)
log_interceptor.setup()
try:
if self.execution_id:
with self.trace_manager.start_execution_span(
self.root_span, execution_id=self.execution_id
):
async for event in self.delegate.stream(input, options=options):
yield event
finally:
self.trace_manager.flush_spans()
if self.log_handler:
log_interceptor.teardown()

def cleanup(self) -> None:
"""Close runtime resources."""
pass
Loading