Skip to content

Commit cc9ee0d

Browse files
committed
feat(proxy): Implement Epic 2.4 Streaming Proxy Pipeline
1 parent 9a6447e commit cc9ee0d

2 files changed

Lines changed: 237 additions & 81 deletions

File tree

src/entity_mask.zig

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,14 @@ pub const AhoCorasick = struct {
9898
map[c] = idx;
9999
idx += 1;
100100
}
101-
map[' '] = idx; idx += 1;
102-
map['.'] = idx; idx += 1;
103-
map['-'] = idx; idx += 1;
104-
map['\''] = idx; idx += 1;
101+
map[' '] = idx;
102+
idx += 1;
103+
map['.'] = idx;
104+
idx += 1;
105+
map['-'] = idx;
106+
idx += 1;
107+
map['\''] = idx;
108+
idx += 1;
105109
// all mapped characters fit under 64. Unmapped bytes use index 0.
106110
break :blk map;
107111
};
@@ -425,6 +429,19 @@ pub const AcChunkState = struct {
425429
self.len = 0;
426430
return result;
427431
}
432+
433+
/// Emit remaining pending bytes after the last unmask chunk.
434+
/// Returns an owned slice — caller must free it.
435+
pub fn flushUnmask(
436+
self: *AcChunkState,
437+
em: *const EntityMap,
438+
allocator: std.mem.Allocator,
439+
) ![]u8 {
440+
if (self.len == 0) return try allocator.alloc(u8, 0);
441+
const result = try em.unmask(self.pending[0..self.len], allocator);
442+
self.len = 0;
443+
return result;
444+
}
428445
};
429446

430447
// ---------------------------------------------------------------------------
@@ -625,6 +642,79 @@ pub const EntityMap = struct {
625642

626643
return result;
627644
}
645+
646+
/// Create initial chunk state for streaming unmasking.
647+
/// Caller MUST call `state.deinit(allocator)` when finished to free internal buffers.
648+
pub fn initUnmaskChunkState(self: *const EntityMap) AcChunkState {
649+
var max_len: usize = 0;
650+
for (self.reverse_ac.pattern_lengths) |pl| {
651+
if (pl > max_len) max_len = pl;
652+
}
653+
return AcChunkState{
654+
.overlap = if (max_len > 0) max_len - 1 else 0,
655+
};
656+
}
657+
658+
/// Process one chunk for entity unmasking in streaming mode.
659+
///
660+
/// Scans the full `pending ++ chunk` buffer for reverse matches.
661+
/// Patterns spanning the safe/pending boundary are deferred to the next call.
662+
///
663+
/// Returns an owned output slice — caller must free it.
664+
/// After all chunks, call `state.flushUnmask(&em, allocator)` for the tail.
665+
pub fn unmaskChunked(
666+
self: *const EntityMap,
667+
chunk: []const u8,
668+
state: *AcChunkState,
669+
allocator: std.mem.Allocator,
670+
) ![]u8 {
671+
if (chunk.len == 0) {
672+
return try allocator.alloc(u8, 0);
673+
}
674+
675+
const old_pending_len = state.len;
676+
const total = old_pending_len + chunk.len;
677+
678+
// If combined data is smaller than the overlap window, just accumulate.
679+
if (total <= state.overlap) {
680+
@memcpy(state.pending[old_pending_len..][0..chunk.len], chunk);
681+
state.len = total;
682+
return try allocator.alloc(u8, 0);
683+
}
684+
685+
// Zero-allocation path: reuse the buffer inside state
686+
try state.combined_buf.resize(allocator, total);
687+
const combined = state.combined_buf.items;
688+
689+
if (old_pending_len > 0) {
690+
@memcpy(combined[0..old_pending_len], state.pending[0..old_pending_len]);
691+
}
692+
@memcpy(combined[old_pending_len..], chunk);
693+
694+
// safe_end: raw position up to which we emit unmasked output.
695+
// Everything from safe_end onward becomes new pending.
696+
const new_pending_len = @min(state.overlap, total);
697+
const safe_end = total - new_pending_len;
698+
699+
var consumed: usize = undefined;
700+
const result = try replaceAllBounded(
701+
&self.reverse_ac,
702+
combined,
703+
self.name_const_slices,
704+
safe_end,
705+
&consumed,
706+
allocator,
707+
);
708+
709+
// Save raw tail (from consumed onward) as new pending.
710+
const actual_pending = total - consumed;
711+
state.len = actual_pending;
712+
if (actual_pending > 0) {
713+
@memcpy(state.pending[0..actual_pending], combined[consumed..][0..actual_pending]);
714+
}
715+
716+
return result;
717+
}
628718
};
629719

630720
// ===========================================================================
@@ -836,7 +926,7 @@ test "bench - EntityMap mask throughput" {
836926
const allocator = std.testing.allocator;
837927

838928
const names = [_][]const u8{
839-
"John Doe", "Jane Smith", "Dr. Johnson",
929+
"John Doe", "Jane Smith", "Dr. Johnson",
840930
"Mary Williams", "Robert Brown",
841931
};
842932
var em = try EntityMap.init(allocator, &names);

src/proxy.zig

Lines changed: 142 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -78,50 +78,6 @@ pub fn handleRequest(
7878
break :blk session_entity_map;
7979
};
8080

81-
// --- Read incoming request body (if present) ---
82-
var req_body: std.ArrayListUnmanaged(u8) = .empty;
83-
defer req_body.deinit(allocator);
84-
85-
const has_body = method.requestHasBody();
86-
if (has_body) {
87-
var body_read_buf: [8192]u8 = undefined;
88-
if (request.readerExpectContinue(&body_read_buf)) |body_reader| {
89-
try body_reader.appendRemainingUnlimited(allocator, &req_body);
90-
} else |err| {
91-
std.debug.print("[WARN] Failed to read request body: {}\n", .{err});
92-
}
93-
}
94-
95-
// --- Request path: apply privacy pipeline to outbound body ---
96-
// sanitized_body is always a mutable owned allocation so that
97-
// sendBodyComplete (which requires []u8) can use it directly
98-
// without @constCast.
99-
var sanitized_body: ?[]u8 = null;
100-
defer if (sanitized_body) |sb| allocator.free(sb);
101-
102-
if (req_body.items.len > 0) {
103-
// 1. Entity mask: names -> aliases
104-
if (active_entity_map) |em| {
105-
const masked = try em.mask(req_body.items, allocator);
106-
// 2. SSN redact: digits -> * (in-place on the masked buffer)
107-
redact.redactSsn(masked);
108-
// 3. Fuzzy name redact: catch OCR variants missed by exact match
109-
const active_fuzzy = session_fuzzy_matcher;
110-
if (active_fuzzy) |fm| {
111-
const fuzzy_result = try fm.fuzzyRedact(masked, em.getAliases(), &.{}, allocator);
112-
allocator.free(masked);
113-
sanitized_body = fuzzy_result;
114-
} else {
115-
sanitized_body = masked;
116-
}
117-
} else {
118-
// No entity map — SSN redact in-place on a mutable copy
119-
const duped = try allocator.dupe(u8, req_body.items);
120-
redact.redactSsn(duped);
121-
sanitized_body = duped;
122-
}
123-
}
124-
12581
// --- Forward request to upstream ---
12682
var url_buf: [max_url_len]u8 = undefined;
12783
const target_url_str = try std.fmt.bufPrint(&url_buf, "http://{s}:{d}{s}", .{ target_host, target_port, uri_str });
@@ -135,17 +91,110 @@ pub fn handleRequest(
13591
break :blk .default;
13692
};
13793

94+
const has_body = method.requestHasBody();
95+
13896
var client_req = try client.request(method, target_uri, .{
13997
.headers = .{ .content_type = content_type_override },
98+
.transfer_encoding = if (has_body) .chunked else .none,
14099
});
141100
defer client_req.deinit();
142101

143-
if (has_body and sanitized_body != null) {
144-
// Send with body — sendBodyComplete sets content-length and flushes.
145-
// sanitized_body is always a mutable owned []u8, satisfying the API.
146-
try client_req.sendBodyComplete(sanitized_body.?);
102+
// --- Request path: apply privacy pipeline to outbound body ---
103+
if (has_body) {
104+
var ac_state: ?entity_mask.AcChunkState = null;
105+
if (active_entity_map) |em| ac_state = em.initChunkState();
106+
defer if (ac_state) |*s| s.deinit(allocator);
107+
108+
var ssn_state = redact.SsnChunkState{};
109+
110+
var fuzzy_state: ?fuzzy_match.FuzzyChunkState = null;
111+
if (session_fuzzy_matcher) |fm| fuzzy_state = fm.initChunkState();
112+
defer if (fuzzy_state) |*s| s.deinit(allocator);
113+
114+
var body_read_buf: [8192]u8 = undefined;
115+
if (request.readerExpectContinue(&body_read_buf)) |body_reader| {
116+
var raw_chunk_buf: [8192]u8 = undefined;
117+
while (true) {
118+
const bytes_read = try body_reader.read(&raw_chunk_buf);
119+
if (bytes_read == 0) break;
120+
121+
const raw_chunk = raw_chunk_buf[0..bytes_read];
122+
123+
var masked_chunk: []u8 = undefined;
124+
var masked_allocated = false;
125+
if (active_entity_map) |em| {
126+
masked_chunk = try em.maskChunked(raw_chunk, &ac_state.?, allocator);
127+
masked_allocated = true;
128+
} else {
129+
masked_chunk = try allocator.dupe(u8, raw_chunk);
130+
masked_allocated = true;
131+
}
132+
defer if (masked_allocated) allocator.free(masked_chunk);
133+
134+
const ssn_res = redact.redactSsnChunked(masked_chunk, &ssn_state);
135+
136+
if (session_fuzzy_matcher) |fm| {
137+
const em_aliases = if (active_entity_map) |em| em.getAliases() else &.{};
138+
if (ssn_res.finalized.len > 0) {
139+
const f1 = try fm.fuzzyRedactChunked(ssn_res.finalized, &fuzzy_state.?, em_aliases, &.{});
140+
defer allocator.free(f1);
141+
if (f1.len > 0) try client_req.writeAll(f1);
142+
}
143+
if (ssn_res.emitted.len > 0) {
144+
const f2 = try fm.fuzzyRedactChunked(ssn_res.emitted, &fuzzy_state.?, em_aliases, &.{});
145+
defer allocator.free(f2);
146+
if (f2.len > 0) try client_req.writeAll(f2);
147+
}
148+
} else {
149+
if (ssn_res.finalized.len > 0) try client_req.writeAll(ssn_res.finalized);
150+
if (ssn_res.emitted.len > 0) try client_req.writeAll(ssn_res.emitted);
151+
}
152+
}
153+
154+
// Flushes
155+
var ac_flushed: ?[]u8 = null;
156+
if (active_entity_map) |em| {
157+
ac_flushed = try ac_state.?.flush(em, allocator);
158+
}
159+
defer if (ac_flushed) |f| allocator.free(f);
160+
161+
var ssn_final_emissions = std.ArrayList(u8).init(allocator);
162+
defer ssn_final_emissions.deinit();
163+
164+
if (ac_flushed) |f| {
165+
if (f.len > 0) {
166+
const ssn_res = redact.redactSsnChunked(f, &ssn_state);
167+
if (ssn_res.finalized.len > 0) try ssn_final_emissions.appendSlice(ssn_res.finalized);
168+
if (ssn_res.emitted.len > 0) try ssn_final_emissions.appendSlice(ssn_res.emitted);
169+
}
170+
}
171+
172+
const ssn_flushed = ssn_state.flush();
173+
if (ssn_flushed.len > 0) {
174+
try ssn_final_emissions.appendSlice(ssn_flushed);
175+
}
176+
177+
if (session_fuzzy_matcher) |fm| {
178+
const em_aliases = if (active_entity_map) |em| em.getAliases() else &.{};
179+
if (ssn_final_emissions.items.len > 0) {
180+
const f_res = try fm.fuzzyRedactChunked(ssn_final_emissions.items, &fuzzy_state.?, em_aliases, &.{});
181+
defer allocator.free(f_res);
182+
if (f_res.len > 0) try client_req.writeAll(f_res);
183+
}
184+
const fuzzy_flushed = try fuzzy_state.?.flush(fm, em_aliases, &.{}, allocator);
185+
defer allocator.free(fuzzy_flushed);
186+
if (fuzzy_flushed.len > 0) try client_req.writeAll(fuzzy_flushed);
187+
} else {
188+
if (ssn_final_emissions.items.len > 0) {
189+
try client_req.writeAll(ssn_final_emissions.items);
190+
}
191+
}
192+
} else |err| {
193+
std.debug.print("[WARN] Failed to read request body: {}\n", .{err});
194+
}
195+
196+
try client_req.finish();
147197
} else {
148-
// Bodiless request (GET, DELETE, etc.)
149198
try client_req.sendBodilessUnflushed();
150199
if (client_req.connection) |conn| {
151200
try conn.flush();
@@ -159,45 +208,62 @@ pub fn handleRequest(
159208
var transfer_buf: [8192]u8 = undefined;
160209
var downstream_reader = downstream_res.reader(&transfer_buf);
161210

162-
var resp_body: std.ArrayListUnmanaged(u8) = .empty;
163-
defer resp_body.deinit(allocator);
164-
try downstream_reader.appendRemainingUnlimited(allocator, &resp_body);
165-
166-
// --- Response path: unmask aliases back to real names ---
167-
var final_response: []const u8 = resp_body.items;
168-
var unmasked_buf: ?[]u8 = null;
169-
defer if (unmasked_buf) |ub| allocator.free(ub);
170-
171-
if (active_entity_map) |em| {
172-
if (resp_body.items.len > 0) {
173-
unmasked_buf = try em.unmask(resp_body.items, allocator);
174-
final_response = unmasked_buf.?;
175-
}
176-
}
177-
178-
std.debug.print("[PRX] <- {d} ({} bytes)\n", .{
179-
@intFromEnum(downstream_res.head.status),
180-
final_response.len,
181-
});
211+
std.debug.print("[PRX] <- {d}\n", .{@intFromEnum(downstream_res.head.status)});
182212

183213
// Forward upstream Content-Type so the client receives the correct media type.
184214
const upstream_ct = downstream_res.head.content_type;
185215
var ct_headers = [_]http.Header{.{ .name = "Content-Type", .value = "" }};
216+
var extra_headers: []const http.Header = &.{};
186217

187218
if (upstream_ct != .default) {
188219
ct_headers[0].value = switch (upstream_ct) {
189220
.override => |v| v,
190221
.default => "",
191222
};
192-
try request.respond(final_response, .{
193-
.status = downstream_res.head.status,
194-
.extra_headers = &ct_headers,
195-
});
196-
} else {
197-
try request.respond(final_response, .{
223+
extra_headers = ct_headers[0..1];
224+
}
225+
226+
// --- Response path: unmask aliases back to real names ---
227+
var resp_buf8: [8192]u8 = undefined;
228+
var response_writer = try request.respondStreaming(&resp_buf8, .{
229+
.respond_options = .{
198230
.status = downstream_res.head.status,
199-
});
231+
.extra_headers = extra_headers,
232+
},
233+
});
234+
235+
if (downstream_res.head.method.responseHasBody()) {
236+
var unmask_state: ?entity_mask.AcChunkState = null;
237+
if (active_entity_map) |em| {
238+
unmask_state = em.initUnmaskChunkState();
239+
}
240+
defer if (unmask_state) |*s| s.deinit(allocator);
241+
242+
var resp_buf: [8192]u8 = undefined;
243+
while (true) {
244+
const bytes_read = try downstream_reader.read(&resp_buf);
245+
if (bytes_read == 0) break;
246+
247+
const raw_chunk = resp_buf[0..bytes_read];
248+
249+
if (active_entity_map) |em| {
250+
const unmasked = try em.unmaskChunked(raw_chunk, &unmask_state.?, allocator);
251+
defer allocator.free(unmasked);
252+
if (unmasked.len > 0) try response_writer.writeAll(unmasked);
253+
} else {
254+
try response_writer.writeAll(raw_chunk);
255+
}
256+
}
257+
258+
// Flush unmask state
259+
if (active_entity_map) |em| {
260+
const flushed = try unmask_state.?.flushUnmask(em, allocator);
261+
defer allocator.free(flushed);
262+
if (flushed.len > 0) try response_writer.writeAll(flushed);
263+
}
200264
}
265+
266+
try response_writer.end();
201267
}
202268

203269
// ===========================================================================

0 commit comments

Comments
 (0)