Skip to content

Commit dae2d8a

Browse files
authored
[onert] Add BulkPipelineManager for multi-model orchestration (#16339)
Add new BulkPipelineManager class to coordinate execution of multiple models in sequence with proper resource management. ONE-DCO-1.0-Signed-off-by: Jonghwa Lee <[email protected]> Signed-off-by: Jonghwa Lee <[email protected]>
1 parent 5b5a249 commit dae2d8a

3 files changed

Lines changed: 340 additions & 0 deletions

File tree

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "BulkPipelineManager.h"
18+
19+
#include <iostream>
20+
#include <sstream>
21+
#include <algorithm>
22+
#include <thread>
23+
#include <chrono>
24+
25+
namespace onert
26+
{
27+
namespace backend
28+
{
29+
namespace trix
30+
{
31+
namespace ops
32+
{
33+
34+
BulkPipelineManager::BulkPipelineManager(const PipelineConfig &config) : _config(config)
35+
{
36+
// DO NOTHING
37+
}
38+
39+
BulkPipelineManager::~BulkPipelineManager() { shutdown(); }
40+
41+
bool BulkPipelineManager::initialize()
42+
{
43+
if (_initialized.load())
44+
{
45+
// Already initialized
46+
return true;
47+
}
48+
49+
try
50+
{
51+
createModels();
52+
prepareModels();
53+
54+
_initialized = true;
55+
return true;
56+
}
57+
catch (const std::exception &e)
58+
{
59+
std::cerr << "Failed to initialize pipeline: " + std::string(e.what()) << std::endl;
60+
shutdown();
61+
return false;
62+
}
63+
}
64+
65+
void BulkPipelineManager::shutdown()
66+
{
67+
if (!_initialized.load())
68+
{
69+
return;
70+
}
71+
72+
_initialized = false;
73+
74+
// Wait until all executions are finished
75+
while (_executing.load())
76+
{
77+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
78+
}
79+
80+
// Release models and clear buffer pool
81+
for (auto &model : _models)
82+
{
83+
if (model)
84+
{
85+
model->release();
86+
}
87+
}
88+
_models.clear();
89+
}
90+
91+
void BulkPipelineManager::execute(const std::vector<const IPortableTensor *> &inputs,
92+
std::vector<IPortableTensor *> &outputs)
93+
{
94+
if (!_initialized.load())
95+
{
96+
throw std::runtime_error("Pipeline is not initialized");
97+
}
98+
99+
if (_models.empty())
100+
{
101+
throw std::runtime_error("No models in pipeline");
102+
}
103+
104+
_executing = true;
105+
106+
try
107+
{
108+
auto current_inputs = inputs;
109+
auto current_outputs = outputs;
110+
111+
for (size_t i = 0; i < _models.size(); ++i)
112+
{
113+
auto &model = _models[i];
114+
if (!model || !model->isPrepared())
115+
{
116+
throw std::runtime_error("Model at index " + std::to_string(i) + " is not prepared");
117+
}
118+
119+
// Wait for buffer ready before execution
120+
model->waitForBufferReady();
121+
122+
// Execute model
123+
model->run(current_inputs, current_outputs);
124+
125+
// The input of the next model is the output of the current model
126+
if (i < _models.size() - 1)
127+
{
128+
current_inputs.clear();
129+
for (const auto &output : current_outputs)
130+
{
131+
current_inputs.push_back(const_cast<IPortableTensor *>(output));
132+
}
133+
}
134+
}
135+
}
136+
catch (...)
137+
{
138+
_executing = false;
139+
throw;
140+
}
141+
142+
_executing = false;
143+
}
144+
145+
void BulkPipelineManager::createModels()
146+
{
147+
_models.clear();
148+
_models.reserve(_config.model_paths.size());
149+
150+
for (size_t i = 0; i < _config.model_paths.size(); ++i)
151+
{
152+
auto model = std::make_shared<BulkPipelineModel>(_config.model_paths[i], _config.device_id);
153+
if (!model->initialize())
154+
{
155+
throw std::runtime_error("Failed to initialize model: " + model->modelPath());
156+
}
157+
_models.push_back(model);
158+
}
159+
}
160+
161+
void BulkPipelineManager::prepareModels()
162+
{
163+
for (auto &model : _models)
164+
{
165+
if (!model->prepare())
166+
{
167+
throw std::runtime_error("Failed to prepare model: " + model->modelPath());
168+
}
169+
}
170+
}
171+
172+
} // namespace ops
173+
} // namespace trix
174+
} // namespace backend
175+
} // namespace onert
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#ifndef __ONERT_BACKEND_TRIX_OPS_BULK_PIPE_LINE_MANAGER_H__
18+
#define __ONERT_BACKEND_TRIX_OPS_BULK_PIPE_LINE_MANAGER_H__
19+
20+
#include <memory>
21+
#include <vector>
22+
#include <string>
23+
#include <array>
24+
#include <atomic>
25+
#include <mutex>
26+
#include <exception>
27+
#include <backend/IPortableTensor.h>
28+
#include "BulkPipelineModel.h"
29+
30+
namespace onert
31+
{
32+
namespace backend
33+
{
34+
namespace trix
35+
{
36+
namespace ops
37+
{
38+
39+
class BulkPipelineManager
40+
{
41+
public:
42+
struct PipelineConfig
43+
{
44+
std::vector<std::string> model_paths;
45+
int device_id{0};
46+
};
47+
48+
public:
49+
explicit BulkPipelineManager(const PipelineConfig &config);
50+
~BulkPipelineManager();
51+
52+
// Disallow copying
53+
BulkPipelineManager(const BulkPipelineManager &) = delete;
54+
BulkPipelineManager &operator=(const BulkPipelineManager &) = delete;
55+
56+
bool initialize();
57+
void shutdown();
58+
bool isInitialized() const { return _initialized; }
59+
60+
void execute(const std::vector<const IPortableTensor *> &inputs,
61+
std::vector<IPortableTensor *> &outputs);
62+
63+
private:
64+
void createModels();
65+
void prepareModels();
66+
67+
private:
68+
PipelineConfig _config;
69+
std::atomic<bool> _initialized{false};
70+
std::atomic<bool> _executing{false};
71+
72+
std::vector<std::shared_ptr<BulkPipelineModel>> _models;
73+
};
74+
75+
} // namespace ops
76+
} // namespace trix
77+
} // namespace backend
78+
} // namespace onert
79+
80+
#endif // __ONERT_BACKEND_TRIX_OPS_BULK_PIPE_LINE_MANAGER_H__
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "../BulkPipelineManager.h"
18+
#include <gtest/gtest.h>
19+
20+
#include "mock_syscalls.h"
21+
22+
using namespace onert::backend::trix::ops;
23+
using namespace onert::backend::trix::ops::test;
24+
25+
class BulkPipelineManagerTest : public ::testing::Test
26+
{
27+
protected:
28+
void SetUp() override
29+
{
30+
BulkPipelineManager::PipelineConfig config;
31+
config.device_id = 0;
32+
config.model_paths.push_back("model_path");
33+
manager = std::make_unique<BulkPipelineManager>(config);
34+
35+
// Reset all mock syscalls before each test
36+
MockSyscallsManager::getInstance().resetAll();
37+
38+
MockSyscallsManager::getInstance().setFreadHook(
39+
[](void *ptr, size_t size, size_t, FILE *) -> int {
40+
if (size == NPUBIN_META_SIZE)
41+
{
42+
auto meta = reinterpret_cast<npubin_meta *>(ptr);
43+
meta->program_size = 1024;
44+
meta->weight_size = 1024;
45+
meta->size = 4096;
46+
}
47+
return 1;
48+
});
49+
50+
MockSyscallsManager::getInstance().setIoctlHook(
51+
[](int, unsigned long request, void *arg) -> int {
52+
// Get Version
53+
if (request == _IOR(0x88, 1, unsigned int))
54+
{
55+
// Return version 3.2.X.X for trix backend sanity checking
56+
*static_cast<int *>(arg) = 0x3020000;
57+
}
58+
return 0;
59+
});
60+
}
61+
void TearDown() override {}
62+
63+
std::unique_ptr<BulkPipelineManager> manager;
64+
};
65+
66+
TEST_F(BulkPipelineManagerTest, test_initilize)
67+
{
68+
EXPECT_TRUE(manager->initialize());
69+
EXPECT_TRUE(manager->isInitialized());
70+
}
71+
72+
TEST_F(BulkPipelineManagerTest, test_shutdown)
73+
{
74+
EXPECT_TRUE(manager->initialize());
75+
manager->shutdown();
76+
EXPECT_FALSE(manager->isInitialized());
77+
}
78+
79+
TEST_F(BulkPipelineManagerTest, test_execute)
80+
{
81+
EXPECT_TRUE(manager->initialize());
82+
const std::vector<const onert::backend::IPortableTensor *> inputs;
83+
std::vector<onert::backend::IPortableTensor *> outputs;
84+
EXPECT_NO_THROW(manager->execute(inputs, outputs));
85+
}

0 commit comments

Comments
 (0)