Skip to content

Commit 60ade2e

Browse files
committed
Check graph pool
TODO: -> this requires additional patch in MP to reset initialized_ flag in CalculatorGraph and verify if that works. Previous MP tests with reruns worked due to using AddVectorSink which changes the underlying graph and does not use OutputStreamPollers. Need to verify if change in MP will enable graph pool or we need to go back to thread pool.
1 parent abf5c1f commit 60ade2e

9 files changed

+196
-44
lines changed

Diff for: src/BUILD

+15
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,20 @@ cc_library(
156156
visibility = ["//visibility:public",],
157157
linkopts = [],
158158
)
159+
cc_library(
160+
name = "mediapipe_internal_graphqueue",
161+
hdrs = ["mediapipe_internal/graphqueue.hpp"],
162+
# srcs = ["mediapipe_internal/graphqueue.cpp"],
163+
deps = [
164+
"libovms_queue",
165+
"libovmslogging",
166+
"//third_party:openvino",
167+
"@mediapipe//mediapipe/framework:calculator_graph",
168+
],
169+
copts = [],
170+
visibility = ["//visibility:public",],
171+
linkopts = [],
172+
)
159173
cc_library(
160174
name = "libovms_ovinferrequestsqueue",
161175
hdrs = ["ovinferrequestsqueue.hpp"],
@@ -538,6 +552,7 @@ cc_library(
538552
})
539553
+ select({
540554
"//conditions:default": [
555+
"mediapipe_internal_graphqueue",
541556
"@mediapipe_calculators//:mediapipe_calculators", # Need this dependencies here because we use ovms/src - cannot add in ovms_dependencies because we copy src directory later in Dockerfile
542557
"@mediapipe//mediapipe/graphs/holistic_tracking:holistic_tracking_to_render_data",
543558
"@mediapipe//mediapipe/graphs/iris_tracking:iris_tracking_cpu_deps",

Diff for: src/mediapipe_internal/graphqueue.hpp

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//*****************************************************************************
2+
// Copyright 2021 Intel Corporation
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//*****************************************************************************
16+
#pragma once
17+
18+
#include <atomic>
19+
#include <condition_variable>
20+
#include <future>
21+
#include <memory>
22+
#include <mutex>
23+
#include <optional>
24+
#include <queue>
25+
#include <thread>
26+
#include <utility>
27+
#include <vector>
28+
29+
#include "../queue.hpp"
30+
31+
#include "mediapipe/framework/calculator_graph.h"
32+
#include "mediapipe/framework/port/status.h"
33+
namespace ovms {
34+
35+
class GraphQueue : public Queue<std::unique_ptr<::mediapipe::CalculatorGraph>> {
36+
public:
37+
/**
38+
* @brief Allocating idle stream for execution
39+
*/
40+
std::future<int> getIdleStream() {
41+
// OVMS_PROFILE_FUNCTION();
42+
int value;
43+
std::promise<int> idleStreamPromise;
44+
std::future<int> idleStreamFuture = idleStreamPromise.get_future();
45+
std::unique_lock<std::mutex> lk(front_mut);
46+
if (streams[front_idx] < 0) { // we need to wait for any idle stream to be returned
47+
std::unique_lock<std::mutex> queueLock(queue_mutex);
48+
promises.push(std::move(idleStreamPromise));
49+
} else { // we can give idle stream right away
50+
value = streams[front_idx];
51+
streams[front_idx] = -1; // negative value indicate consumed vector index
52+
front_idx = (front_idx + 1) % streams.size();
53+
lk.unlock();
54+
idleStreamPromise.set_value(value);
55+
}
56+
return idleStreamFuture;
57+
}
58+
59+
GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, int streamsLength) : Queue(streamsLength) {
60+
inferRequests.reserve(streamsLength);
61+
for (auto i =0; i < streamsLength; ++i) {
62+
inferRequests.emplace_back(std::make_unique<::mediapipe::CalculatorGraph>());
63+
std::ignore = inferRequests.back()->Initialize(config); // TODO FIXME
64+
}
65+
}
66+
};
67+
68+
struct GraphIdGuard {
69+
GraphQueue& queue;
70+
const int id;
71+
::mediapipe::CalculatorGraph& graph;
72+
GraphIdGuard(GraphQueue& queue) :
73+
queue(queue),
74+
id(queue.getIdleStream().get()),
75+
graph(*(queue.getInferRequest(id).get())) {}
76+
~GraphIdGuard(){
77+
this->queue.returnStream(this->id);
78+
}
79+
};
80+
} // namespace ovms

Diff for: src/mediapipe_internal/mediapipegraphdefinition.cpp

+9-3
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ Status MediapipeGraphDefinition::validateForConfigFileExistence() {
7979
ifs.seekg(0, std::ios::beg);
8080
std::stringstream config;
8181
config << ifs.rdbuf();
82+
this->queue = std::make_unique<GraphQueue>(this->config, 48);
8283
this->mgconfig.setCurrentGraphPbTxtMD5(ovms::FileSystem::getStringMD5(config.str()));
8384
this->chosenConfig.assign(config.str());
8485
return StatusCode::OK;
@@ -188,9 +189,14 @@ MediapipeGraphDefinition::MediapipeGraphDefinition(const std::string name,
188189
name(name),
189190
status(SCHEDULER_CLASS_NAME, this->name),
190191
pythonBackend(pythonBackend),
191-
reporter(std::make_unique<MediapipeServableMetricReporter>(metricConfig, registry, name)) {
192+
reporter(std::make_unique<MediapipeServableMetricReporter>(metricConfig, registry, name))
193+
{
192194
mgconfig = config;
193195
passKfsRequestFlag = false;
196+
/*if (!sharedThreadPool) {
197+
SPDLOG_ERROR("Created shared Thread Pool XXX");
198+
//sharedThreadPool = std::make_shared<mediapipe::ThreadPoolExecutor>(std::thread::hardware_concurrency()); // TODO FIXME should be in MP factory
199+
}*/
194200
}
195201

196202
Status MediapipeGraphDefinition::createInputsInfo() {
@@ -254,10 +260,10 @@ Status MediapipeGraphDefinition::create(std::shared_ptr<MediapipeGraphExecutor>&
254260
return status;
255261
}
256262
SPDLOG_DEBUG("Creating Mediapipe graph executor: {}", getName());
257-
263+
GraphIdGuard graphIdGuard(*(this->queue)); // TODO timeout?
258264
pipeline = std::make_shared<MediapipeGraphExecutor>(getName(), std::to_string(getVersion()),
259265
this->config, this->inputTypes, this->outputTypes, this->inputNames, this->outputNames,
260-
this->pythonNodeResourcesMap, this->genAiServableMap, this->pythonBackend, this->reporter.get());
266+
this->pythonNodeResourcesMap, this->llmNodeResourcesMap, this->pythonBackend, this->reporter.get(), std::move(graphIdGuard));
261267
return status;
262268
}
263269

Diff for: src/mediapipe_internal/mediapipegraphdefinition.hpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
#include "mediapipegraphconfig.hpp"
4444
#include "packettypes.hpp"
45+
#include "graphqueue.hpp"
4546

4647
namespace ovms {
4748
class MediapipeGraphDefinitionUnloadGuard;
@@ -135,7 +136,7 @@ class MediapipeGraphDefinition {
135136
PipelineDefinitionStatus status;
136137

137138
MediapipeGraphConfig mgconfig;
138-
::mediapipe::CalculatorGraphConfig config;
139+
::mediapipe::CalculatorGraphConfig config; // TODO rename configs
139140

140141
Status createInputsInfo();
141142
Status createOutputsInfo();
@@ -165,6 +166,7 @@ class MediapipeGraphDefinition {
165166
PythonBackend* pythonBackend;
166167

167168
std::unique_ptr<MediapipeServableMetricReporter> reporter;
169+
std::unique_ptr<GraphQueue> queue;
168170
};
169171

170172
class MediapipeGraphDefinitionUnloadGuard {

Diff for: src/mediapipe_internal/mediapipegraphexecutor.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ MediapipeGraphExecutor::MediapipeGraphExecutor(
4444
const PythonNodeResourcesMap& pythonNodeResourcesMap,
4545
const GenAiServableMap& llmNodeResourcesMap,
4646
PythonBackend* pythonBackend,
47-
MediapipeServableMetricReporter* mediapipeServableMetricReporter) :
47+
MediapipeServableMetricReporter* mediapipeServableMetricReporter,
48+
GraphIdGuard&& guard) :
4849
name(name),
4950
version(version),
5051
config(config),
@@ -56,7 +57,8 @@ MediapipeGraphExecutor::MediapipeGraphExecutor(
5657
llmNodeResourcesMap(llmNodeResourcesMap),
5758
pythonBackend(pythonBackend),
5859
currentStreamTimestamp(STARTING_TIMESTAMP),
59-
mediapipeServableMetricReporter(mediapipeServableMetricReporter) {}
60+
mediapipeServableMetricReporter(mediapipeServableMetricReporter),
61+
guard(std::move(guard)) {}
6062

6163
const std::string MediapipeGraphExecutor::PYTHON_SESSION_SIDE_PACKET_TAG = "py";
6264
const std::string MediapipeGraphExecutor::LLM_SESSION_SIDE_PACKET_TAG = "llm";

Diff for: src/mediapipe_internal/mediapipegraphexecutor.hpp

+8-10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "mediapipe_utils.hpp"
4040
#include "mediapipegraphdefinition.hpp" // for version in response and PythonNodeResourceMap
4141
#include "packettypes.hpp"
42+
#include "graphqueue.hpp"
4243

4344
namespace ovms {
4445
class PythonBackend;
@@ -87,6 +88,7 @@ class MediapipeGraphExecutor {
8788
::mediapipe::Timestamp currentStreamTimestamp;
8889

8990
MediapipeServableMetricReporter* mediapipeServableMetricReporter;
91+
GraphIdGuard guard;
9092

9193
public:
9294
static const std::string PYTHON_SESSION_SIDE_PACKET_TAG;
@@ -100,23 +102,19 @@ class MediapipeGraphExecutor {
100102
const PythonNodeResourcesMap& pythonNodeResourcesMap,
101103
const GenAiServableMap& llmNodeResourcesMap,
102104
PythonBackend* pythonBackend,
103-
MediapipeServableMetricReporter* mediapipeServableMetricReporter);
105+
MediapipeServableMetricReporter* mediapipeServableMetricReporter, GraphIdGuard&&guard);
104106

105107
template <typename RequestType, typename ResponseType>
106108
Status infer(const RequestType* request, ResponseType* response, ExecutionContext executionContext) {
107109
OVMS_PROFILE_FUNCTION();
108110
SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} execution", this->name);
109111
MetricCounterGuard failedRequestsGuard(this->mediapipeServableMetricReporter->getRequestsMetric(executionContext, false));
110112
MetricGaugeGuard currentGraphsGuard(this->mediapipeServableMetricReporter->currentGraphs.get());
111-
::mediapipe::CalculatorGraph graph;
112-
MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
113-
enum : unsigned int {
114-
PROCESS,
115-
TIMER_END2
116-
};
117-
Timer<TIMER_END2> timer;
118-
timer.start(PROCESS);
119-
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXend", this->name);
113+
::mediapipe::CalculatorGraph& graph = this->guard.graph;
114+
SPDLOG_ERROR("SetExecutor XXX");
115+
//std::ignore = graph.SetExecutor("", sharedThreadPool); // TODO FIXME
116+
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXbegin", this->name);
117+
//MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
120118
std::unordered_map<std::string, ::mediapipe::OutputStreamPoller> outputPollers;
121119
for (auto& name : this->outputNames) {
122120
if (name.empty()) {

Diff for: src/test/mediapipeflow_test.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -2494,13 +2494,14 @@ class MediapipeSerialization : public ::testing::Test {
24942494
stream_types_mapping_t outputTypes,
24952495
std::vector<std::string> inputNames, std::vector<std::string> outputNames,
24962496
const PythonNodeResourcesMap& pythonNodeResourcesMap,
2497-
MediapipeServableMetricReporter* mediapipeServableMetricReporter) :
2498-
MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, nullptr, mediapipeServableMetricReporter) {}
2497+
MediapipeServableMetricReporter* mediapipeServableMetricReporter, GraphIdGuard&& guard) :
2498+
MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, nullptr, mediapipeServableMetricReporter, std::move(guard)) {}
24992499
};
25002500

25012501
protected:
25022502
std::unique_ptr<MediapipeServableMetricReporter> reporter;
25032503
std::unique_ptr<MockedMediapipeGraphExecutor> executor;
2504+
std::unique_ptr<GraphQueue> queue;
25042505
::inference::ModelInferResponse mp_response;
25052506
void SetUp() {
25062507
ovms::stream_types_mapping_t mapping;
@@ -2514,7 +2515,9 @@ class MediapipeSerialization : public ::testing::Test {
25142515
const ::mediapipe::CalculatorGraphConfig config;
25152516
PythonNodeResourcesMap pythonNodeResourcesMap;
25162517
this->reporter = std::make_unique<MediapipeServableMetricReporter>(nullptr, nullptr, ""); // disabled reporter
2517-
executor = std::make_unique<MockedMediapipeGraphExecutor>("", "", config, mapping, mapping, inputNames, outputNames, pythonNodeResourcesMap, this->reporter.get());
2518+
queue = std::make_unique<GraphQueue>(config, 1);
2519+
GraphIdGuard guard(*queue);
2520+
executor = std::make_unique<MockedMediapipeGraphExecutor>("", "", config, mapping, mapping, inputNames, outputNames, pythonNodeResourcesMap, this->reporter.get(), std::move(guard));
25182521
}
25192522
};
25202523

Diff for: src/test/pythonnode_test.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -1006,8 +1006,8 @@ class MockedMediapipeGraphExecutorPy : public ovms::MediapipeGraphExecutor {
10061006
std::vector<std::string> inputNames, std::vector<std::string> outputNames,
10071007
const PythonNodeResourcesMap& pythonNodeResourcesMap,
10081008
PythonBackend* pythonBackend,
1009-
MediapipeServableMetricReporter* mediapipeServableMetricReporter) :
1010-
MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, pythonBackend, mediapipeServableMetricReporter) {}
1009+
MediapipeServableMetricReporter* mediapipeServableMetricReporter, GraphIdGuard&& guard) :
1010+
MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, pythonBackend, mediapipeServableMetricReporter, std::move(guard)) {}
10111011
};
10121012

10131013
TEST_F(PythonFlowTest, SerializePyObjectWrapperToKServeResponse) {
@@ -1017,7 +1017,9 @@ TEST_F(PythonFlowTest, SerializePyObjectWrapperToKServeResponse) {
10171017
const std::vector<std::string> outputNames;
10181018
const ::mediapipe::CalculatorGraphConfig config;
10191019
PythonNodeResourcesMap pythonNodeResourcesMap;
1020-
auto executor = MockedMediapipeGraphExecutorPy("", "", config, mapping, mapping, inputNames, outputNames, pythonNodeResourcesMap, getPythonBackend(), this->reporter.get());
1020+
std::unique_ptr<GraphQueue> queue = std::make_unique<GraphQueue>(config, 1);
1021+
GraphIdGuard guard(*queue);
1022+
auto executor = MockedMediapipeGraphExecutorPy("", "", config, mapping, mapping, inputNames, outputNames, pythonNodeResourcesMap, getPythonBackend(), this->reporter.get(), std::move(guard));
10211023

10221024
std::string datatype = "FP32";
10231025
std::string name = "python_result";

0 commit comments

Comments
 (0)