Skip to content

Commit 5ee9729

Browse files
committed
[onert] Implement BulkPipeline operation for multi-model execution
Implement BulkPipeline operation to enable efficient multi-model execution with pipeline parallelism. This adds comprehensive pipeline management including buffer sharing, asynchronous execution, and model coordination for improved performance. ONE-DCO-1.0-Signed-off-by: Jonghwa Lee <jonghwa3.lee@samsung.com>
1 parent 5451c96 commit 5ee9729

9 files changed

Lines changed: 1362 additions & 5 deletions

runtime/onert/backend/trix/KernelGenerator.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "KernelGenerator.h"
1818

19+
#include "ops/BulkPipeLayer.h"
1920
#include "ops/BulkLayer.h"
2021

2122
#include <backend/Backend.h>
@@ -66,11 +67,20 @@ void KernelGenerator::visit(const ir::operation::Bulk &node)
6667
// parameters
6768
const auto &binary_path = node.param().binary_path;
6869

69-
auto fn = std::make_unique<ops::BulkLayer>();
70-
71-
fn->configure(input_tensors, output_tensors, binary_path.front(), _dev_context);
72-
73-
_return_fn = std::move(fn);
70+
if (binary_path.size() == 1)
71+
{
72+
// For single model execution
73+
auto fn = std::make_unique<ops::BulkLayer>();
74+
fn->configure(input_tensors, output_tensors, binary_path.front(), _dev_context);
75+
_return_fn = std::move(fn);
76+
}
77+
else
78+
{
79+
// For pipeline execution (multiple models)
80+
auto fn = std::make_unique<ops::BulkPipeLayer>();
81+
fn->configure(input_tensors, output_tensors, binary_path);
82+
_return_fn = std::move(fn);
83+
}
7484
}
7585

7686
} // namespace onert::backend::trix
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "BulkPipeLayer.h"
18+
19+
#include <iostream>
20+
#include <memory>
21+
22+
namespace onert
23+
{
24+
namespace backend
25+
{
26+
namespace trix
27+
{
28+
namespace ops
29+
{
30+
31+
BulkPipeLayer::BulkPipeLayer() : _inputs(), _outputs()
32+
{
33+
// DO NOTHING
34+
}
35+
36+
BulkPipeLayer::~BulkPipeLayer()
37+
{
38+
// DO NOTHING - _pipeline_manager will be automatically cleaned up by unique_ptr
39+
}
40+
41+
void BulkPipeLayer::configure(const std::vector<const IPortableTensor *> &inputs,
42+
std::vector<IPortableTensor *> &outputs,
43+
const std::vector<std::string> &binary_path)
44+
{
45+
_inputs = inputs;
46+
_outputs = outputs;
47+
48+
// Configure BulkPipeLineManager
49+
BulkPipelineManager::PipelineConfig config;
50+
config.model_paths = binary_path;
51+
config.device_id = 0; // default device id = 0
52+
config.buffer_pool_size = 2; // Use 2 buffers
53+
54+
_pipeline_manager = std::make_unique<BulkPipelineManager>(config);
55+
56+
if (!_pipeline_manager->initialize())
57+
{
58+
throw std::runtime_error("Failed to initialize bulk pipeline manager");
59+
}
60+
}
61+
62+
void BulkPipeLayer::run()
63+
{
64+
if (!_pipeline_manager)
65+
{
66+
throw std::runtime_error("Pipeline manager is not initialized");
67+
}
68+
69+
try
70+
{
71+
_pipeline_manager->execute(_inputs, _outputs);
72+
}
73+
catch (const std::exception &e)
74+
{
75+
std::cerr << "BulkPipeLayer execution failed: " << e.what() << std::endl;
76+
throw;
77+
}
78+
}
79+
80+
void BulkPipeLayer::prepare()
81+
{
82+
// DO NOTHING
83+
}
84+
85+
} // namespace ops
86+
} // namespace trix
87+
} // namespace backend
88+
} // namespace onert
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#ifndef __ONERT_BACKEND_TRIX_OPS_BULKPIPELAYER_H__
18+
#define __ONERT_BACKEND_TRIX_OPS_BULKPIPELAYER_H__
19+
20+
#include <backend/IPortableTensor.h>
21+
#include "../DevContext.h"
22+
#include <exec/IFunction.h>
23+
#include "BulkPipelineManager.h"
24+
25+
namespace onert
26+
{
27+
namespace backend
28+
{
29+
namespace trix
30+
{
31+
namespace ops
32+
{
33+
34+
class BulkPipeLayer : public ::onert::exec::IFunction
35+
{
36+
public:
37+
BulkPipeLayer();
38+
~BulkPipeLayer() override;
39+
40+
public:
41+
void configure(const std::vector<const IPortableTensor *> &inputs,
42+
std::vector<IPortableTensor *> &outputs,
43+
const std::vector<std::string> &binary_path);
44+
45+
void run() override;
46+
47+
void prepare() override;
48+
49+
private:
50+
std::vector<const IPortableTensor *> _inputs;
51+
std::vector<IPortableTensor *> _outputs;
52+
53+
// Pipeline manager
54+
std::unique_ptr<BulkPipelineManager> _pipeline_manager;
55+
};
56+
57+
} // namespace ops
58+
} // namespace trix
59+
} // namespace backend
60+
} // namespace onert
61+
62+
#endif // __ONERT_BACKEND_TRIX_OPS_BULKPIPELAYER_H__
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "BulkPipelineBuffer.h"
18+
19+
#include <fcntl.h>
20+
#include <sys/ioctl.h>
21+
#include <sys/mman.h>
22+
#include <unistd.h>
23+
#include <cstring>
24+
#include <iostream>
25+
26+
namespace onert
27+
{
28+
namespace backend
29+
{
30+
namespace trix
31+
{
32+
namespace ops
33+
{
34+
35+
// FIXME: Using higher level API instead of raw API
36+
struct trix_ioctl_hwmem
37+
{
38+
int32_t type;
39+
uint64_t size;
40+
int32_t dbuf_fd;
41+
} __attribute__((packed));
42+
43+
#define TRIX_IOCTL_HWMEM_ALLOC _IOW(136, 21, struct trix_ioctl_hwmem)
44+
#define TRIX_IOCTL_HWMEM_DEALLOC _IOW(136, 22, struct trix_ioctl_hwmem)
45+
46+
BulkPipelineBuffer::BulkPipelineBuffer(BufferType type, size_t size, int device_id)
47+
: _type(type), _size(size), _device_id(device_id), _memory_type(getMemoryType())
48+
{
49+
// DO NOTHING
50+
}
51+
52+
BulkPipelineBuffer::~BulkPipelineBuffer() { deallocate(); }
53+
54+
BulkPipelineBuffer::BulkPipelineBuffer(BulkPipelineBuffer &&other) noexcept
55+
: _type(other._type), _size(other._size), _device_id(other._device_id), _dev_fd(other._dev_fd),
56+
_buffer(other._buffer), _memory_type(other._memory_type)
57+
{
58+
other._dev_fd = -1;
59+
other._buffer = nullptr;
60+
}
61+
62+
BulkPipelineBuffer &BulkPipelineBuffer::operator=(BulkPipelineBuffer &&other) noexcept
63+
{
64+
if (this != &other)
65+
{
66+
deallocate();
67+
68+
_type = other._type;
69+
_size = other._size;
70+
_device_id = other._device_id;
71+
_dev_fd = other._dev_fd;
72+
_buffer = other._buffer;
73+
_memory_type = other._memory_type;
74+
75+
other._dev_fd = -1;
76+
other._buffer = nullptr;
77+
}
78+
return *this;
79+
}
80+
81+
void *BulkPipelineBuffer::data() const { return _buffer ? _buffer->addr : nullptr; }
82+
83+
size_t BulkPipelineBuffer::size() const { return _buffer ? _buffer->size : 0; }
84+
85+
int BulkPipelineBuffer::dmabufFd() const { return _buffer ? _buffer->dmabuf : -1; }
86+
87+
bool BulkPipelineBuffer::isReady() const { return _buffer && _buffer->addr != nullptr; }
88+
89+
void BulkPipelineBuffer::allocate()
90+
{
91+
if (_buffer && _buffer->addr != nullptr)
92+
{
93+
// Already allocated
94+
return;
95+
}
96+
97+
if (!_buffer)
98+
{
99+
_buffer = new generic_buffer{};
100+
}
101+
102+
// Open the devbice
103+
char devname[16];
104+
snprintf(devname, sizeof(devname), "/dev/triv2-%d", _device_id);
105+
_dev_fd = open(devname, O_RDWR);
106+
if (_dev_fd < 0)
107+
{
108+
throw BufferAllocationException("Failed to open NPU device: " + std::string(devname));
109+
}
110+
111+
// Allocate a buffer
112+
struct trix_ioctl_hwmem hwmem;
113+
hwmem.type = (_memory_type == MemoryType::DMABUF_CONT) ? 0 : 1;
114+
hwmem.size = getAlignedSize(_size);
115+
116+
_buffer->dmabuf = ioctl(_dev_fd, TRIX_IOCTL_HWMEM_ALLOC, &hwmem);
117+
if (_buffer->dmabuf < 0)
118+
{
119+
close(_dev_fd);
120+
_dev_fd = -1;
121+
throw BufferAllocationException("Failed to allocate DMA buffer, size: " +
122+
std::to_string(hwmem.size));
123+
}
124+
125+
// Mapping the buffer
126+
_buffer->addr = mmap(nullptr, hwmem.size, PROT_READ | PROT_WRITE, MAP_SHARED, _buffer->dmabuf, 0);
127+
if (_buffer->addr == MAP_FAILED)
128+
{
129+
close(_buffer->dmabuf);
130+
close(_dev_fd);
131+
_buffer->dmabuf = -1;
132+
_dev_fd = -1;
133+
_buffer->addr = nullptr;
134+
throw BufferAllocationException("Failed to mmap DMA buffer");
135+
}
136+
137+
_buffer->size = _size;
138+
_buffer->type = BUFFER_DMABUF;
139+
}
140+
141+
void BulkPipelineBuffer::deallocate()
142+
{
143+
if (!_buffer)
144+
{
145+
return;
146+
}
147+
148+
if (_buffer->addr != nullptr)
149+
{
150+
size_t aligned_sz = getAlignedSize(_buffer->size);
151+
munmap(_buffer->addr, aligned_sz);
152+
_buffer->addr = nullptr;
153+
}
154+
155+
if (_buffer->dmabuf >= 0)
156+
{
157+
struct trix_ioctl_hwmem hwmem;
158+
hwmem.dbuf_fd = _buffer->dmabuf;
159+
ioctl(_dev_fd, TRIX_IOCTL_HWMEM_DEALLOC, &hwmem);
160+
close(_buffer->dmabuf);
161+
_buffer->dmabuf = -1;
162+
}
163+
164+
if (_dev_fd >= 0)
165+
{
166+
close(_dev_fd);
167+
_dev_fd = -1;
168+
}
169+
170+
delete _buffer;
171+
_buffer = nullptr;
172+
}
173+
174+
void BulkPipelineBuffer::fillFromFile(FILE *fp, size_t offset)
175+
{
176+
if (!isReady())
177+
{
178+
throw BufferOperationException("Buffer is not allocated");
179+
}
180+
181+
if (!fp)
182+
{
183+
throw BufferOperationException("Invalid file pointer");
184+
}
185+
186+
if (fseek(fp, static_cast<long>(offset), SEEK_SET) != 0)
187+
{
188+
throw BufferOperationException("Failed to seek file to offset: " + std::to_string(offset));
189+
}
190+
191+
if (fread(_buffer->addr, _buffer->size, 1, fp) != 1)
192+
{
193+
throw BufferOperationException("Failed to read " + std::to_string(_buffer->size) +
194+
" bytes from file");
195+
}
196+
}
197+
198+
size_t BulkPipelineBuffer::getAlignedSize(size_t size) const
199+
{
200+
// 4 KB (= Page size) aligned size
201+
constexpr size_t _4KB_M_1 = (1 << 12) - 1;
202+
return (size + _4KB_M_1) & ~_4KB_M_1;
203+
}
204+
205+
BulkPipelineBuffer::MemoryType BulkPipelineBuffer::getMemoryType() const
206+
{
207+
return (_type == BufferType::PROGRAM) ? MemoryType::DMABUF_CONT : MemoryType::DMABUF_IOMMU;
208+
}
209+
210+
} // namespace ops
211+
} // namespace trix
212+
} // namespace backend
213+
} // namespace onert

0 commit comments

Comments
 (0)