Skip to content

Commit 54411af

Browse files
alogfansclaude
andauthored
[TENT] Add rule-based transport and device selection (#2079)
* [TENT] Add configuration-driven transport selector Add TransportSelector for flexible, configuration-driven transport selection policy while maintaining full backward compatibility. Key features: - Configuration-driven transport selection via JSON policy rules - Support for segment_type filtering, device allocation, and transport priority - Legacy mode option (use_legacy_transport_selection) for exact original behavior - Default policies match original hardcoded behavior Changes: - Add TransportSelector class with SelectionContext, SelectionPolicy, SelectionResult - Integrate TransportSelector into TransferEngineImpl - Add legacy mode support to preserve original code path - Restore TaskInfo fields (xport_priority, failover_count) for backward compatibility - Add max_failover_attempts configuration option Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * [TENT] Add comprehensive unit tests for TransportSelector Add transport_selector_test.cpp with test coverage for: - Default policies matching original behavior (File/Memory segments) - Transport type name parsing - Legacy mode enable/disable - Transport availability based on capabilities - Priority offset for fallback scenarios - Device mask handling - NVLINK same-machine constraint - ROCm memory type support - GPU-to-GPU, CPU-to-CPU, CPU-to-GPU, GPU-to-CPU transfers Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * [TENT] Fix FakeTransport to use protected caps member Fix compilation error by accessing Transport::caps (protected) through helper methods instead of a separate public member. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix * fix format issues * Add priority-based rule * Reformat * Fix review comments * Fix memory leak in endpoint_store_integration_test When ibv_get_device_list returns a non-NULL list but num_devices == 0, we need to call ibv_free_device_list before returning to avoid leaking the allocated memory. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Trigger CI --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3dfee31 commit 54411af

11 files changed

Lines changed: 1539 additions & 46 deletions

File tree

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# TENT Transport Selector
2+
3+
## Overview
4+
5+
The Transport Selector is responsible for choosing the optimal transport and devices for each transfer request based on configuration policies.
6+
7+
## Request Priority
8+
9+
TENT uses a unified priority system across all components. See [QoS.md](qos.md) for detailed description of priority levels and their usage throughout the system.
10+
11+
Quick reference:
12+
- `"high"` / `0` - High-priority requests (metadata, control, latency-sensitive)
13+
- `"medium"` / `1` - Medium-priority requests (interactive queries, serving)
14+
- `"low"` / `2` - Low-priority requests (bulk transfer, background jobs)
15+
16+
## Configuration-Based Transport Selection
17+
18+
Transport selection is driven by configuration with pattern-based rules.
19+
20+
### Configuration Example
21+
22+
```json
23+
{
24+
"policy": [
25+
{
26+
"name": "high_prio_fast",
27+
"segment_type": "memory",
28+
"priority": "high",
29+
"devices": ["mlx5_0", "mlx5_1", "mlx5_2"],
30+
"transports": ["nvlink", "rdma", "shm"]
31+
},
32+
{
33+
"name": "low_prio_slow",
34+
"segment_type": "memory",
35+
"priority": "low",
36+
"devices": ["mlx5_0"],
37+
"transports": ["rdma", "tcp"]
38+
},
39+
{
40+
"name": "file_storage",
41+
"segment_type": "file",
42+
"transports": ["gds", "io_uring", "rdma"]
43+
}
44+
]
45+
}
46+
```
47+
48+
### Policy Fields
49+
50+
| Field | Type | Required | Description |
51+
|-------|------|----------|-------------|
52+
| `name` | string | Yes | Policy identifier (for logging) |
53+
| `segment_type` | string | Yes | `"memory"` or `"file"` |
54+
| `priority` | string or int | No | Match only requests with this priority: `"high"` (0), `"medium"` (1), `"low"` (2) |
55+
| `devices` | array[string] | No | List of allowed device names (empty = all devices) |
56+
| `transports` | array[string] | No | Transport preference list (evaluated in order) |
57+
58+
### Memory Type Filters
59+
60+
For `memory` segments, you can filter by source/destination memory type:
61+
62+
| Pattern | Matches |
63+
|---------|---------|
64+
| `"cuda"` | CUDA GPU memory |
65+
| `"cpu"` | CPU/host memory |
66+
| `"hip"` | ROCm/HIP GPU memory |
67+
| `"npu"` | Ascend NPU memory |
68+
| `"*"` | Any memory type |
69+
70+
### Size Filters
71+
72+
Restrict policies to specific transfer sizes:
73+
74+
```json
75+
{
76+
"name": "small_transfers",
77+
"segment_type": "memory",
78+
"min_size": 0,
79+
"max_size": 1048576,
80+
"transports": ["shm", "nvlink"]
81+
}
82+
```
83+
84+
## Device Mask
85+
86+
The `devices` field in a policy creates a bitmask that restricts which NICs can be used:
87+
88+
```json
89+
{
90+
"name": "use_nic_0_only",
91+
"segment_type": "memory",
92+
"devices": ["mlx5_0"],
93+
"transports": ["rdma"]
94+
}
95+
```
96+
97+
This is translated internally to a 64-bit bitmask where each bit represents one device:
98+
- `devices: ["mlx5_0"]``device_mask = 0x0001` (bit 0 set)
99+
- `devices: ["mlx5_1", "mlx5_2"]``device_mask = 0x0006` (bits 1 and 2 set)
100+
- `devices: []` (empty) → `device_mask = ~0ULL` (all devices)
101+
102+
## Transport Fallback
103+
104+
When multiple transports are listed in the `transports` array, they act as fallback options:
105+
106+
```json
107+
{
108+
"transports": ["rdma", "tcp"]
109+
}
110+
```
111+
112+
This means:
113+
1. Try RDMA first
114+
2. If RDMA is unavailable or fails, fall back to TCP
115+
116+
### Fallback with `transport_index`
117+
118+
For programmatic control, the `transport_index` parameter selects which transport to use:
119+
120+
```cpp
121+
// transport_index = 0 → First transport (rdma)
122+
// transport_index = 1 → Second transport (tcp)
123+
// transport_index = 2 → Third transport (if exists)
124+
auto result = selector.select(context, transports, transport_index);
125+
```
126+
127+
## Default Behavior
128+
129+
If no `policy` is configured, TENT falls back to original behavior:
130+
131+
| Segment Type | Default Transport Order |
132+
|--------------|-------------------------|
133+
| File | GDS → IOURING → RDMA |
134+
| Memory | Uses `buffer_transports` order from buffer registration |
135+
136+
## Complete Example
137+
138+
```json
139+
{
140+
"policy": [
141+
{
142+
"name": "high_priority_local",
143+
"segment_type": "memory",
144+
"same_machine": true,
145+
"priority": "high",
146+
"transports": ["nvlink", "shm"]
147+
},
148+
{
149+
"name": "high_priority_remote",
150+
"segment_type": "memory",
151+
"same_machine": false,
152+
"priority": "high",
153+
"local_memory": "cuda",
154+
"remote_memory": "cuda",
155+
"devices": ["mlx5_0", "mlx5_1", "mlx5_2"],
156+
"transports": ["rdma"]
157+
},
158+
{
159+
"name": "medium_priority",
160+
"segment_type": "memory",
161+
"priority": "medium",
162+
"transports": ["rdma"]
163+
},
164+
{
165+
"name": "bulk_transfer",
166+
"segment_type": "memory",
167+
"priority": "low",
168+
"min_size": 104857600,
169+
"transports": ["rdma", "tcp"]
170+
},
171+
{
172+
"name": "file_ops",
173+
"segment_type": "file",
174+
"transports": ["gds", "io_uring"]
175+
}
176+
]
177+
}
178+
```
179+
180+
## Unified Priority System
181+
182+
The priority value propagates through multiple layers:
183+
184+
```
185+
Request.priority (application layer)
186+
187+
TransportSelector policy matching
188+
189+
DeviceSelector allocation (QoS filtering)
190+
191+
Worker thread scheduling queues
192+
```
193+
194+
**Layer interactions**:
195+
196+
1. **TransportSelector**: Policy's `priority` field filters which requests match
197+
2. **DeviceSelector**: `priority` parameter controls device eligibility (QoS mode)
198+
3. **Workers**: Separate queues per priority level with strict draining order
199+
200+
See [QoS.md](qos.md) for details on worker scheduling and global slot coordination.
201+
202+
## Data Flow
203+
204+
```
205+
Request with priority
206+
207+
TransportSelector.select(context, transports, transport_index)
208+
209+
Match policy by:
210+
- segment_type (file/memory)
211+
- priority (exact match if specified in policy)
212+
- location constraints
213+
- size constraints
214+
215+
Build device_mask from policy.devices
216+
217+
Select transport from policy.transports[transport_index]
218+
219+
Return SelectionResult { transport, device_mask }
220+
221+
RdmaTransport.submitTransferTasks(batch, requests)
222+
223+
DeviceSelector.allocate(..., request.priority, batch.device_mask)
224+
225+
Worker scheduling (separate queues per priority)
226+
```

mooncake-transfer-engine/tent/config/transfer-engine.json

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,5 +89,18 @@
8989
"mnnvl": {
9090
"enable" : false
9191
}
92-
}
92+
},
93+
"policy": [
94+
{
95+
"name": "default_memory",
96+
"segment_type": "memory",
97+
"devices": ["mlx5_0", "mlx5_2"],
98+
"transports": ["nvlink", "rdma", "shm"]
99+
},
100+
{
101+
"name": "file_storage",
102+
"segment_type": "file",
103+
"transports": ["gds", "io_uring", "rdma"]
104+
}
105+
]
93106
}

mooncake-transfer-engine/tent/include/tent/runtime/transfer_engine_impl.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "tent/common/status.h"
3232
#include "tent/common/types.h"
3333
#include "tent/common/concurrent/thread_local_storage.h"
34+
#include "tent/runtime/transport_selector.h"
3435

3536
namespace mooncake {
3637
namespace tent {
@@ -49,14 +50,15 @@ class ProxyManager;
4950
struct TaskInfo {
5051
TransportType type{UNSPEC};
5152
int sub_task_id{-1};
52-
bool derived{false}; // merged by other tasks
53-
int xport_priority{0};
53+
bool derived{false}; // merged by other tasks
54+
int xport_priority{0}; // transport priority (for fallback)
55+
int failover_count{0}; // number of failover attempts
56+
uint64_t device_mask{~0ULL}; // Device mask for quota allocation
5457
Request request;
5558
bool staging{false};
5659
TransferStatusEnum status{TransferStatusEnum::PENDING};
5760
volatile TransferStatusEnum staging_status{TransferStatusEnum::PENDING};
5861
std::chrono::steady_clock::time_point start_time{}; // For latency tracking
59-
int failover_count{0}; // Number of cross-transport failover attempts
6062
};
6163

6264
class TransferEngineImpl {
@@ -179,7 +181,8 @@ class TransferEngineImpl {
179181

180182
Status lazyFreeBatch();
181183

182-
TransportType getTransportType(const Request& request, int priority = 0);
184+
SelectionResult getTransportType(const Request& request,
185+
int transport_index = 0);
183186

184187
std::vector<TransportType> getSupportedTransports(
185188
TransportType request_type);
@@ -189,8 +192,8 @@ class TransferEngineImpl {
189192
void updateTaskStatusFromPoll(Batch* batch, size_t task_id,
190193
TransferStatus& task_status);
191194

192-
TransportType resolveTransport(const Request& req, int priority,
193-
bool invalidate_on_fail = true);
195+
SelectionResult resolveTransport(const Request& req, int transport_index,
196+
bool invalidate_on_fail = true);
194197

195198
Status loadTransports();
196199

@@ -220,6 +223,7 @@ class TransferEngineImpl {
220223
std::shared_ptr<Config> conf_;
221224
std::shared_ptr<ControlService> metadata_;
222225
std::shared_ptr<Topology> topology_;
226+
std::unique_ptr<TransportSelector> transport_selector_;
223227
bool available_;
224228

225229
std::array<std::shared_ptr<Transport>, kSupportedTransportTypes>

mooncake-transfer-engine/tent/include/tent/runtime/transport.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ struct Capabilities {
4545
class Transport {
4646
public:
4747
struct SubBatch {
48-
SubBatch() {}
48+
SubBatch() : device_mask(~0ULL) {}
4949
virtual ~SubBatch() {}
5050
virtual size_t size() const = 0;
51+
uint64_t device_mask; // Device mask for transport selection
5152
};
5253

5354
using SubBatchRef = SubBatch *;

0 commit comments

Comments
 (0)