Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions iotdb-core/ainode/iotdb/ainode/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from iotdb.ainode.core.constant import (
AINODE_BUILD_INFO,
AINODE_BUILTIN_MODELS_DIR,
AINODE_CLUSTER_INGRESS_ADDRESS,
AINODE_CLUSTER_INGRESS_PASSWORD,
AINODE_CLUSTER_INGRESS_PORT,
Expand All @@ -31,7 +30,6 @@
AINODE_CONF_FILE_NAME,
AINODE_CONF_GIT_FILE_NAME,
AINODE_CONF_POM_FILE_NAME,
AINODE_FINETUNE_MODELS_DIR,
AINODE_INFERENCE_BATCH_INTERVAL_IN_MS,
AINODE_INFERENCE_EXTRA_MEMORY_RATIO,
AINODE_INFERENCE_MAX_PREDICT_LENGTH,
Expand All @@ -45,7 +43,6 @@
AINODE_SYSTEM_FILE_NAME,
AINODE_TARGET_CONFIG_NODE_LIST,
AINODE_THRIFT_COMPRESSION_ENABLED,
AINODE_USER_DEFINED_MODELS_DIR,
AINODE_VERSION_INFO,
)
from iotdb.ainode.core.exception import BadNodeUrlError
Expand Down Expand Up @@ -97,9 +94,6 @@ def __init__(self):

# Directory to save models
self._ain_models_dir = AINODE_MODELS_DIR
self._ain_builtin_models_dir = AINODE_BUILTIN_MODELS_DIR
self._ain_finetune_models_dir = AINODE_FINETUNE_MODELS_DIR
self._ain_user_defined_models_dir = AINODE_USER_DEFINED_MODELS_DIR
self._ain_system_dir = AINODE_SYSTEM_DIR

# Whether to enable compression for thrift
Expand Down Expand Up @@ -208,24 +202,6 @@ def get_ain_models_dir(self) -> str:
def set_ain_models_dir(self, ain_models_dir: str) -> None:
self._ain_models_dir = ain_models_dir

def get_ain_builtin_models_dir(self) -> str:
return self._ain_builtin_models_dir

def set_ain_builtin_models_dir(self, ain_builtin_models_dir: str) -> None:
self._ain_builtin_models_dir = ain_builtin_models_dir

def get_ain_finetune_models_dir(self) -> str:
return self._ain_finetune_models_dir

def set_ain_finetune_models_dir(self, ain_finetune_models_dir: str) -> None:
self._ain_finetune_models_dir = ain_finetune_models_dir

def get_ain_user_defined_models_dir(self) -> str:
return self._ain_user_defined_models_dir

def set_ain_user_defined_models_dir(self, ain_user_defined_models_dir: str) -> None:
self._ain_user_defined_models_dir = ain_user_defined_models_dir

def get_ain_system_dir(self) -> str:
return self._ain_system_dir

Expand Down
16 changes: 1 addition & 15 deletions iotdb-core/ainode/iotdb/ainode/core/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,23 +51,14 @@
AINODE_INFERENCE_MAX_PREDICT_LENGTH = 2880
AINODE_INFERENCE_MODEL_MEM_USAGE_MAP = {
"sundial": 1036 * 1024**2, # 1036 MiB
"timerxl": 856 * 1024**2, # 856 MiB
"timer_xl": 856 * 1024**2, # 856 MiB
} # the memory usage of each model in bytes
AINODE_INFERENCE_MEMORY_USAGE_RATIO = 0.4 # the device space allocated for inference
AINODE_INFERENCE_EXTRA_MEMORY_RATIO = (
1.2 # the overhead ratio for inference, used to estimate the pool size
)

AINODE_MODELS_DIR = os.path.join(IOTDB_AINODE_HOME, "data/ainode/models")
AINODE_BUILTIN_MODELS_DIR = os.path.join(
IOTDB_AINODE_HOME, "data/ainode/models/builtin"
) # For built-in models, we only need to store their weights and config.
AINODE_FINETUNE_MODELS_DIR = os.path.join(
IOTDB_AINODE_HOME, "data/ainode/models/finetune"
)
AINODE_USER_DEFINED_MODELS_DIR = os.path.join(
IOTDB_AINODE_HOME, "data/ainode/models/user_defined"
)
AINODE_SYSTEM_DIR = "data/ainode/system"
AINODE_LOG_DIR = "logs"

Expand All @@ -80,11 +71,6 @@
"log_inference_rank_{}_" # example: log_inference_rank_0_all.log
)

# AINode model management
MODEL_WEIGHTS_FILE_IN_SAFETENSORS = "model.safetensors"
MODEL_CONFIG_FILE_IN_JSON = "config.json"
MODEL_WEIGHTS_FILE_IN_PT = "model.pt"
MODEL_CONFIG_FILE_IN_YAML = "config.yaml"
DEFAULT_CHUNK_SIZE = 8192


Expand Down
5 changes: 1 addition & 4 deletions iotdb-core/ainode/iotdb/ainode/core/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
#
import re

from iotdb.ainode.core.constant import (
MODEL_CONFIG_FILE_IN_YAML,
MODEL_WEIGHTS_FILE_IN_PT,
)
from iotdb.ainode.core.model.model_constants import MODEL_WEIGHTS_FILE_IN_PT, MODEL_CONFIG_FILE_IN_YAML


class _BaseError(Exception):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from iotdb.ainode.core.constant import INFERENCE_LOG_FILE_NAME_PREFIX_TEMPLATE
from iotdb.ainode.core.inference.batcher.basic_batcher import BasicBatcher
from iotdb.ainode.core.inference.inference_request import InferenceRequest
from iotdb.ainode.core.inference.pipeline import get_pipeline
from iotdb.ainode.core.inference.pipeline.pipeline_loader import load_pipeline
from iotdb.ainode.core.inference.request_scheduler.basic_request_scheduler import (
BasicRequestScheduler,
)
Expand Down Expand Up @@ -82,7 +82,7 @@ def __init__(
self._batcher = BasicBatcher()
self._stop_event = mp.Event()

# self._inference_pipeline = get_pipeline(self.model_info.model_id, self.device)
self._inference_pipeline = None

self._logger = None

Expand Down Expand Up @@ -119,7 +119,6 @@ def _step(self):
batch_output = self._inference_pipeline.infer(
batch_inputs,
predict_length=requests[0].max_new_tokens,
# num_samples=10,
revin=True,
)
offset = 0
Expand All @@ -128,12 +127,9 @@ def _step(self):
cur_batch_size = request.batch_size
cur_output = batch_output[offset : offset + cur_batch_size]
offset += cur_batch_size
# request.write_step_output(cur_output.mean(dim=1))
request.write_step_output(cur_output)

# self._inference_pipeline.post_decode()
if request.is_finished():
# self._inference_pipeline.post_inference()
# ensure the output tensor is on CPU before sending to result queue
request.output_tensor = request.output_tensor.cpu()
self._finished_queue.put(request)
Expand All @@ -157,7 +153,7 @@ def run(self):
INFERENCE_LOG_FILE_NAME_PREFIX_TEMPLATE.format(self.device)
)
self._request_scheduler.device = self.device
self._inference_pipeline = get_pipeline(self.model_info.model_id, self.device)
self._inference_pipeline = load_pipeline(self.model_info, str(self.device))
self.ready_event.set()

activate_daemon = threading.Thread(
Expand Down
11 changes: 0 additions & 11 deletions iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,3 @@
# under the License.
#

from iotdb.ainode.core.inference.pipeline.sundial_pipeline import SundialPipeline
from iotdb.ainode.core.inference.pipeline.timerxl_pipeline import TimerxlPipeline


def get_pipeline(model_id, device):
if model_id == "timerxl":
return TimerxlPipeline(model_id, device=device)
elif model_id == "sundial":
return SundialPipeline(model_id, device=device)
else:
raise ValueError(f"Unsupported model_id: {model_id} with pipeline")
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@
import torch

from iotdb.ainode.core.exception import InferenceModelInternalError
from iotdb.ainode.core.manager.model_manager import get_model_manager
from iotdb.ainode.core.manager.model_manager import ModelManager


class BasicPipeline(ABC):
def __init__(self, model_id, **infer_kwargs):
self.model_id = model_id
self.device = infer_kwargs.get("device", "cpu")
# self.model = get_model_manager().load_model(model_id).to(self.device)
self.model = get_model_manager().load_model(
model_id, device_map=str(self.device)
self.model = ModelManager().load_model(
model_id, device_map=self.device
)

def _preprocess(self, inputs):
Expand All @@ -40,15 +39,6 @@ def _preprocess(self, inputs):
# TODO: Integrate with the data processing pipeline operators
pass

def infer(self, inputs):
pass

def _post_decode(self):
"""
Post-process the outputs after each decode step.
"""
pass

def _postprocess(self, output: torch.Tensor):
"""
Post-process the outputs after the entire inference task.
Expand All @@ -70,9 +60,6 @@ def _preprocess(self, inputs):
def forecast(self, inputs, **infer_kwargs):
pass

def _post_decode(self):
pass

def _postprocess(self, output: torch.Tensor):
pass

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from pathlib import Path

from iotdb.ainode.core.log import Logger
from iotdb.ainode.core.model.model_constants import ModelCategory
from iotdb.ainode.core.model.model_storage import ModelInfo
from iotdb.ainode.core.model.utils import temporary_sys_path, import_class_from_path

logger = Logger()


def load_pipeline(model_info: ModelInfo, device: str, **kwargs):
if model_info.category == ModelCategory.BUILTIN:
if model_info.model_id == "timer_xl":
from iotdb.ainode.core.model.timer_xl.pipeline_timer import TimerPipeline
pipeline_cls = TimerPipeline
elif model_info.model_id == "sundial":
from iotdb.ainode.core.model.sundial.pipeline_sundial import SundialPipeline
pipeline_cls = SundialPipeline
else:
logger.error(
f"Unsupported built-in model {model_info.model_id}."
)
return None
else:
module_parent = str(Path(model_info.path).parent.absolute())
with temporary_sys_path(module_parent):
pipeline_cls = import_class_from_path(
model_info.model_id, model_info.pipeline_cls
)

return pipeline_cls(model_info.model_id, device=device)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
ScaleActionType,
)
from iotdb.ainode.core.log import Logger
from iotdb.ainode.core.manager.model_manager import get_model_manager, ModelManager
from iotdb.ainode.core.manager.model_manager import ModelManager
from iotdb.ainode.core.manager.utils import (
INFERENCE_EXTRA_MEMORY_RATIO,
INFERENCE_MEMORY_USAGE_RATIO,
Expand Down
12 changes: 5 additions & 7 deletions iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import threading
import time
from abc import ABC, abstractmethod
from typing import Dict

import pandas as pd
Expand All @@ -29,19 +28,17 @@
from iotdb.ainode.core.constant import TSStatusCode
from iotdb.ainode.core.exception import (
InferenceModelInternalError,
InvalidWindowArgumentError,
NumericalRangeException,
runtime_error_extractor,
)
from iotdb.ainode.core.inference.inference_request import (
InferenceRequest,
InferenceRequestProxy,
)
from iotdb.ainode.core.inference.pipeline import get_pipeline
from iotdb.ainode.core.inference.pipeline.pipeline_loader import load_pipeline
from iotdb.ainode.core.inference.pool_controller import PoolController
from iotdb.ainode.core.inference.utils import generate_req_id
from iotdb.ainode.core.log import Logger
from iotdb.ainode.core.manager.model_manager import get_model_manager
from iotdb.ainode.core.manager.model_manager import ModelManager
from iotdb.ainode.core.rpc.status import get_status
from iotdb.ainode.core.util.gpu_mapping import get_available_devices
from iotdb.ainode.core.util.serde import convert_to_binary
Expand All @@ -67,7 +64,7 @@ class InferenceManager:
) # How often to check for requests in the result queue

def __init__(self):
self._model_manager = get_model_manager()
self._model_manager = ModelManager()
self._model_mem_usage_map: Dict[str, int] = (
{}
) # store model memory usage for each model
Expand Down Expand Up @@ -211,7 +208,8 @@ def _run(
outputs = self._process_request(infer_req)
outputs = convert_to_binary(pd.DataFrame(outputs[0]))
else:
inference_pipeline = get_pipeline(model_id, device="cpu")
model_info = self._model_manager.get_model_info(model_id)
inference_pipeline = load_pipeline(model_info, device="cpu")
outputs = inference_pipeline.infer(
inputs, predict_length=predict_length, **inference_attrs
)
Expand Down
Loading