Skip to content

Commit d6b661d

Browse files
authored
[TE] Add ProgressWorker skeleton (#2199)
* [TE] Add explicit progressBatch API * [TE] Add progress worker skeleton
1 parent 54411af commit d6b661d

11 files changed

Lines changed: 1069 additions & 53 deletions

File tree

mooncake-transfer-engine/tent/config/transfer-engine.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"log_level": "warning",
1313
"max_failover_attempts": 3,
1414
"enable_auto_failover_on_poll": true,
15+
"enable_progress_worker": false,
1516
"metrics": {
1617
"enabled": true,
1718
"http_port": 9100,
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2024 KVCache.AI
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef PROGRESS_WORKER_H_
16+
#define PROGRESS_WORKER_H_
17+
18+
#include <atomic>
19+
#include <condition_variable>
20+
#include <deque>
21+
#include <mutex>
22+
#include <thread>
23+
#include <unordered_set>
24+
25+
#include "tent/common/types.h"
26+
27+
namespace mooncake {
28+
namespace tent {
29+
30+
class TransferEngineImpl;
31+
32+
// Event-driven progress worker for issue #2116. When the engine is configured
33+
// with enable_progress_worker=true, transports (or test hooks) call
34+
// notifyBatchMaybeReady to wake this worker, which then drives one
35+
// progressBatch step per notification. This decouples failover/resubmit from
36+
// the caller polling loop, so integrators that turn off
37+
// enable_auto_failover_on_poll do not need to spin a polling thread of their
38+
// own to keep failover progressing.
39+
class ProgressWorker {
40+
public:
41+
explicit ProgressWorker(TransferEngineImpl* impl);
42+
~ProgressWorker();
43+
44+
ProgressWorker(const ProgressWorker&) = delete;
45+
ProgressWorker& operator=(const ProgressWorker&) = delete;
46+
47+
void start();
48+
49+
// Idempotent. Signals the worker thread to exit and joins it. After stop()
50+
// returns, notifyBatchMaybeReady becomes a no-op.
51+
void stop();
52+
53+
// Safe from any thread. De-duplicates: enqueueing a batch that is already
54+
// queued is a no-op. No-op if the worker has been stopped or never
55+
// started.
56+
void notifyBatchMaybeReady(BatchID batch_id);
57+
58+
private:
59+
void runner();
60+
61+
TransferEngineImpl* impl_;
62+
std::atomic<bool> running_{false};
63+
std::thread thread_;
64+
65+
std::mutex mu_;
66+
std::condition_variable cv_;
67+
std::unordered_set<BatchID> queued_;
68+
std::deque<BatchID> order_;
69+
};
70+
71+
} // namespace tent
72+
} // namespace mooncake
73+
74+
#endif // PROGRESS_WORKER_H_

mooncake-transfer-engine/tent/include/tent/runtime/transfer_engine_impl.h

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ControlService;
4646
class SegmentTracker;
4747
class Platform;
4848
class ProxyManager;
49+
class ProgressWorker;
4950

5051
struct TaskInfo {
5152
TransportType type{UNSPEC};
@@ -152,6 +153,8 @@ class TransferEngineImpl {
152153

153154
Status getTransferStatus(BatchID batch_id, TransferStatus& overall_status);
154155

156+
Status progressBatch(BatchID batch_id, TransferStatus& overall_status);
157+
155158
Status waitTransferCompletion(BatchID batch_id);
156159

157160
Status transferSync(const std::vector<Request>& request_list);
@@ -172,6 +175,11 @@ class TransferEngineImpl {
172175
}
173176
}
174177

178+
// Wake the optional event-driven progress worker for `batch_id`. No-op if
179+
// enable_progress_worker is false. Currently used by test/integration
180+
// hooks; transports will be migrated to call this in a follow-up PR.
181+
void notifyBatchMaybeReady(BatchID batch_id);
182+
175183
private:
176184
Status construct();
177185

@@ -189,8 +197,15 @@ class TransferEngineImpl {
189197

190198
Status resubmitTransferTask(Batch* batch, size_t task_id);
191199

192-
void updateTaskStatusFromPoll(Batch* batch, size_t task_id,
193-
TransferStatus& task_status);
200+
Status pollTaskStatus(Batch* batch, size_t task_id,
201+
TransferStatus& task_status);
202+
203+
void updateTaskStatusAfterPoll(Batch* batch, size_t task_id,
204+
TransferStatus& task_status,
205+
bool allow_failover);
206+
207+
Status getBatchStatus(BatchID batch_id, TransferStatus& overall_status,
208+
bool allow_failover);
194209

195210
SelectionResult resolveTransport(const Request& req, int transport_index,
196211
bool invalidate_on_fail = true);
@@ -244,6 +259,15 @@ class TransferEngineImpl {
244259
bool merge_requests_;
245260
int max_failover_attempts_{3};
246261
bool enable_auto_failover_on_poll_{true};
262+
bool enable_progress_worker_{false};
263+
264+
// Guards alive_batches_ and serializes pollTaskStatus /
265+
// updateTaskStatusAfterPoll / lazyFreeBatch against the optional
266+
// ProgressWorker thread. Recursive because freeBatch -> lazyFreeBatch ->
267+
// getTransferStatus can re-enter on the same thread. See issue #2116.
268+
std::recursive_mutex progress_mutex_;
269+
std::unordered_set<BatchID> alive_batches_;
270+
std::unique_ptr<ProgressWorker> progress_worker_;
247271
};
248272
} // namespace tent
249273
} // namespace mooncake

mooncake-transfer-engine/tent/include/tent/transfer_engine.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,14 @@ class TransferEngine {
308308

309309
Status getTransferStatus(BatchID batch_id, TransferStatus& overall_status);
310310

311+
// Drive one progress step on a batch and return its aggregated status.
312+
// Unlike getTransferStatus, this always allows internal failover/resubmit
313+
// regardless of enable_auto_failover_on_poll. The call is non-blocking and
314+
// performs at most one state-machine step per task; callers that want to
315+
// wait for completion must invoke it in a loop. PENDING means "make
316+
// progress later"; terminal states (COMPLETED/FAILED) will not be revived.
317+
Status progressBatch(BatchID batch_id, TransferStatus& overall_status);
318+
311319
private:
312320
std::unique_ptr<TransferEngineImpl> impl_;
313321
};
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2024 KVCache.AI
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "tent/runtime/progress_worker.h"
16+
17+
#include "tent/common/status.h"
18+
#include "tent/runtime/transfer_engine_impl.h"
19+
20+
namespace mooncake {
21+
namespace tent {
22+
23+
ProgressWorker::ProgressWorker(TransferEngineImpl* impl) : impl_(impl) {}
24+
25+
ProgressWorker::~ProgressWorker() { stop(); }
26+
27+
void ProgressWorker::start() {
28+
if (running_.exchange(true, std::memory_order_acq_rel)) return;
29+
thread_ = std::thread(&ProgressWorker::runner, this);
30+
}
31+
32+
void ProgressWorker::stop() {
33+
if (!running_.exchange(false, std::memory_order_acq_rel)) return;
34+
{
35+
std::lock_guard<std::mutex> lk(mu_);
36+
// Drop pending work; outstanding batches will be reaped via the
37+
// user thread's freeBatch path.
38+
order_.clear();
39+
queued_.clear();
40+
}
41+
cv_.notify_all();
42+
if (thread_.joinable()) thread_.join();
43+
}
44+
45+
void ProgressWorker::notifyBatchMaybeReady(BatchID batch_id) {
46+
if (!batch_id) return;
47+
if (!running_.load(std::memory_order_acquire)) return;
48+
{
49+
std::lock_guard<std::mutex> lk(mu_);
50+
if (!queued_.insert(batch_id).second) return;
51+
order_.push_back(batch_id);
52+
}
53+
cv_.notify_one();
54+
}
55+
56+
void ProgressWorker::runner() {
57+
while (true) {
58+
BatchID batch_id = 0;
59+
{
60+
std::unique_lock<std::mutex> lk(mu_);
61+
cv_.wait(lk, [&] {
62+
return !running_.load(std::memory_order_acquire) ||
63+
!order_.empty();
64+
});
65+
if (!running_.load(std::memory_order_acquire)) return;
66+
batch_id = order_.front();
67+
order_.pop_front();
68+
queued_.erase(batch_id);
69+
}
70+
// progressBatch acquires the engine's progress_mutex_ and silently
71+
// returns InvalidArgument if the batch was freed before we got here.
72+
// PENDING means "kick again later"; the next notify wakes us up.
73+
// Terminal states leave the batch alone — freeBatch on the user
74+
// thread is responsible for reclamation.
75+
TransferStatus s;
76+
(void)impl_->progressBatch(batch_id, s);
77+
}
78+
}
79+
80+
} // namespace tent
81+
} // namespace mooncake

mooncake-transfer-engine/tent/src/runtime/proxy_manager.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,7 @@ Status ProxyManager::transferEventLoop(StagingTask& task,
385385

386386
case StageState::INFLIGHT: {
387387
TransferStatus xfer_status;
388-
CHECK_STATUS(
389-
impl_->getTransferStatus(chunk.batch, xfer_status));
388+
CHECK_STATUS(impl_->progressBatch(chunk.batch, xfer_status));
390389
if (xfer_status.s == PENDING) {
391390
event_queue.push(id);
392391
break;

0 commit comments

Comments
 (0)