@@ -113,7 +113,7 @@ public void forecastTableFunctionErrorTest() throws SQLException {
}
}

public void forecastTableFunctionErrorTest(
public static void forecastTableFunctionErrorTest(
Statement statement, AINodeTestUtils.FakeModelInfo modelInfo) throws SQLException {
// OUTPUT_START_TIME error
String invalidOutputStartTimeSQL =
@@ -123,11 +123,16 @@ def _step(self):
batch_inputs = self._batcher.batch_request(requests).to(
"cpu"
) # The input data should first load to CPU in current version
batch_inputs = self._inference_pipeline.preprocess(batch_inputs)
batch_input_list = []
for i in range(batch_inputs.size(0)):
batch_input_list.append({"targets": batch_inputs[i]})
batch_inputs = self._inference_pipeline.preprocess(
batch_input_list, output_length=requests[0].output_length
)
if isinstance(self._inference_pipeline, ForecastPipeline):
batch_output = self._inference_pipeline.forecast(
batch_inputs,
predict_length=requests[0].output_length,
output_length=requests[0].output_length,
revin=True,
)
elif isinstance(self._inference_pipeline, ClassificationPipeline):
@@ -143,7 +148,8 @@ def _step(self):
else:
batch_output = None
self._logger.error("[Inference] Unsupported pipeline type.")
batch_output = self._inference_pipeline.postprocess(batch_output)
batch_output_list = self._inference_pipeline.postprocess(batch_output)
batch_output = torch.stack(batch_output_list, dim=0)

offset = 0
for request in requests:
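For context, a minimal sketch (made-up shapes, stand-in outputs) of the data-flow change in this hunk: the batched 3D tensor is unpacked into the per-sample dict format that the reworked preprocess() expects, and the list returned by postprocess() is stacked back into a single batch tensor:

import torch

# Hypothetical shapes: 4 requests, 2 target series, 96 input points, 48 output points.
batch_inputs = torch.randn(4, 2, 96)

# Wrap each batch element into the {"targets": ...} dict that preprocess() now takes.
batch_input_list = [{"targets": batch_inputs[i]} for i in range(batch_inputs.size(0))]

# postprocess() now returns a list of 2D tensors; stack them back into one batch.
batch_output_list = [torch.randn(2, 48) for _ in batch_input_list]  # stand-in outputs
batch_output = torch.stack(batch_output_list, dim=0)  # shape: [4, 2, 48]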
iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py (143 additions, 20 deletions)
@@ -21,84 +21,207 @@
import torch

from iotdb.ainode.core.exception import InferenceModelInternalException
from iotdb.ainode.core.model.model_info import ModelInfo
from iotdb.ainode.core.model.model_loader import load_model


class BasicPipeline(ABC):
def __init__(self, model_info, **model_kwargs):
def __init__(self, model_info: ModelInfo, **model_kwargs):
self.model_info = model_info
self.device = model_kwargs.get("device", "cpu")
self.model = load_model(model_info, device_map=self.device, **model_kwargs)

@abstractmethod
def preprocess(self, inputs):
def preprocess(self, inputs, **infer_kwargs):
"""
Preprocess the input before inference, including shape validation and value transformation.
"""
raise NotImplementedError("preprocess not implemented")

@abstractmethod
def postprocess(self, outputs: torch.Tensor):
def postprocess(self, outputs, **infer_kwargs):
"""
Post-process the outputs after the entire inference task.
"""
raise NotImplementedError("postprocess not implemented")


class ForecastPipeline(BasicPipeline):
def __init__(self, model_info, **model_kwargs):
def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)

def preprocess(self, inputs):
def preprocess(
self,
inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]],
**infer_kwargs,
):
"""
The inputs should be 3D tensor: [batch_size, target_count, sequence_length].
Preprocess the input data before passing it to the model for inference, validating its shape and type.
Args:
inputs (list[dict]):
The input data, a list of dictionaries, where each dictionary contains:
- 'targets': A tensor (1D or 2D) of shape (input_length,) or (target_count, input_length).
- 'past_covariates': A dictionary of tensors (optional), where each tensor has shape (input_length,).
- 'future_covariates': A dictionary of tensors (optional), where each tensor has shape (input_length,).
infer_kwargs (dict, optional): Additional keyword arguments for inference, such as:
- `output_length` (int): Used to validate the shape of 'future_covariates', if provided.
Raises:
ValueError: If the input format is incorrect (e.g., missing keys, invalid tensor shapes).
Returns:
The preprocessed inputs, validated and ready for model inference.
"""
if len(inputs.shape) != 3:
raise InferenceModelInternalException(
f"[Inference] Input must be: [batch_size, target_count, sequence_length], but receives {inputs.shape}"

if isinstance(inputs, list):
output_length = infer_kwargs.get("output_length", 96)
for idx, input_dict in enumerate(inputs):
# Check if the dictionary contains the expected keys
if not isinstance(input_dict, dict):
raise ValueError(f"Input at index {idx} is not a dictionary.")

required_keys = ["targets"]
for key in required_keys:
if key not in input_dict:
raise ValueError(
f"Key '{key}' is missing in input at index {idx}."
)

# Check 'targets' is torch.Tensor and has the correct shape
targets = input_dict["targets"]
if not isinstance(targets, torch.Tensor):
raise ValueError(
f"'targets' must be torch.Tensor, but got {type(targets)} at index {idx}."
)
if targets.ndim not in [1, 2]:
raise ValueError(
f"'targets' must have 1 or 2 dimensions, but got {targets.ndim} dimensions at index {idx}."
)
# If targets is 2-d, check if the second dimension is input_length
if targets.ndim == 2:
n_variates, input_length = targets.shape
Copilot AI (Jan 7, 2026): The variable 'n_variates' is assigned on line 104 but never used. If it's not needed for any validation or processing, consider removing this assignment or using it for additional validation logic.

Suggested change:
-                        n_variates, input_length = targets.shape
+                        _, input_length = targets.shape
else:
input_length = targets.shape[
0
] # for 1-d targets, shape should be (input_length,)
Copilot AI (Jan 7, 2026): The comment states "for 1-d targets, shape should be (input_length,)" but doesn't reflect that for 1D targets, you're not validating anything about target_count. The comment might be misleading since it seems to imply a specific expected structure that isn't actually enforced.

Suggested change:
-                        ]  # for 1-d targets, shape should be (input_length,)
+                        ]  # for 1-d targets, infer input_length from the first (and only) dimension

# Check 'past_covariates' if it exists (optional)
past_covariates = input_dict.get("past_covariates", {})
if not isinstance(past_covariates, dict):
raise ValueError(
f"'past_covariates' must be a dictionary, but got {type(past_covariates)} at index {idx}."
)
for cov_key, cov_value in past_covariates.items():
if not isinstance(cov_value, torch.Tensor):
raise ValueError(
f"Each value in 'past_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
)
if cov_value.ndim != 1 or cov_value.shape[0] != input_length:
raise ValueError(
f"Each covariate in 'past_covariates' must have shape ({input_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
)

# Check 'future_covariates' if it exists (optional)
future_covariates = input_dict.get("future_covariates", {})
if not isinstance(future_covariates, dict):
raise ValueError(
f"'future_covariates' must be a dictionary, but got {type(future_covariates)} at index {idx}."
)
# If future_covariates exists, check if they are a subset of past_covariates
if future_covariates:
for cov_key, cov_value in future_covariates.items():
if cov_key not in past_covariates:
raise ValueError(
f"Key '{cov_key}' in 'future_covariates' is not in 'past_covariates' at index {idx}."
)
if not isinstance(cov_value, torch.Tensor):
raise ValueError(
f"Each value in 'future_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
)
if cov_value.ndim != 1 or cov_value.shape[0] != output_length:
raise ValueError(
f"Each covariate in 'future_covariates' must have shape ({output_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
)
else:
raise ValueError(
f"The inputs must be a list of dictionaries, but got {type(inputs)}."
)
return inputs
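As an illustration (not part of the diff), an input list that passes this validation — assuming input_length=96, output_length=24, a concrete pipeline instance named `pipeline`, and a made-up covariate name "temperature":

import torch

sample = {
    "targets": torch.randn(2, 96),  # 2 target series, 96 input points
    "past_covariates": {"temperature": torch.randn(96)},  # shape (input_length,)
    "future_covariates": {"temperature": torch.randn(24)},  # shape (output_length,)
}
validated = pipeline.preprocess([sample], output_length=24)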

@abstractmethod
def forecast(self, inputs, **infer_kwargs):
"""
Perform forecasting on the given inputs.
Args:
inputs: The input data used for making predictions. The type and structure
depend on the specific implementation of the model.
**infer_kwargs: Additional inference parameters such as:
- `output_length` (int): The number of time points the model should generate.
Returns:
The forecasted output, which will depend on the specific model's implementation.
"""
pass

def postprocess(self, outputs: torch.Tensor):
def postprocess(
self, outputs: list[torch.Tensor], **infer_kwargs
) -> list[torch.Tensor]:
"""
The outputs should be 3D tensor: [batch_size, target_count, predict_length].
Postprocess the model outputs after inference, validating the output shape and ensuring it matches the expected dimensions.
Args:
outputs:
The model outputs: a list of 2D tensors, each of shape `[target_count, output_length]`.
Raises:
InferenceModelInternalException: If the output tensor has an invalid shape (e.g., wrong number of dimensions).
ValueError: If the output format is incorrect.
Returns:
list[torch.Tensor]:
The postprocessed outputs, which will be a list of 2D tensors.
"""
if len(outputs.shape) != 3:
raise InferenceModelInternalException(
f"[Inference] Output must be: [batch_size, target_count, predict_length], but receives {outputs.shape}"
if isinstance(outputs, list):
for idx, output in enumerate(outputs):
if output.ndim != 2:
raise InferenceModelInternalException(
f"Output in outputs_list should be 2D-tensor, but receives {output.ndim} dims at index {idx}."
)
else:
raise ValueError(
f"The outputs should be a list of 2D-tensors, but got {type(outputs)}."
)
return outputs
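Likewise, an illustration of a valid postprocess argument (reusing the assumed `pipeline` from above) — a list of per-sample 2D tensors with made-up shapes:

import torch

outputs = [torch.randn(2, 24), torch.randn(2, 24)]  # each: [target_count, output_length]
checked = pipeline.postprocess(outputs)  # passes the 2D check and returns the list unchanged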


class ClassificationPipeline(BasicPipeline):
def __init__(self, model_info, **model_kwargs):
def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)

def preprocess(self, inputs):
def preprocess(self, inputs, **kwargs):
return inputs

@abstractmethod
def classify(self, inputs, **kwargs):
pass

def postprocess(self, outputs: torch.Tensor):
def postprocess(self, outputs, **kwargs):
return outputs


class ChatPipeline(BasicPipeline):
def __init__(self, model_info, **model_kwargs):
def __init__(self, model_info: ModelInfo, **model_kwargs):
super().__init__(model_info, model_kwargs=model_kwargs)

def preprocess(self, inputs):
def preprocess(self, inputs, **kwargs):
return inputs

@abstractmethod
def chat(self, inputs, **kwargs):
pass

def postprocess(self, outputs: torch.Tensor):
def postprocess(self, outputs, **kwargs):
return outputs
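Because ForecastPipeline.forecast() is now abstract, every concrete forecasting pipeline has to implement it against the list-of-dicts contract above. A minimal hypothetical subclass — it ignores the loaded model and simply repeats each target's last observed value, purely to illustrate the expected input and output shapes:

import torch

class NaiveForecastPipeline(ForecastPipeline):
    # Hypothetical example class; not part of this PR.
    def forecast(self, inputs, **infer_kwargs):
        output_length = infer_kwargs.get("output_length", 96)
        results = []
        for sample in inputs:
            targets = sample["targets"]
            if targets.ndim == 1:  # normalize 1D targets to [1, input_length]
                targets = targets.unsqueeze(0)
            last = targets[:, -1:]  # [target_count, 1]
            results.append(last.repeat(1, output_length))
        return results  # list of 2D tensors, matching what postprocess() validates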
iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py (16 additions, 6 deletions)
@@ -20,7 +20,6 @@
import time
from typing import Dict

import pandas as pd
import torch
import torch.multiprocessing as mp

@@ -183,6 +182,13 @@ def _run(

inference_attrs = extract_attrs(req)
output_length = int(inference_attrs.pop("output_length", 96))

# model_inputs_list: Each element is a dict, which contains the following keys:
# `targets`: The input tensor for the target variable(s), whose shape is [target_count, input_length].
model_inputs_list: list[
dict[str, torch.Tensor | dict[str, torch.Tensor]]
] = [{"targets": inputs[0]}]

if (
output_length
> AINodeDescriptor().get_config().get_ain_inference_max_output_length()
@@ -200,17 +206,21 @@
infer_req = InferenceRequest(
req_id=generate_req_id(),
model_id=model_id,
inputs=inputs,
inputs=torch.stack(
[data["targets"] for data in model_inputs_list], dim=0
),
output_length=output_length,
)
outputs = self._process_request(infer_req)
else:
model_info = self._model_manager.get_model_info(model_id)
inference_pipeline = load_pipeline(model_info, device="cpu")
inputs = inference_pipeline.preprocess(inputs)
inputs = inference_pipeline.preprocess(
model_inputs_list, output_length=output_length
)
if isinstance(inference_pipeline, ForecastPipeline):
outputs = inference_pipeline.forecast(
inputs, predict_length=output_length, **inference_attrs
inputs, output_length=output_length, **inference_attrs
)
elif isinstance(inference_pipeline, ClassificationPipeline):
outputs = inference_pipeline.classify(inputs)
@@ -223,8 +233,8 @@

# convert tensor into tsblock for the output in each batch
output_list = []
for batch_idx in range(outputs.size(0)):
output = convert_tensor_to_tsblock(outputs[batch_idx])
for batch_idx, output in enumerate(outputs):
output = convert_tensor_to_tsblock(output)
output_list.append(output)

return resp_cls(
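Putting the _run changes together: the single request's tensor is wrapped into the list-of-dicts format, and for the batched request path the targets are stacked back into a 3D tensor — a sketch with assumed shapes:

import torch

inputs = [torch.randn(2, 96)]  # assumed: one request, 2 targets, 96 input points
model_inputs_list = [{"targets": inputs[0]}]

# Request path: re-batch targets into [batch_size, target_count, input_length].
stacked = torch.stack([d["targets"] for d in model_inputs_list], dim=0)
assert stacked.shape == (1, 2, 96)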