Commit eebd7d8 (parent b744ca8): add grpc server architecture doc

GRPC_SERVER_ARCHITECTURE.md (316 additions, 0 deletions)
# Server Architecture

## Overview

The cuOpt gRPC server (`cuopt_grpc_server`) is a multi-process architecture designed for:

- **Isolation**: Each solve runs in a separate worker process for fault tolerance
- **Parallelism**: Multiple workers can process jobs concurrently
- **Large Payloads**: Handles multi-GB problems and solutions
- **Real-Time Feedback**: Log streaming and incumbent callbacks during solve

For the gRPC protocol and client API, see `GRPC_INTERFACE.md`. Server source files live under `cpp/src/grpc/server/`.

## Process Model

```text
┌────────────────────────────────────────────────────────────────────┐
│                        Main Server Process                         │
│                                                                    │
│ ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────────┐ │
│ │    gRPC     │  │     Job      │  │ Background Threads          │ │
│ │   Service   │  │   Tracker    │  │ - Result retrieval          │ │
│ │   Handler   │  │ (job status, │  │ - Incumbent retrieval       │ │
│ │             │  │   results)   │  │ - Worker monitor            │ │
│ └─────────────┘  └──────────────┘  └─────────────────────────────┘ │
│        │                  ▲                                        │
│        │ shared memory    │ pipes                                  │
│        ▼                  │                                        │
│ ┌─────────────────────────────────────────────────────────────────┐│
│ │                      Shared Memory Queues                       ││
│ │   ┌─────────────────┐           ┌─────────────────────┐         ││
│ │   │   Job Queue     │           │   Result Queue      │         ││
│ │   │ (MAX_JOBS=100)  │           │ (MAX_RESULTS=100)   │         ││
│ │   └─────────────────┘           └─────────────────────┘         ││
│ └─────────────────────────────────────────────────────────────────┘│
└────────────────────────────────────────────────────────────────────┘
        │                        ▲
        │ fork()                 │
        ▼                        │
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Worker 0     │     │    Worker 1     │     │    Worker N     │
│  ┌───────────┐  │     │  ┌───────────┐  │     │  ┌───────────┐  │
│  │ GPU Solve │  │     │  │ GPU Solve │  │     │  │ GPU Solve │  │
│  └───────────┘  │     │  └───────────┘  │     │  └───────────┘  │
│ (separate proc) │     │ (separate proc) │     │ (separate proc) │
└─────────────────┘     └─────────────────┘     └─────────────────┘
```

## Inter-Process Communication

### Shared Memory Segments

| Segment | Purpose |
|---------|---------|
| `/cuopt_job_queue` | Job metadata (ID, type, size, status) |
| `/cuopt_result_queue` | Result metadata (ID, status, size, error) |
| `/cuopt_control` | Server control flags (shutdown, worker count) |

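
Such a named segment can be created with `shm_open`/`ftruncate`/`mmap` so that forked workers inherit the mapping. The sketch below is illustrative, not the server's actual code: the `ServerControl` layout and function names are assumptions, and real cleanup lives in `cleanup_shared_memory`.

```cpp
#include <atomic>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

// Hypothetical layout for a control segment such as /cuopt_control:
// plain-old-data atomics visible to the main process and all workers.
// ftruncate zero-fills the pages, which is a valid initial state for
// these lock-free atomics.
struct ServerControl {
  std::atomic<bool> shutdown_requested;
  std::atomic<int>  worker_count;
};

// Create (or open) a named POSIX shared-memory segment and map it.
inline ServerControl* map_control_segment(const char* name) {
  int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
  if (fd < 0) return nullptr;
  if (ftruncate(fd, sizeof(ServerControl)) != 0) { close(fd); return nullptr; }
  void* p = mmap(nullptr, sizeof(ServerControl), PROT_READ | PROT_WRITE,
                 MAP_SHARED, fd, 0);
  close(fd);  // the mapping stays valid after the fd is closed
  return p == MAP_FAILED ? nullptr : static_cast<ServerControl*>(p);
}

inline void unmap_control_segment(const char* name, ServerControl* ctrl) {
  munmap(ctrl, sizeof(ServerControl));
  shm_unlink(name);  // remove the name so the segment is reclaimed
}
```

Because the mapping is `MAP_SHARED`, a flag set in the main process (e.g. `shutdown_requested`) is immediately visible to every worker that inherited the mapping across `fork()`.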
### Pipe Communication

Each worker has dedicated pipes for data transfer:

```cpp
struct WorkerPipes {
  int to_worker_fd;               // Main → Worker: job data (server writes)
  int from_worker_fd;             // Worker → Main: result data (server reads)
  int worker_read_fd;             // Worker end of input pipe (worker reads)
  int worker_write_fd;            // Worker end of output pipe (worker writes)
  int incumbent_from_worker_fd;   // Worker → Main: incumbent solutions (server reads)
  int worker_incumbent_write_fd;  // Worker end of incumbent pipe (worker writes)
};
```

**Why pipes instead of shared memory for data?**

- Pipes handle backpressure naturally (blocking writes)
- No need to manage large shared memory segments
- Works well with streaming uploads (data flows through)

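
A minimal sketch of how one worker's job/result pipe pair might be wired up before `fork()` (the real setup lives in `grpc_worker_infra.cpp`; the struct and function names here are illustrative):

```cpp
#include <unistd.h>

// Two pipe() calls per worker: one direction for job data, one for
// results. pipe(fds) fills fds[0] with the read end and fds[1] with the
// write end; after fork() the child inherits all four descriptors.
struct WorkerPipesDemo {
  int to_worker_fd,   worker_read_fd;   // server writes, worker reads
  int from_worker_fd, worker_write_fd;  // worker writes, server reads
};

inline bool make_worker_pipes(WorkerPipesDemo& p) {
  int in[2], out[2];
  if (pipe(in) != 0) return false;
  if (pipe(out) != 0) { close(in[0]); close(in[1]); return false; }
  p.worker_read_fd  = in[0];  p.to_worker_fd    = in[1];
  p.from_worker_fd  = out[0]; p.worker_write_fd = out[1];
  return true;
}
```

After `fork()`, each side would normally close the ends it does not use, so that EOF is delivered correctly when the peer exits.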
### Source File Roles

All paths below are under `cpp/src/grpc/server/`.

| File | Role |
|------|------|
| `grpc_server_main.cpp` | `main()`, `print_usage()`, argument parsing, shared-memory init, gRPC server run/stop. |
| `grpc_service_impl.cpp` | `CuOptRemoteServiceImpl`: all 14 RPC handlers (SubmitJob, CheckStatus, GetResult, chunked upload/download, StreamLogs, GetIncumbents, CancelJob, DeleteResult, WaitForCompletion, Status probe). Uses mappers and job_management to enqueue jobs and trigger pipe I/O. |
| `grpc_server_types.hpp` | Shared structs (e.g. `JobQueueEntry`, `ResultQueueEntry`, `ServerConfig`, `JobInfo`), enums, globals (atomics, mutexes, condition variables), and forward declarations used across server .cpp files. |
| `grpc_field_element_size.hpp` | Maps `cuopt::remote::ArrayFieldId` to element byte size; used by pipe deserialization and chunked logic. |
| `grpc_pipe_serialization.hpp` | Streaming pipe I/O: write/read individual length-prefixed protobuf messages (ChunkedProblemHeader, ChunkedResultHeader, ArrayChunk) directly to/from pipe fds. Avoids large intermediate buffers. Also serializes SubmitJobRequest for unary pipe transfer. |
| `grpc_incumbent_proto.hpp` | Build `Incumbent` proto from (job_id, objective, assignment) and parse it back; used by worker when pushing incumbents and by main when reading from the incumbent pipe. |
| `grpc_worker.cpp` | `worker_process(worker_index)`: loop over job queue, receive job data via pipe (unary or chunked), call solver, send result (and optionally incumbents) back. Contains `IncumbentPipeCallback` and `store_simple_result`. |
| `grpc_worker_infra.cpp` | Pipe creation/teardown, `spawn_worker` / `spawn_workers`, `wait_for_workers`, `mark_worker_jobs_failed`, `cleanup_shared_memory`. |
| `grpc_server_threads.cpp` | `worker_monitor_thread`, `result_retrieval_thread`, `incumbent_retrieval_thread`, `session_reaper_thread`. |
| `grpc_job_management.cpp` | Low-level pipe read/write, `send_job_data_pipe` / `recv_job_data_pipe`, `submit_job_async`, `check_job_status`, `cancel_job`, `generate_job_id`, log-dir helpers. |

### Large Payload Handling

For large problems uploaded via chunked gRPC RPCs:

1. Server holds chunked upload state in memory (`ChunkedUploadState`: header + array chunks per `upload_id`).
2. When `FinishChunkedUpload` is called, the header and chunks are stored in `pending_chunked_data`. The data dispatch thread streams them directly to the worker pipe as individual length-prefixed protobuf messages — no intermediate blob is created.
3. Worker reads the streamed messages from the pipe, reassembles arrays, runs the solver, and writes the result (and optionally incumbents) back via pipes using the same streaming format.
4. Main process result-retrieval thread reads the streamed result messages from the pipe and stores the result for `GetResult` or chunked download.

This streaming approach avoids creating a single large buffer, eliminating the 2 GiB protobuf serialization limit for pipe transfers and reducing peak memory usage. Each individual protobuf message (max 64 MiB) is serialized with standard `SerializeToArray`/`ParseFromArray`.

No disk spooling: chunked data is kept in memory in the main process until forwarded to the worker.

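
The length-prefixed framing above can be sketched as follows, with raw byte strings standing in for serialized protobuf messages (the real implementation is in `grpc_pipe_serialization.hpp`; this is an assumed wire layout, a 4-byte native-endian length followed by the message bytes):

```cpp
#include <cstdint>
#include <string>
#include <unistd.h>

// Write one frame: 4-byte length, then the payload. Pipe writes may be
// short for large payloads, so loop until everything is written.
inline bool write_frame(int fd, const std::string& msg) {
  uint32_t len = static_cast<uint32_t>(msg.size());
  if (write(fd, &len, sizeof(len)) != sizeof(len)) return false;
  size_t off = 0;
  while (off < msg.size()) {
    ssize_t n = write(fd, msg.data() + off, msg.size() - off);
    if (n <= 0) return false;
    off += static_cast<size_t>(n);
  }
  return true;
}

// Read one frame back. The reader never needs more memory than the
// largest single message, which is what removes the 2 GiB whole-stream
// serialization limit.
inline bool read_frame(int fd, std::string& msg) {
  uint32_t len = 0;
  if (read(fd, &len, sizeof(len)) != sizeof(len)) return false;
  msg.resize(len);
  size_t off = 0;
  while (off < len) {
    ssize_t n = read(fd, &msg[off], len - off);
    if (n <= 0) return false;
    off += static_cast<size_t>(n);
  }
  return true;
}
```

In the actual server, the payload of each frame would be a `ChunkedProblemHeader`, `ArrayChunk`, or `ChunkedResultHeader` message produced by `SerializeToArray`.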
## Job Lifecycle

### 1. Submission

```text
Client                       Server                        Worker
  │                             │                             │
  │─── SubmitJob ──────────────►│                             │
  │                             │ Create job entry            │
  │                             │ Store problem data          │
  │                             │ job_queue[slot].ready=true  │
  │◄── job_id ──────────────────│                             │
```

### 2. Processing

```text
Client                       Server                        Worker
  │                             │                             │
  │                             │                             │ Poll job_queue
  │                             │                             │ Claim job (CAS)
  │                             │◄────────────────────────────│ Read problem via pipe
  │                             │                             │
  │                             │                             │ Convert CPU→GPU
  │                             │                             │ solve_lp/solve_mip
  │                             │                             │ Convert GPU→CPU
  │                             │                             │
  │                             │ result_queue[slot].ready ◄──│
  │                             │◄── result data via pipe ────│
```

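
The "Claim job (CAS)" step can be sketched with a compare-and-swap on the slot's state, so that when several workers poll the same queue, exactly one wins each job. The enum values and field names below are illustrative, not the server's actual types:

```cpp
#include <atomic>

// Hypothetical per-slot state machine for the shared-memory job queue.
enum JobState { SLOT_EMPTY, SLOT_QUEUED, SLOT_PROCESSING };

struct JobSlot {
  std::atomic<int> state{SLOT_EMPTY};
};

// A worker takes ownership by atomically exchanging QUEUED -> PROCESSING.
// compare_exchange_strong succeeds for at most one caller, so a job can
// never be solved twice even with many workers polling concurrently.
inline bool try_claim(JobSlot& slot) {
  int expected = SLOT_QUEUED;
  return slot.state.compare_exchange_strong(expected, SLOT_PROCESSING);
}
```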
### 3. Result Retrieval

```text
Client                       Server                        Worker
  │                             │                             │
  │─── CheckStatus ────────────►│                             │
  │◄── COMPLETED ───────────────│                             │
  │                             │                             │
  │─── GetResult ──────────────►│                             │
  │                             │ Look up job_tracker         │
  │◄── solution ────────────────│                             │
```

## Data Type Conversions

Workers perform CPU↔GPU conversions to minimize client complexity:

```text
Client                         Worker
  │                               │
  │ cpu_optimization_             │
  │ problem_t ───────────────────►│ map_proto_to_problem()
  │                               │   ↓
  │                               │ to_optimization_problem()
  │                               │   ↓ (GPU)
  │                               │ solve_lp() / solve_mip()
  │                               │   ↓ (GPU)
  │                               │ cudaMemcpy() to host
  │                               │   ↓
  │ cpu_lp_solution_t /           │ map_lp_solution_to_proto() /
  │ cpu_mip_solution_t ◄──────────│ map_mip_solution_to_proto()
```

## Background Threads

### Result Retrieval Thread

- Monitors `result_queue` for completed jobs
- Reads result data from worker pipes
- Updates `job_tracker` with results
- Notifies waiting clients (via condition variable)

### Incumbent Retrieval Thread

- Monitors incumbent pipes from all workers
- Parses `Incumbent` protobuf messages
- Stores them in `job_tracker[job_id].incumbents`
- Enables the `GetIncumbents` RPC to return data

### Worker Monitor Thread

- Detects crashed workers (via `waitpid`)
- Marks affected jobs as FAILED
- Can respawn workers (optional)

### Session Reaper Thread

- Runs every 60 seconds
- Removes stale chunked upload and download sessions after 300 seconds of inactivity
- Prevents memory leaks from abandoned upload/download sessions

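
The reaper's pruning pass amounts to walking the session map and erasing entries whose last activity is older than the idle timeout. A minimal sketch, assuming an illustrative `Session` type (the real thread would also hold the sessions' mutex while pruning):

```cpp
#include <chrono>
#include <map>
#include <string>

using Clock = std::chrono::steady_clock;

// Hypothetical per-upload/download session record keyed by session id.
struct Session {
  Clock::time_point last_activity;
};

// Erase every session idle for longer than idle_timeout (e.g. 300 s).
// Returns how many sessions were reaped.
inline std::size_t reap_stale(std::map<std::string, Session>& sessions,
                              Clock::time_point now,
                              std::chrono::seconds idle_timeout) {
  std::size_t reaped = 0;
  for (auto it = sessions.begin(); it != sessions.end();) {
    if (now - it->second.last_activity > idle_timeout) {
      it = sessions.erase(it);  // erase returns the next valid iterator
      ++reaped;
    } else {
      ++it;
    }
  }
  return reaped;
}
```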
## Log Streaming

Workers write logs to per-job files:

```text
/tmp/cuopt_logs/job_<job_id>.log
```

The `StreamLogs` RPC:

1. Opens the log file
2. Reads and sends new content periodically
3. Closes when the job completes

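
Step 2 is essentially a "tail -f": remember the last offset read and return only the bytes appended since. A sketch of that incremental read (the function name and approach are illustrative; a real handler would loop with a short sleep until the job completes):

```cpp
#include <fstream>
#include <string>

// Read whatever has been appended to the log file since `offset`,
// advancing `offset` past the returned bytes. Returns an empty string
// when there is nothing new.
inline std::string read_new_content(const std::string& path,
                                    std::streampos& offset) {
  std::ifstream in(path, std::ios::binary);
  if (!in) return {};
  in.seekg(0, std::ios::end);
  std::streampos end = in.tellg();
  if (end <= offset) return {};
  in.seekg(offset);
  std::string chunk(static_cast<std::size_t>(end - offset), '\0');
  in.read(&chunk[0], static_cast<std::streamsize>(chunk.size()));
  offset = end;
  return chunk;
}
```

Each non-empty chunk would be sent to the client as one streamed log message.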
## Job States

```text
┌─────────┐  submit   ┌────────────┐  claim   ┌───────────┐
│ QUEUED  │──────────►│ PROCESSING │─────────►│ COMPLETED │
└─────────┘           └────────────┘          └───────────┘
     │                      │
     │ cancel               │ error
     ▼                      ▼
┌───────────┐          ┌─────────┐
│ CANCELLED │          │ FAILED  │
└───────────┘          └─────────┘
```

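
The diagram above can be written down as a transition-validity check, which is handy for asserting invariants in the job tracker. The enum and function here are illustrative, not the server's actual types:

```cpp
// Job states from the diagram; QUEUED and PROCESSING are the only
// non-terminal states.
enum class JobStatus { QUEUED, PROCESSING, COMPLETED, CANCELLED, FAILED };

// Allow exactly the edges shown: submit->claim->complete, plus
// cancel (from QUEUED) and error (from PROCESSING).
inline bool can_transition(JobStatus from, JobStatus to) {
  switch (from) {
    case JobStatus::QUEUED:
      return to == JobStatus::PROCESSING || to == JobStatus::CANCELLED;
    case JobStatus::PROCESSING:
      return to == JobStatus::COMPLETED || to == JobStatus::FAILED;
    default:
      return false;  // COMPLETED/CANCELLED/FAILED are terminal
  }
}
```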
## Configuration Options

```bash
cuopt_grpc_server [options]

  -p, --port PORT          gRPC listen port (default: 8765)
  -w, --workers NUM        Number of worker processes (default: 1)
  --max-message-mb N       Max gRPC message size in MiB (default: 256; clamped to [4 KiB, ~2 GiB])
  --max-message-bytes N    Max gRPC message size in bytes (exact; min 4096)
  --enable-transfer-hash   Log data hashes for streaming transfers (for testing)
  --log-to-console         Echo solver logs to server console
  -q, --quiet              Reduce verbosity (verbose is the default)

TLS Options:
  --tls                    Enable TLS encryption
  --tls-cert PATH          Server certificate (PEM)
  --tls-key PATH           Server private key (PEM)
  --tls-root PATH          Root CA certificate (for client verification)
  --require-client-cert    Require client certificate (mTLS)
```

## Fault Tolerance

### Worker Crashes

If a worker process crashes:

1. The monitor thread detects the crash via `waitpid(WNOHANG)`
2. Any jobs the worker was processing are marked as FAILED
3. A replacement worker is automatically spawned (unless shutting down)
4. Other workers continue operating unaffected

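
The non-blocking detection in step 1 can be sketched as below: `waitpid` with `WNOHANG` returns 0 while the worker is still running and the worker's pid once it has exited, without ever stalling the monitor loop. The helper name is illustrative:

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Poll one worker without blocking. Returns true exactly once, when the
// dead child is reaped; at that point the monitor would mark the
// worker's in-flight jobs FAILED and spawn a replacement.
inline bool worker_exited(pid_t pid) {
  int status = 0;
  return waitpid(pid, &status, WNOHANG) == pid;
}
```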
### Graceful Shutdown

On SIGINT/SIGTERM:

1. Set `shm_ctrl->shutdown_requested = true`
2. Workers finish their current job and exit
3. The main process waits for workers
4. Shared memory segments are cleaned up

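
Step 1 implies an async-signal-safe handler that only sets a flag; everything else (stopping the gRPC server, joining workers, removing shared memory) happens later in normal code. A sketch, using a local `sig_atomic_t` flag where the real server would set `shutdown_requested` in the shared control segment:

```cpp
#include <csignal>

namespace {
// The only thing a signal handler may safely do here is set this flag.
volatile std::sig_atomic_t g_shutdown_requested = 0;
}

extern "C" void handle_shutdown(int) { g_shutdown_requested = 1; }

// Install the same handler for both SIGINT and SIGTERM; the main loop
// polls g_shutdown_requested and begins the ordered shutdown when set.
inline void install_shutdown_handlers() {
  std::signal(SIGINT, handle_shutdown);
  std::signal(SIGTERM, handle_shutdown);
}
```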
### Job Cancellation

When `CancelJob` is called:

1. Set `job_queue[slot].cancelled = true`
2. The worker checks the flag before starting the solve
3. If cancelled, the worker stores a CANCELLED result and skips to the next job
4. If the solve has already started, it runs to completion (no mid-solve cancellation)

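
The worker-side check in steps 2-3 can be sketched as a single test between claiming the job and invoking the solver, which is exactly why a solve that is already running cannot be interrupted. Types and names below are illustrative:

```cpp
#include <atomic>
#include <string>

// Hypothetical queued-job view with the cancellation flag the server
// sets from CancelJob.
struct QueuedJob {
  std::atomic<bool> cancelled{false};
};

// Returns the status the worker would store for this job. The flag is
// consulted exactly once, before the solve begins; `solved` reports
// whether the (stand-in) solver actually ran.
inline std::string process_or_skip(const QueuedJob& job, bool& solved) {
  if (job.cancelled.load()) {   // checked before the solve starts
    solved = false;
    return "CANCELLED";
  }
  solved = true;                // solve_lp/solve_mip would run here
  return "COMPLETED";
}
```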
## Memory Management

| Resource | Location | Cleanup |
|----------|----------|---------|
| Job queue entries | Shared memory | Reused after completion |
| Result queue entries | Shared memory | Reused after retrieval |
| Problem data | Pipe (transient) | Consumed by worker |
| Chunked upload state | Main process memory | After `FinishChunkedUpload` (forwarded to worker) |
| Result data | `job_tracker` map | `DeleteResult` RPC |
| Log files | `/tmp/cuopt_logs/` | `DeleteResult` RPC |

## Performance Considerations

### Worker Count

- Each worker needs a GPU (or shares one with others)
- Too many workers: GPU memory contention
- Too few workers: the GPU sits underutilized while jobs queue
- Recommendation: 1-2 workers per GPU

### Pipe Buffering

- Pipe buffer size is set to 1 MiB via `fcntl(F_SETPIPE_SZ)` (the Linux default is 64 KiB)
- Large results block the worker until the main process reads
- The result retrieval thread should therefore read promptly
- Deadlock prevention: set `result.ready = true` BEFORE writing to the pipe

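
The buffer enlargement is a one-line Linux-specific `fcntl`; the kernel may round the request up to a page multiple, and `F_GETPIPE_SZ` reports the size actually granted. A sketch (helper name is illustrative):

```cpp
#include <fcntl.h>
#include <unistd.h>

// Grow a pipe's kernel buffer (Linux only). A 1 MiB buffer lets a worker
// finish writing a moderately sized result without blocking until the
// main process gets around to reading. Returns the actual buffer size
// after rounding, or -1 on failure.
inline int set_pipe_buffer(int fd, int bytes) {
  if (fcntl(fd, F_SETPIPE_SZ, bytes) < 0) return -1;
  return fcntl(fd, F_GETPIPE_SZ);
}
```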
### Shared Memory Limits

- `MAX_JOBS = 100`: maximum concurrent queued jobs
- `MAX_RESULTS = 100`: maximum stored results
- Increase these if needed for high-throughput scenarios

## File Locations

| Path | Purpose |
|------|---------|
| `/tmp/cuopt_logs/` | Per-job solver log files |
| `/cuopt_job_queue` | Shared memory (job metadata) |
| `/cuopt_result_queue` | Shared memory (result metadata) |
| `/cuopt_control` | Shared memory (server control) |

Chunked upload state is held in memory in the main process (no upload directory).
