diff --git a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java
index c2114ac94995..e2d759d4bcf8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeForecastIT.java
@@ -113,7 +113,7 @@ public void forecastTableFunctionErrorTest() throws SQLException {
     }
   }
 
-  public void forecastTableFunctionErrorTest(
+  public static void forecastTableFunctionErrorTest(
      Statement statement, AINodeTestUtils.FakeModelInfo modelInfo) throws SQLException {
    // OUTPUT_START_TIME error
    String invalidOutputStartTimeSQL =
diff --git a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
index 3cca9b183c8f..fb03e0af5205 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
@@ -123,11 +123,16 @@ def _step(self):
         batch_inputs = self._batcher.batch_request(requests).to(
             "cpu"
         )  # The input data should first load to CPU in current version
-        batch_inputs = self._inference_pipeline.preprocess(batch_inputs)
+        batch_input_list = []
+        for i in range(batch_inputs.size(0)):
+            batch_input_list.append({"targets": batch_inputs[i]})
+        batch_inputs = self._inference_pipeline.preprocess(
+            batch_input_list, output_length=requests[0].output_length
+        )
         if isinstance(self._inference_pipeline, ForecastPipeline):
             batch_output = self._inference_pipeline.forecast(
                 batch_inputs,
-                predict_length=requests[0].output_length,
+                output_length=requests[0].output_length,
                 revin=True,
             )
         elif isinstance(self._inference_pipeline, ClassificationPipeline):
@@ -143,7 +148,8 @@ def _step(self):
         else:
             batch_output = None
             self._logger.error("[Inference] Unsupported pipeline type.")
-        batch_output = self._inference_pipeline.postprocess(batch_output)
+        batch_output_list = self._inference_pipeline.postprocess(batch_output)
+        batch_output = torch.stack([output for output in batch_output_list], dim=0)
 
         offset = 0
         for request in requests:
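The request pool now hands each batched row to the pipeline as a `{"targets": ...}` dict and rebuilds the batch tensor from the postprocessed list. A minimal, runnable sketch of that round trip (plain torch; only the shapes mirror the hunk above, everything else is a stand-in):

```python
import torch

# [batch_size, target_count(1), input_length], as produced by the batcher
batch_inputs = torch.randn(4, 1, 96)
batch_input_list = [{"targets": batch_inputs[i]} for i in range(batch_inputs.size(0))]

# Stand-in for forecast + postprocess: one 2D [target_count, output_length]
# tensor per request, as the new postprocess contract returns.
batch_output_list = [torch.zeros(1, 32) for _ in batch_input_list]
batch_output = torch.stack(batch_output_list, dim=0)
print(batch_output.shape)  # torch.Size([4, 1, 32])
```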
""" @@ -46,59 +47,181 @@ def postprocess(self, outputs: torch.Tensor): class ForecastPipeline(BasicPipeline): - def __init__(self, model_info, **model_kwargs): + def __init__(self, model_info: ModelInfo, **model_kwargs): super().__init__(model_info, model_kwargs=model_kwargs) - def preprocess(self, inputs): + def preprocess( + self, + inputs: list[dict[str, dict[str, torch.Tensor] | torch.Tensor]], + **infer_kwargs, + ): """ - The inputs should be 3D tensor: [batch_size, target_count, sequence_length]. + Preprocess the input data before passing it to the model for inference, validating the shape and type of the input data. + + Args: + inputs (list[dict]): + The input data, a list of dictionaries, where each dictionary contains: + - 'targets': A tensor (1D or 2D) of shape (input_length,) or (target_count, input_length). + - 'past_covariates': A dictionary of tensors (optional), where each tensor has shape (input_length,). + - 'future_covariates': A dictionary of tensors (optional), where each tensor has shape (input_length,). + + infer_kwargs (dict, optional): Additional keyword arguments for inference, such as: + - `output_length`(int): Used to check validation of 'future_covariates' if provided. + + Raises: + ValueError: If the input format is incorrect (e.g., missing keys, invalid tensor shapes). + + Returns: + The preprocessed inputs, validated and ready for model inference. """ - if len(inputs.shape) != 3: - raise InferenceModelInternalException( - f"[Inference] Input must be: [batch_size, target_count, sequence_length], but receives {inputs.shape}" + + if isinstance(inputs, list): + output_length = infer_kwargs.get("output_length", 96) + for idx, input_dict in enumerate(inputs): + # Check if the dictionary contains the expected keys + if not isinstance(input_dict, dict): + raise ValueError(f"Input at index {idx} is not a dictionary.") + + required_keys = ["targets"] + for key in required_keys: + if key not in input_dict: + raise ValueError( + f"Key '{key}' is missing in input at index {idx}." + ) + + # Check 'targets' is torch.Tensor and has the correct shape + targets = input_dict["targets"] + if not isinstance(targets, torch.Tensor): + raise ValueError( + f"'targets' must be torch.Tensor, but got {type(targets)} at index {idx}." + ) + if targets.ndim not in [1, 2]: + raise ValueError( + f"'targets' must have 1 or 2 dimensions, but got {targets.ndim} dimensions at index {idx}." + ) + # If targets is 2-d, check if the second dimension is input_length + if targets.ndim == 2: + n_variates, input_length = targets.shape + else: + input_length = targets.shape[ + 0 + ] # for 1-d targets, shape should be (input_length,) + + # Check 'past_covariates' if it exists (optional) + past_covariates = input_dict.get("past_covariates", {}) + if not isinstance(past_covariates, dict): + raise ValueError( + f"'past_covariates' must be a dictionary, but got {type(past_covariates)} at index {idx}." + ) + for cov_key, cov_value in past_covariates.items(): + if not isinstance(cov_value, torch.Tensor): + raise ValueError( + f"Each value in 'past_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}." + ) + if cov_value.ndim != 1 or cov_value.shape[0] != input_length: + raise ValueError( + f"Each covariate in 'past_covariates' must have shape ({input_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}." 
+                        )
+
+                # Check 'future_covariates' if it exists (optional)
+                future_covariates = input_dict.get("future_covariates", {})
+                if not isinstance(future_covariates, dict):
+                    raise ValueError(
+                        f"'future_covariates' must be a dictionary, but got {type(future_covariates)} at index {idx}."
+                    )
+                # If future_covariates exists, check if they are a subset of past_covariates
+                if future_covariates:
+                    for cov_key, cov_value in future_covariates.items():
+                        if cov_key not in past_covariates:
+                            raise ValueError(
+                                f"Key '{cov_key}' in 'future_covariates' is not in 'past_covariates' at index {idx}."
+                            )
+                        if not isinstance(cov_value, torch.Tensor):
+                            raise ValueError(
+                                f"Each value in 'future_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
+                            )
+                        if cov_value.ndim != 1 or cov_value.shape[0] != output_length:
+                            raise ValueError(
+                                f"Each covariate in 'future_covariates' must have shape ({output_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
+                            )
+        else:
+            raise ValueError(
+                f"The inputs must be a list of dictionaries, but got {type(inputs)}."
             )
         return inputs
 
     @abstractmethod
     def forecast(self, inputs, **infer_kwargs):
+        """
+        Perform forecasting on the given inputs.
+
+        Parameters:
+            inputs: The input data used for making predictions. The type and structure
+                depend on the specific implementation of the model.
+            **infer_kwargs: Additional inference parameters such as:
+                - `output_length` (int): The number of time points the model should generate.
+
+        Returns:
+            The forecasted output, which depends on the specific model's implementation.
+        """
         pass
 
-    def postprocess(self, outputs: torch.Tensor):
+    def postprocess(
+        self, outputs: list[torch.Tensor], **infer_kwargs
+    ) -> list[torch.Tensor]:
         """
-        The outputs should be 3D tensor: [batch_size, target_count, predict_length].
+        Postprocess the model outputs after inference, validating the shape of the output data and ensuring it matches the expected dimensions.
+
+        Args:
+            outputs:
+                The model outputs, a list of 2D tensors, where each tensor has shape `[target_count, output_length]`.
+
+        Raises:
+            InferenceModelInternalException: If an output tensor has an invalid shape (e.g., wrong number of dimensions).
+            ValueError: If the output format is incorrect.
+
+        Returns:
+            list[torch.Tensor]:
+                The postprocessed outputs, a list of 2D tensors.
         """
-        if len(outputs.shape) != 3:
-            raise InferenceModelInternalException(
-                f"[Inference] Output must be: [batch_size, target_count, predict_length], but receives {outputs.shape}"
+        if isinstance(outputs, list):
+            for idx, output in enumerate(outputs):
+                if output.ndim != 2:
+                    raise InferenceModelInternalException(
+                        f"Output in outputs_list should be a 2D tensor, but receives {output.ndim} dims at index {idx}."
+                    )
+        else:
+            raise ValueError(
+                f"The outputs should be a list of 2D tensors, but got {type(outputs)}."
             )
         return outputs
 
 
 class ClassificationPipeline(BasicPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
+    def preprocess(self, inputs, **kwargs):
         return inputs
 
     @abstractmethod
     def classify(self, inputs, **kwargs):
         pass
 
-    def postprocess(self, outputs: torch.Tensor):
+    def postprocess(self, outputs, **kwargs):
         return outputs
 
 
 class ChatPipeline(BasicPipeline):
-    def __init__(self, model_info, **model_kwargs):
+    def __init__(self, model_info: ModelInfo, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
+    def preprocess(self, inputs, **kwargs):
         return inputs
 
     @abstractmethod
     def chat(self, inputs, **kwargs):
         pass
 
-    def postprocess(self, outputs: torch.Tensor):
+    def postprocess(self, outputs, **kwargs):
         return outputs
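The new `ForecastPipeline.preprocess` contract is easiest to see with concrete payloads. The tensors below are random stand-ins; only the keys and shapes matter:

```python
import torch

valid = {
    "targets": torch.randn(2, 128),                 # [target_count, input_length]
    "past_covariates": {"temp": torch.randn(128)},  # each (input_length,)
    "future_covariates": {"temp": torch.randn(96)}, # each (output_length,)
}

invalid = {
    "targets": torch.randn(2, 3, 128),  # 3D -> ValueError: 1 or 2 dims required
    # a future covariate absent from past_covariates also raises ValueError
    "future_covariates": {"humidity": torch.randn(96)},
}
# preprocess([valid], output_length=96) passes;
# preprocess([invalid], output_length=96) raises ValueError.
```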
diff --git a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
index d3f77e993a1b..ada641dd54c7 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
@@ -20,7 +20,6 @@
 import time
 from typing import Dict
 
-import pandas as pd
 import torch
 import torch.multiprocessing as mp
 
@@ -183,6 +182,13 @@ def _run(
         inference_attrs = extract_attrs(req)
         output_length = int(inference_attrs.pop("output_length", 96))
+
+        # model_inputs_list: Each element is a dict, which contains the following keys:
+        #   `targets`: The input tensor for the target variable(s), whose shape is [target_count, input_length].
+        model_inputs_list: list[
+            dict[str, torch.Tensor | dict[str, torch.Tensor]]
+        ] = [{"targets": inputs[0]}]
+
         if (
             output_length
             > AINodeDescriptor().get_config().get_ain_inference_max_output_length()
@@ -200,17 +206,21 @@ def _run(
             infer_req = InferenceRequest(
                 req_id=generate_req_id(),
                 model_id=model_id,
-                inputs=inputs,
+                inputs=torch.stack(
+                    [data["targets"] for data in model_inputs_list], dim=0
+                ),
                 output_length=output_length,
             )
             outputs = self._process_request(infer_req)
         else:
             model_info = self._model_manager.get_model_info(model_id)
             inference_pipeline = load_pipeline(model_info, device="cpu")
-            inputs = inference_pipeline.preprocess(inputs)
+            inputs = inference_pipeline.preprocess(
+                model_inputs_list, output_length=output_length
+            )
             if isinstance(inference_pipeline, ForecastPipeline):
                 outputs = inference_pipeline.forecast(
-                    inputs, predict_length=output_length, **inference_attrs
+                    inputs, output_length=output_length, **inference_attrs
                 )
             elif isinstance(inference_pipeline, ClassificationPipeline):
                 outputs = inference_pipeline.classify(inputs)
@@ -223,8 +233,8 @@ def _run(
 
         # convert tensor into tsblock for the output in each batch
         output_list = []
-        for batch_idx in range(outputs.size(0)):
-            output = convert_tensor_to_tsblock(outputs[batch_idx])
+        for batch_idx, output in enumerate(outputs):
+            output = convert_tensor_to_tsblock(output)
             output_list.append(output)
 
         return resp_cls(
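On the non-accelerated CPU branch of `_run`, the call order is now preprocess → forecast(output_length=...) → postprocess. A runnable sketch with a stand-in pipeline (`SketchPipeline` is illustrative, not IoTDB code; it only mirrors the contract above):

```python
import torch

class SketchPipeline:
    def preprocess(self, inputs, **infer_kwargs):
        # list of {"targets": [target_count, input_length]} -> [batch, input_length]
        return torch.stack([d["targets"] for d in inputs], dim=0).squeeze(1)

    def forecast(self, inputs, **infer_kwargs):
        out_len = infer_kwargs.get("output_length", 96)
        return inputs.new_zeros(inputs.size(0), out_len)  # dummy forecast

    def postprocess(self, outputs, **infer_kwargs):
        # split the batch into per-series 2D tensors
        return [outputs[i].unsqueeze(0) for i in range(outputs.size(0))]

inputs = torch.randn(1, 1, 256)  # request tensor: [batch, target_count, input_length]
model_inputs_list = [{"targets": inputs[0]}]
pipe = SketchPipeline()
prepped = pipe.preprocess(model_inputs_list, output_length=96)
outputs = pipe.postprocess(pipe.forecast(prepped, output_length=96))
print(outputs[0].shape)  # torch.Size([1, 96]) per series
```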
diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/chronos2/pipeline_chronos2.py b/iotdb-core/ainode/iotdb/ainode/core/model/chronos2/pipeline_chronos2.py
index b99a93078414..3fdc7b41b17a 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/model/chronos2/pipeline_chronos2.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/model/chronos2/pipeline_chronos2.py
@@ -21,7 +21,6 @@
 from einops import rearrange, repeat
 from torch.utils.data import DataLoader
 
-from iotdb.ainode.core.exception import InferenceModelInternalException
 from iotdb.ainode.core.inference.pipeline.basic_pipeline import ForecastPipeline
 from iotdb.ainode.core.log import Logger
 from iotdb.ainode.core.model.chronos2.dataset import Chronos2Dataset, DatasetMode
@@ -37,8 +36,35 @@ class Chronos2Pipeline(ForecastPipeline):
     def __init__(self, model_info, **model_kwargs):
         super().__init__(model_info, model_kwargs=model_kwargs)
 
-    def preprocess(self, inputs):
-        inputs = super().preprocess(inputs)
+    def preprocess(self, inputs, **infer_kwargs):
+        """
+        Preprocess input data for chronos2.
+
+        Parameters
+        ----------
+        inputs : list of dict
+            A list of dictionaries containing input data. Each dictionary contains:
+            - 'targets': A tensor (1D or 2D) of shape (input_length,) or (target_count, input_length).
+            - 'past_covariates': A dictionary of tensors (optional), where each tensor has shape (input_length,).
+            - 'future_covariates': A dictionary of tensors (optional), where each tensor has shape (input_length,).
+
+        infer_kwargs : Additional keyword arguments for inference, such as:
+            - `output_length` (int): Used to validate 'future_covariates' if provided.
+
+        Returns
+        -------
+        list of dict
+            Processed inputs with the following structure for each dictionary:
+            - 'target': torch.Tensor
+                The renamed target tensor (originally 'targets'), shape [target_count, input_length].
+            - 'past_covariates' (optional): dict of str to torch.Tensor
+                Unchanged past covariates.
+            - 'future_covariates' (optional): dict of str to torch.Tensor
+                Unchanged future covariates.
+        """
+        super().preprocess(inputs, **infer_kwargs)
+        for item in inputs:
+            item["target"] = item.pop("targets")
         return inputs
 
     @property
@@ -206,9 +232,28 @@ def _autoregressive_unroll_for_long_horizon(
 
         return prediction, context, future_covariates
 
-    def forecast(self, inputs, **infer_kwargs):
+    def forecast(self, inputs, **infer_kwargs) -> list[torch.Tensor]:
+        """
+        Generate forecasts for the input time series.
+
+        Parameters
+        ----------
+        inputs :
+            - A 3D `torch.Tensor` or `np.ndarray` of shape (batch_size, n_variates, history_length).
+            - A list of 1D or 2D `torch.Tensor` or `np.ndarray`, where each element has shape (history_length,) or (n_variates, history_length).
+            - A list of dictionaries, each with:
+                - `target` (required): 1D or 2D `torch.Tensor` or `np.ndarray` of shape (history_length,) or (n_variates, history_length).
+                - `past_covariates` (optional): dict of past-only covariates with 1D `torch.Tensor` or `np.ndarray`.
+                - `future_covariates` (optional): dict of future covariates with 1D `torch.Tensor` or `np.ndarray`.
+
+        **infer_kwargs : Additional arguments for inference.
+
+        Returns
+        -------
+        list of torch.Tensor : The model's predictions, each of shape (n_variates, n_quantiles, prediction_length).
+        """
         model_prediction_length = self.model_prediction_length
-        prediction_length = infer_kwargs.get("predict_length", 96)
+        prediction_length = infer_kwargs.get("output_length", 96)
         # The maximum number of output patches to generate in a single forward pass before the long-horizon heuristic kicks in. Note: A value larger
         # than the model's default max_output_patches may lead to degradation in forecast accuracy, defaults to a model-specific value
         max_output_patches = infer_kwargs.get(
@@ -218,10 +263,13 @@ def forecast(self, inputs, **infer_kwargs):
         # are appended to the historical context and input into the model autoregressively to generate long-horizon predictions. Note that the
         # effective batch size increases by a factor of `len(unrolled_quantiles)` when making long-horizon predictions,
         # by default [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
+
+        # Note that this parameter is used for long-horizon forecasting; the default quantile_levels
+        # are [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
+        #      0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 0.99]
         unrolled_quantiles = infer_kwargs.get(
             "unrolled_quantiles", [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
         )
-
         if not set(unrolled_quantiles).issubset(self.quantiles):
             raise ValueError(
                 f"Unrolled quantiles must be a subset of the model's quantiles. "
@@ -236,7 +284,9 @@
             )
             logger.warning(msg)
 
-        context_length = inputs.shape[-1]
+        # The maximum context length used during inference,
+        # by default set to the model's default context length
+        context_length = infer_kwargs.get("context_length", self.model_context_length)
         if context_length > self.model_context_length:
             logger.warning(
                 f"The specified context_length {context_length} is greater than the model's default context length {self.model_context_length}. "
@@ -244,11 +294,16 @@
             )
             context_length = self.model_context_length
 
+        # The batch size used for prediction.
+        # Note that the batch size here means the number of time series,
+        # including target(s) and covariates, which are input into the model.
+        batch_size = infer_kwargs.get("batch_size", 256)
+
         test_dataset = Chronos2Dataset.convert_inputs(
             inputs=inputs,
             context_length=context_length,
             prediction_length=prediction_length,
-            batch_size=256,
+            batch_size=batch_size,
             output_patch_size=self.model_output_patch_size,
             mode=DatasetMode.TEST,
         )
@@ -268,6 +323,13 @@
             batch_future_covariates = batch["future_covariates"]
             batch_target_idx_ranges = batch["target_idx_ranges"]
 
+            # If True, cross-learning is enabled,
+            # i.e., all the tasks in `inputs` will be predicted jointly and the model will share information across all inputs,
+            # by default False
+            predict_batches_jointly = infer_kwargs.get("predict_batches_jointly", False)
+            if predict_batches_jointly:
+                batch_group_ids = torch.zeros_like(batch_group_ids)
+
             batch_prediction = self._predict_batch(
                 context=batch_context,
                 group_ids=batch_group_ids,
+ """ + outputs_list = [] + for output in outputs: + # Check if 0.5 quantile is available + if 0.5 in self.quantiles: + idx = self.quantiles.index(0.5) + # Get the 0.5 quantile value + outputs_list.append(output[:, idx, :]) + else: + # If 0.5 quantile is not provided, + # get the mean of all quantiles + outputs_list.append(output.mean(dim=1)) + super().postprocess(outputs_list, **infer_kwargs) + return outputs_list diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/arima/config.json b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/arima/config.json index 1561124badd1..c9f80477643b 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/arima/config.json +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/arima/config.json @@ -1,7 +1,7 @@ { "model_type": "sktime", "model_id": "arima", - "predict_length": 1, + "output_length": 1, "order": [1, 0, 0], "seasonal_order": [0, 0, 0, 0], "start_params": null, diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/configuration_sktime.py b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/configuration_sktime.py index d9d20545af6f..f6d0c94ee39a 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/configuration_sktime.py +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/configuration_sktime.py @@ -153,7 +153,7 @@ def parse(self, string_value: str): # Model configuration definitions - using concise dictionary format MODEL_CONFIGS = { "NAIVE_FORECASTER": { - "predict_length": AttributeConfig("predict_length", 1, "int", 1, 5000), + "output_length": AttributeConfig("output_length", 1, "int", 1, 5000), "strategy": AttributeConfig( "strategy", "last", "str", choices=["last", "mean", "drift"] ), @@ -161,7 +161,7 @@ def parse(self, string_value: str): "sp": AttributeConfig("sp", 1, "int", 1, 5000), }, "EXPONENTIAL_SMOOTHING": { - "predict_length": AttributeConfig("predict_length", 1, "int", 1, 5000), + "output_length": AttributeConfig("output_length", 1, "int", 1, 5000), "damped_trend": AttributeConfig("damped_trend", False, "bool"), "initialization_method": AttributeConfig( "initialization_method", @@ -174,7 +174,7 @@ def parse(self, string_value: str): "use_brute": AttributeConfig("use_brute", False, "bool"), }, "ARIMA": { - "predict_length": AttributeConfig("predict_length", 1, "int", 1, 5000), + "output_length": AttributeConfig("output_length", 1, "int", 1, 5000), "order": AttributeConfig("order", (1, 0, 0), "tuple", value_type=int), "seasonal_order": AttributeConfig( "seasonal_order", (0, 0, 0, 0), "tuple", value_type=int @@ -212,7 +212,7 @@ def parse(self, string_value: str): "concentrate_scale": AttributeConfig("concentrate_scale", False, "bool"), }, "STL_FORECASTER": { - "predict_length": AttributeConfig("predict_length", 1, "int", 1, 5000), + "output_length": AttributeConfig("output_length", 1, "int", 1, 5000), "sp": AttributeConfig("sp", 2, "int", 1, 5000), "seasonal": AttributeConfig("seasonal", 7, "int", 1, 5000), "trend": AttributeConfig("trend", None, "int"), diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/exponential_smoothing/config.json b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/exponential_smoothing/config.json index 4126d9de857a..a0550548bbee 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/exponential_smoothing/config.json +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/exponential_smoothing/config.json @@ -1,7 +1,7 @@ { "model_type": "sktime", "model_id": "exponential_smoothing", - "predict_length": 1, + "output_length": 1, "damped_trend": false, 
"initialization_method": "estimated", "optimized": true, diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py index 9ddbcab286fa..11277eb26e55 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py @@ -59,11 +59,11 @@ class ForecastingModel(SktimeModel): def generate(self, data, **kwargs): """Execute forecasting""" try: - predict_length = kwargs.get( - "predict_length", self._attributes["predict_length"] + output_length = kwargs.get( + "output_length", self._attributes["output_length"] ) self._model.fit(data) - output = self._model.predict(fh=range(predict_length)) + output = self._model.predict(fh=range(output_length)) return np.array(output, dtype=np.float64) except Exception as e: raise InferenceModelInternalException(str(e)) @@ -75,8 +75,8 @@ class DetectionModel(SktimeModel): def generate(self, data, **kwargs): """Execute detection""" try: - predict_length = kwargs.get("predict_length", data.size) - output = self._model.fit_transform(data[:predict_length]) + output_length = kwargs.get("output_length", data.size) + output = self._model.fit_transform(data[:output_length]) if isinstance(output, pd.DataFrame): return np.array(output["labels"], dtype=np.int32) else: @@ -91,7 +91,7 @@ class ArimaModel(ForecastingModel): def __init__(self, attributes: Dict[str, Any]): super().__init__(attributes) self._model = ARIMA( - **{k: v for k, v in attributes.items() if k != "predict_length"} + **{k: v for k, v in attributes.items() if k != "output_length"} ) @@ -101,7 +101,7 @@ class ExponentialSmoothingModel(ForecastingModel): def __init__(self, attributes: Dict[str, Any]): super().__init__(attributes) self._model = ExponentialSmoothing( - **{k: v for k, v in attributes.items() if k != "predict_length"} + **{k: v for k, v in attributes.items() if k != "output_length"} ) @@ -111,7 +111,7 @@ class NaiveForecasterModel(ForecastingModel): def __init__(self, attributes: Dict[str, Any]): super().__init__(attributes) self._model = NaiveForecaster( - **{k: v for k, v in attributes.items() if k != "predict_length"} + **{k: v for k, v in attributes.items() if k != "output_length"} ) @@ -121,7 +121,7 @@ class STLForecasterModel(ForecastingModel): def __init__(self, attributes: Dict[str, Any]): super().__init__(attributes) self._model = STLForecaster( - **{k: v for k, v in attributes.items() if k != "predict_length"} + **{k: v for k, v in attributes.items() if k != "output_length"} ) diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json index 3dadd7c3b1e5..797a2fd4b98a 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json @@ -1,7 +1,7 @@ { "model_type": "sktime", "model_id": "naive_forecaster", - "predict_length": 1, + "output_length": 1, "strategy": "last", "window_length": null, "sp": 1 diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/pipeline_sktime.py b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/pipeline_sktime.py index a10a0a134a24..964ab156e264 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/pipeline_sktime.py +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/pipeline_sktime.py @@ -22,54 +22,94 @@ from iotdb.ainode.core.exception import 
diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py
index 9ddbcab286fa..11277eb26e55 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/modeling_sktime.py
@@ -59,11 +59,11 @@ class ForecastingModel(SktimeModel):
     def generate(self, data, **kwargs):
         """Execute forecasting"""
         try:
-            predict_length = kwargs.get(
-                "predict_length", self._attributes["predict_length"]
+            output_length = kwargs.get(
+                "output_length", self._attributes["output_length"]
             )
             self._model.fit(data)
-            output = self._model.predict(fh=range(predict_length))
+            output = self._model.predict(fh=range(output_length))
             return np.array(output, dtype=np.float64)
         except Exception as e:
             raise InferenceModelInternalException(str(e))
@@ -75,8 +75,8 @@ class DetectionModel(SktimeModel):
     def generate(self, data, **kwargs):
         """Execute detection"""
         try:
-            predict_length = kwargs.get("predict_length", data.size)
-            output = self._model.fit_transform(data[:predict_length])
+            output_length = kwargs.get("output_length", data.size)
+            output = self._model.fit_transform(data[:output_length])
             if isinstance(output, pd.DataFrame):
                 return np.array(output["labels"], dtype=np.int32)
             else:
@@ -91,7 +91,7 @@ class ArimaModel(ForecastingModel):
     def __init__(self, attributes: Dict[str, Any]):
         super().__init__(attributes)
         self._model = ARIMA(
-            **{k: v for k, v in attributes.items() if k != "predict_length"}
+            **{k: v for k, v in attributes.items() if k != "output_length"}
         )
 
 
@@ -101,7 +101,7 @@ class ExponentialSmoothingModel(ForecastingModel):
     def __init__(self, attributes: Dict[str, Any]):
         super().__init__(attributes)
         self._model = ExponentialSmoothing(
-            **{k: v for k, v in attributes.items() if k != "predict_length"}
+            **{k: v for k, v in attributes.items() if k != "output_length"}
         )
 
 
@@ -111,7 +111,7 @@ class NaiveForecasterModel(ForecastingModel):
     def __init__(self, attributes: Dict[str, Any]):
         super().__init__(attributes)
         self._model = NaiveForecaster(
-            **{k: v for k, v in attributes.items() if k != "predict_length"}
+            **{k: v for k, v in attributes.items() if k != "output_length"}
         )
 
 
@@ -121,7 +121,7 @@ class STLForecasterModel(ForecastingModel):
     def __init__(self, attributes: Dict[str, Any]):
         super().__init__(attributes)
         self._model = STLForecaster(
-            **{k: v for k, v in attributes.items() if k != "predict_length"}
+            **{k: v for k, v in attributes.items() if k != "output_length"}
         )
diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json
index 3dadd7c3b1e5..797a2fd4b98a 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json
+++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/naive_forecaster/config.json
@@ -1,7 +1,7 @@
 {
   "model_type": "sktime",
   "model_id": "naive_forecaster",
-  "predict_length": 1,
+  "output_length": 1,
   "strategy": "last",
   "window_length": null,
   "sp": 1
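After the rename, `output_length` is stripped from the attribute dict before the sktime estimator is constructed and is used only as the forecasting horizon. A runnable sketch with plain sktime (note: the patch itself passes `fh=range(output_length)`; a 1-based horizon is shown here to keep the example strictly out-of-sample):

```python
import numpy as np
import pandas as pd
from sktime.forecasting.naive import NaiveForecaster

attributes = {"output_length": 3, "strategy": "last", "sp": 1}
# output_length is a pipeline concern, not an estimator hyper-parameter
model = NaiveForecaster(**{k: v for k, v in attributes.items() if k != "output_length"})

data = pd.Series(np.arange(24, dtype=float))
model.fit(data)
forecast = np.asarray(
    model.predict(fh=np.arange(1, attributes["output_length"] + 1)), dtype=np.float64
)
print(forecast.shape)  # (3,)
```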
+ """ + output_length = infer_kwargs.get("output_length", 96) + + # Batch processing + outputs = [] + for series in inputs: + output = self.model.generate(series, output_length=output_length) + outputs.append(output) + outputs = np.array(outputs) return outputs - def postprocess(self, outputs): - if isinstance(outputs, np.ndarray): - outputs = torch.from_numpy(outputs).float() - outputs = super().postprocess(outputs.unsqueeze(1)) + def postprocess(self, outputs: np.ndarray, **infer_kwargs) -> list[torch.Tensor]: + """ + Postprocess the model's outputs. + + Parameters: + outputs (np.ndarray): Model output to be processed. + **infer_kwargs: Additional inference parameters. + + Returns: + list of torch.Tensor: List of 2D-tensors with shape [target_count(1), output_length]. + """ + + # Transform outputs into a 2D-tensor: [batch_size, output_length] + outputs = torch.from_numpy(outputs).float() + outputs = [outputs[i].unsqueeze(0) for i in range(outputs.size(0))] + outputs = super().postprocess(outputs, **infer_kwargs) return outputs diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/stl_forecaster/config.json b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/stl_forecaster/config.json index bfe71dbc4861..dff4d5d15a1f 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sktime/stl_forecaster/config.json +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sktime/stl_forecaster/config.json @@ -1,7 +1,7 @@ { "model_type": "sktime", "model_id": "stl_forecaster", - "predict_length": 1, + "output_length": 1, "sp": 2, "seasonal": 7, "trend": null, diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/sundial/pipeline_sundial.py b/iotdb-core/ainode/iotdb/ainode/core/model/sundial/pipeline_sundial.py index 69422dfadb25..1715f190e32d 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/sundial/pipeline_sundial.py +++ b/iotdb-core/ainode/iotdb/ainode/core/model/sundial/pipeline_sundial.py @@ -20,42 +20,92 @@ from iotdb.ainode.core.exception import InferenceModelInternalException from iotdb.ainode.core.inference.pipeline.basic_pipeline import ForecastPipeline +from iotdb.ainode.core.log import Logger +from iotdb.ainode.core.model.model_info import ModelInfo + +logger = Logger() class SundialPipeline(ForecastPipeline): - def __init__(self, model_info, **model_kwargs): + def __init__(self, model_info: ModelInfo, **model_kwargs): super().__init__(model_info, model_kwargs=model_kwargs) - def preprocess(self, inputs): + def preprocess(self, inputs, **infer_kwargs) -> torch.Tensor: """ - The inputs shape should be 3D, but Sundial only supports 2D tensor: [batch_size, sequence_length], - we need to squeeze the target_count dimension. + Preprocess the input data by converting it to a 2D tensor (Sundial only supports 2D inputs). + + Parameters: + inputs (list): A list of dictionaries containing input data, + where each dictionary includes a "targets" key. + **infer_kwargs: Additional keyword arguments passed to the method. + + Returns: + torch.Tensor: A 2D tensor with shape [batch_size, input_length] after squeezing + the target_count dimension. + + Raises: + InferenceModelInternalException: If the model receives more than one target variable + (i.e., when inputs.shape[1] != 1). 
""" - inputs = super().preprocess(inputs) + model_id = self.model_info.model_id + inputs = super().preprocess(inputs, **infer_kwargs) + # Here, we assume element in list has same history_length, + # otherwise, the model cannot proceed + if inputs[0].get("past_covariates", None) or inputs[0].get( + "future_covariates", None + ): + logger.warning( + f"[Inference] Past_covariates and future_covariates will be ignored, as they are not supported for model {model_id}." + ) + + # stack the data and get a 3D-tensor:[batch_size, target_count(1), input_length] + inputs = torch.stack([data["targets"] for data in inputs], dim=0) if inputs.shape[1] != 1: raise InferenceModelInternalException( - f"[Inference] Model sundial only supports univarate forecast, but receives {inputs.shape[1]} target variables." + f"Model {model_id} only supports univariate forecast, but receives {inputs.shape[1]} target variables." ) inputs = inputs.squeeze(1) return inputs - def forecast(self, inputs, **infer_kwargs): - predict_length = infer_kwargs.get("predict_length", 96) + def forecast(self, inputs: torch.Tensor, **infer_kwargs) -> torch.Tensor: + """ + Generate forecasted outputs using the Sundial model based on the provided inputs. + + Parameters: + inputs (torch.Tensor): A 2D tensor of shape [batch_size, input_length]. + **infer_kwargs: Additional inference parameters: + - output_length (int): The length of the forecast output (default: 96). + - num_samples (int): The number of samples to generate (default: 10). + - revin (bool): Whether to apply revin (default: True). + + Returns: + torch.Tensor: A tensor containing the forecasted outputs with shape [batch_size, num_samples, output_length]. + """ + output_length = infer_kwargs.get("output_length", 96) num_samples = infer_kwargs.get("num_samples", 10) revin = infer_kwargs.get("revin", True) outputs = self.model.generate( inputs, - max_new_tokens=predict_length, + max_new_tokens=output_length, num_samples=num_samples, revin=revin, ) return outputs - def postprocess(self, outputs: torch.Tensor): + def postprocess(self, outputs: torch.Tensor, **infer_kwargs) -> list[torch.Tensor]: """ - The outputs shape should be 3D, we need to take the mean value across num_samples dimension and expand dims. + Postprocess the model's output by averaging across the num_samples dimension and + expanding the dimensions to match the expected shape. + + Parameters: + outputs (torch.Tensor): The raw output 3D-tensor from the model with shape [batch_size, num_samples, output_length]. + **infer_kwargs: Additional inference parameters passed to the method. + + Returns: + list of torch.Tensor: A list of 2D tensors with shape [target_count(1), output_length]. 
""" outputs = outputs.mean(dim=1).unsqueeze(1) - outputs = super().postprocess(outputs) + outputs = [outputs[i] for i in range(outputs.size(0))] + outputs = super().postprocess(outputs, **infer_kwargs) return outputs diff --git a/iotdb-core/ainode/iotdb/ainode/core/model/timer_xl/pipeline_timer.py b/iotdb-core/ainode/iotdb/ainode/core/model/timer_xl/pipeline_timer.py index 041cb8c7cbae..bb54eed4ec6e 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/model/timer_xl/pipeline_timer.py +++ b/iotdb-core/ainode/iotdb/ainode/core/model/timer_xl/pipeline_timer.py @@ -20,37 +20,83 @@ from iotdb.ainode.core.exception import InferenceModelInternalException from iotdb.ainode.core.inference.pipeline.basic_pipeline import ForecastPipeline +from iotdb.ainode.core.log import Logger +from iotdb.ainode.core.model.model_info import ModelInfo + +logger = Logger() class TimerPipeline(ForecastPipeline): - def __init__(self, model_info, **model_kwargs): + def __init__(self, model_info: ModelInfo, **model_kwargs): super().__init__(model_info, model_kwargs=model_kwargs) - def preprocess(self, inputs): + def preprocess(self, inputs, **infer_kwargs) -> torch.Tensor: """ - The inputs shape should be 3D, but Timer-XL only supports 2D tensor: [batch_size, sequence_length], - we need to squeeze the target_count dimension. + Preprocess the input data by converting it to a 2D tensor (Timer-XL only supports 2D inputs). + + Parameters: + inputs (list): A list of dictionaries containing input data, + where each dictionary should include a "targets" key. + **infer_kwargs: Additional keyword arguments passed to the method. + + Returns: + torch.Tensor: A 2D tensor of shape [batch_size, input_length] after squeezing + the target_count dimension. + + Raises: + InferenceModelInternalException: If the model receives more than one target variable + (i.e., when inputs.shape[1] != 1). """ - inputs = super().preprocess(inputs) + model_id = self.model_info.model_id + inputs = super().preprocess(inputs, **infer_kwargs) + # Here, we assume element in list has same history_length, + # otherwise, the model cannot proceed + if inputs[0].get("past_covariates", None) or inputs[0].get( + "future_covariates", None + ): + logger.warning( + f"[Inference] Past_covariates and future_covariates will be ignored, as they are not supported for model {model_id}." + ) + + # stack the data and get a 3D-tensor:[batch_size, target_count(1), input_length] + inputs = torch.stack([data["targets"] for data in inputs], dim=0) if inputs.shape[1] != 1: raise InferenceModelInternalException( - f"[Inference] Model timer_xl only supports univarate forecast, but receives {inputs.shape[1]} target variables." + f"Model {model_id} only supports univariate forecast, but receives {inputs.shape[1]} target variables." ) inputs = inputs.squeeze(1) return inputs - def forecast(self, inputs, **infer_kwargs): - predict_length = infer_kwargs.get("predict_length", 96) + def forecast(self, inputs: torch.Tensor, **infer_kwargs) -> torch.Tensor: + """ + Generate forecasted outputs using the model based on the provided inputs. + + Parameters: + inputs (torch.Tensor): A 2D tensor of shape [batch_size, input_length]. + **infer_kwargs: Additional inference parameters: + - output_length (int): The length of the forecast output (default: 96). + - revin (bool): Whether to apply revin (default: True). + + Returns: + torch.Tensor: A tensor containing the forecasted outputs, with shape [batch_size, output_length]. 
+ """ + output_length = infer_kwargs.get("output_length", 96) revin = infer_kwargs.get("revin", True) - outputs = self.model.generate( - inputs, max_new_tokens=predict_length, revin=revin - ) + outputs = self.model.generate(inputs, max_new_tokens=output_length, revin=revin) return outputs - def postprocess(self, outputs: torch.Tensor): + def postprocess(self, outputs: torch.Tensor, **infer_kwargs) -> list[torch.Tensor]: """ - The outputs shape should be 3D, so we need to expand dims. + Postprocess the model's output by expanding its dimensions to match the expected shape. + + Parameters: + outputs (torch.Tensor): The raw output 2D-tensor from the model with shape [batch_size, output_length]. + **infer_kwargs: Additional inference parameters passed to the method. + + Returns: + list of torch.Tensor: A list of 2D tensors with shape [target_count(1), output_length]. """ - outputs = super().postprocess(outputs.unsqueeze(1)) + outputs = [outputs[i].unsqueeze(0) for i in range(outputs.size(0))] + outputs = super().postprocess(outputs, **infer_kwargs) return outputs diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java index 950d8c464e81..dcb27825e314 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java @@ -77,14 +77,14 @@ public class ForecastTableFunction implements TableFunction { public static class ForecastTableFunctionHandle implements TableFunctionHandle { - String modelId; - int maxInputLength; - int outputLength; - long outputStartTime; - long outputInterval; - boolean keepInput; - Map options; - List targetColumntypes; + protected String modelId; + protected int maxInputLength; + protected int outputLength; + protected long outputStartTime; + protected long outputInterval; + protected boolean keepInput; + protected Map options; + protected List targetColumntypes; public ForecastTableFunctionHandle() {} @@ -182,22 +182,22 @@ public int hashCode() { } } - private static final String TARGETS_PARAMETER_NAME = "TARGETS"; - private static final String MODEL_ID_PARAMETER_NAME = "MODEL_ID"; - private static final String OUTPUT_LENGTH_PARAMETER_NAME = "OUTPUT_LENGTH"; - private static final int DEFAULT_OUTPUT_LENGTH = 96; - private static final String OUTPUT_START_TIME = "OUTPUT_START_TIME"; + protected static final String TARGETS_PARAMETER_NAME = "TARGETS"; + protected static final String MODEL_ID_PARAMETER_NAME = "MODEL_ID"; + protected static final String OUTPUT_LENGTH_PARAMETER_NAME = "OUTPUT_LENGTH"; + protected static final int DEFAULT_OUTPUT_LENGTH = 96; + protected static final String OUTPUT_START_TIME = "OUTPUT_START_TIME"; public static final long DEFAULT_OUTPUT_START_TIME = Long.MIN_VALUE; - private static final String OUTPUT_INTERVAL = "OUTPUT_INTERVAL"; + protected static final String OUTPUT_INTERVAL = "OUTPUT_INTERVAL"; public static final long DEFAULT_OUTPUT_INTERVAL = 0L; public static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; - private static final String DEFAULT_TIME_COL = "time"; - private static final String KEEP_INPUT_PARAMETER_NAME = "PRESERVE_INPUT"; - private static final Boolean DEFAULT_KEEP_INPUT = Boolean.FALSE; - private static final String 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
index 950d8c464e81..dcb27825e314 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/ForecastTableFunction.java
@@ -77,14 +77,14 @@ public class ForecastTableFunction implements TableFunction {
 
   public static class ForecastTableFunctionHandle implements TableFunctionHandle {
 
-    String modelId;
-    int maxInputLength;
-    int outputLength;
-    long outputStartTime;
-    long outputInterval;
-    boolean keepInput;
-    Map<String, String> options;
-    List<Type> targetColumntypes;
+    protected String modelId;
+    protected int maxInputLength;
+    protected int outputLength;
+    protected long outputStartTime;
+    protected long outputInterval;
+    protected boolean keepInput;
+    protected Map<String, String> options;
+    protected List<Type> targetColumntypes;
 
     public ForecastTableFunctionHandle() {}
 
@@ -182,22 +182,22 @@ public int hashCode() {
     }
   }
 
-  private static final String TARGETS_PARAMETER_NAME = "TARGETS";
-  private static final String MODEL_ID_PARAMETER_NAME = "MODEL_ID";
-  private static final String OUTPUT_LENGTH_PARAMETER_NAME = "OUTPUT_LENGTH";
-  private static final int DEFAULT_OUTPUT_LENGTH = 96;
-  private static final String OUTPUT_START_TIME = "OUTPUT_START_TIME";
+  protected static final String TARGETS_PARAMETER_NAME = "TARGETS";
+  protected static final String MODEL_ID_PARAMETER_NAME = "MODEL_ID";
+  protected static final String OUTPUT_LENGTH_PARAMETER_NAME = "OUTPUT_LENGTH";
+  protected static final int DEFAULT_OUTPUT_LENGTH = 96;
+  protected static final String OUTPUT_START_TIME = "OUTPUT_START_TIME";
   public static final long DEFAULT_OUTPUT_START_TIME = Long.MIN_VALUE;
-  private static final String OUTPUT_INTERVAL = "OUTPUT_INTERVAL";
+  protected static final String OUTPUT_INTERVAL = "OUTPUT_INTERVAL";
   public static final long DEFAULT_OUTPUT_INTERVAL = 0L;
   public static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
-  private static final String DEFAULT_TIME_COL = "time";
-  private static final String KEEP_INPUT_PARAMETER_NAME = "PRESERVE_INPUT";
-  private static final Boolean DEFAULT_KEEP_INPUT = Boolean.FALSE;
-  private static final String IS_INPUT_COLUMN_NAME = "is_input";
-  private static final String OPTIONS_PARAMETER_NAME = "MODEL_OPTIONS";
-  private static final String DEFAULT_OPTIONS = "";
-  private static final int MAX_INPUT_LENGTH = 2880;
+  protected static final String DEFAULT_TIME_COL = "time";
+  protected static final String KEEP_INPUT_PARAMETER_NAME = "PRESERVE_INPUT";
+  protected static final Boolean DEFAULT_KEEP_INPUT = Boolean.FALSE;
+  protected static final String IS_INPUT_COLUMN_NAME = "is_input";
+  protected static final String OPTIONS_PARAMETER_NAME = "MODEL_OPTIONS";
+  protected static final String DEFAULT_OPTIONS = "";
+  protected static final int MAX_INPUT_LENGTH = 2880;
 
   private static final String INVALID_OPTIONS_FORMAT = "Invalid options: %s";
 
@@ -366,7 +366,7 @@ public TableFunctionDataProcessor getDataProcessor() {
   }
 
   // only allow for INT32, INT64, FLOAT, DOUBLE
-  private void checkType(Type type, String columnName) {
+  public void checkType(Type type, String columnName) {
    if (!ALLOWED_INPUT_TYPES.contains(type)) {
      throw new SemanticException(
          String.format(
@@ -375,7 +375,7 @@ private void checkType(Type type, String columnName) {
     }
   }
 
-  private static Map<String, String> parseOptions(String options) {
+  public static Map<String, String> parseOptions(String options) {
     if (options.isEmpty()) {
       return Collections.emptyMap();
     }
@@ -397,22 +397,22 @@ private static Map<String, String> parseOptions(String options) {
     return optionsMap;
   }
 
-  private static class ForecastDataProcessor implements TableFunctionDataProcessor {
+  protected static class ForecastDataProcessor implements TableFunctionDataProcessor {
 
-    private static final TsBlockSerde SERDE = new TsBlockSerde();
-    private static final IClientManager CLIENT_MANAGER =
+    protected static final TsBlockSerde SERDE = new TsBlockSerde();
+    protected static final IClientManager CLIENT_MANAGER =
         AINodeClientManager.getInstance();
-    private final String modelId;
+    protected final String modelId;
     private final int maxInputLength;
-    private final int outputLength;
+    protected final int outputLength;
     private final long outputStartTime;
     private final long outputInterval;
     private final boolean keepInput;
-    private final Map<String, String> options;
-    private final LinkedList<Record> inputRecords;
-    private final List resultColumnAppenderList;
-    private final TsBlockBuilder inputTsBlockBuilder;
+    protected final Map<String, String> options;
+    protected final LinkedList<Record> inputRecords;
+    protected final List resultColumnAppenderList;
+    protected final TsBlockBuilder inputTsBlockBuilder;
 
     public ForecastDataProcessor(ForecastTableFunctionHandle functionHandle) {
       this.modelId = functionHandle.modelId;
@@ -536,7 +536,7 @@ public void finish(
       }
     }
 
-    private TsBlock forecast() {
+    protected TsBlock forecast() {
      // construct inputTSBlock for AINode
      while (!inputRecords.isEmpty()) {
        Record row = inputRecords.removeFirst();
diff --git a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
index ea32f01b6e2d..8a5971823ec2 100644
--- a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
+++ b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
@@ -85,7 +85,9 @@ struct TForecastReq {
   1: required string modelId
   2: required binary inputData
   3: required i32 outputLength
-  4: optional map<string, string> options
+  4: optional string historyCovs
+  5: optional string futureCovs
+  6: optional map<string, string> options
 }
 
 struct TForecastResp {
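The two new thrift fields ride alongside the existing options map. A hedged sketch of filling them from a client (the generated-class import is commented out, and JSON-encoding the covariates is an assumption — this diff does not specify the payload format):

```python
import json

# from iotdb.thrift.ainode.ttypes import TForecastReq  # thrift-generated class

history_covs = json.dumps({"temperature": [21.5, 21.7, 21.9]})  # assumed encoding
future_covs = json.dumps({"temperature": [22.0, 22.1]})         # assumed encoding

# req = TForecastReq(
#     modelId="chronos2",
#     inputData=serialized_tsblock,  # binary, e.g. produced by TsBlockSerde
#     outputLength=96,
#     historyCovs=history_covs,
#     futureCovs=future_covs,
#     options={"context_length": "512"},
# )
```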