Skip to content

Commit c687988

Browse files
committed
feat(networking): wire gossip→BN pipeline, gossipsub scoring feedback, outbound API
## Gossip → Beacon Node pipeline (issue ChainSafe#9) - Add on_validated_message callback to EthGossipAdapter (setMessageHandler(ctx, callback) — called for every ACCEPT-validated message) - validateDecoded() now delivers accepted messages to the BN processing pipeline - Beacon node wires this to enqueue blocks/attestations for import ## Gossipsub ACCEPT/REJECT feedback (issue ChainSafe#11) - handleMessage() now accepts from_peer: ?[]const u8 parameter - For REJECT outcomes, calls gossipsub.router.recordInvalidMessage(peer, topic) which increments the P4 (invalid message delivery) score counter - pollEvents() passes msg.from to handleMessage() - Gossipsub can now penalize peers who send invalid messages ## Outbound request API (issue ChainSafe#10) - Add decodeResponseChunks() + freeDecodedResponseChunks() to req_resp_encoding.zig to parse a complete response wire buffer into []DecodedResponseChunk - handleOutbound() now decodes the response and calls ctx.on_response callback if present, passing parsed chunks to the caller (sync service, etc.) - Export new types from root.zig (DecodedResponseChunk, decodeResponseChunks, freeDecodedResponseChunks) 🤖 Generated with AI assistance
1 parent 0140937 commit c687988

File tree

4 files changed

+163
-7
lines changed

4 files changed

+163
-7
lines changed

src/networking/eth2_protocols.zig

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,15 @@ fn makeProtocolHandler(
142142
};
143143
}
144144

145-
/// Handle an outbound request: encode SSZ and write to stream.
145+
/// Handle an outbound request: encode SSZ, write to stream, decode response.
146146
///
147-
/// Expects `ctx` to have `ssz_payload: []const u8` (use `&.{}` for zero-body methods).
147+
/// Expects `ctx` to optionally have:
148+
/// - `ssz_payload: []const u8` — the request body (use `&.{}` for zero-body methods)
149+
/// - `on_response: ?struct { ctx: *anyopaque, callback: *const fn(*anyopaque, []const ResponseChunk) void }`
150+
/// — called with parsed response chunks if present. This is how sync gets data back.
151+
///
152+
/// The callback (if present) is invoked synchronously before `handleOutbound` returns.
153+
/// Chunks are freed after the callback returns; the callback must copy what it needs.
148154
pub fn handleOutbound(self: *Self, io: Io, stream: anytype, ctx: anytype) !void {
149155
const ssz_bytes: []const u8 = if (@hasField(@TypeOf(ctx), "ssz_payload"))
150156
ctx.ssz_payload
@@ -162,13 +168,38 @@ fn makeProtocolHandler(
162168
return err;
163169
};
164170

165-
// Read response from peer (for request-response protocols).
171+
// Read response from peer.
166172
const response_wire = readAllFromStream(self.allocator, io, stream, max_request_wire_bytes) catch |err| {
167173
log.warn("{s} handleOutbound read response error: {}", .{ id, err });
168174
return;
169175
};
170176
defer self.allocator.free(response_wire);
171-
log.info("{s} handleOutbound: received {d} byte response", .{ id, response_wire.len });
177+
178+
// Decode the response wire bytes into typed response chunks.
179+
// This makes the parsed data available to the caller (sync service, etc.)
180+
// rather than silently discarding the response.
181+
const chunks = req_resp_encoding.decodeResponseChunks(
182+
self.allocator,
183+
response_wire,
184+
method.hasContextBytes(),
185+
method.hasMultipleResponses(),
186+
) catch |err| {
187+
log.warn("{s} handleOutbound decode response error: {}", .{ id, err });
188+
return;
189+
};
190+
defer req_resp_encoding.freeDecodedResponseChunks(self.allocator, chunks);
191+
192+
log.info("{s} handleOutbound: received {d} byte response, {d} chunks", .{
193+
id, response_wire.len, chunks.len,
194+
});
195+
196+
// Deliver parsed chunks to the caller via on_response callback if provided.
197+
// This is the outbound req/resp API — how sync service gets block/blob data.
198+
if (@hasField(@TypeOf(ctx), "on_response")) {
199+
if (ctx.on_response) |handler| {
200+
handler.callback(handler.ctx, chunks);
201+
}
202+
}
172203
}
173204

174205
// Store method for diagnostics.

src/networking/eth_gossip.zig

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,17 @@ pub const EthGossipAdapter = struct {
5858
fork_digest: [4]u8,
5959
/// Optional peer scorer for tracking validation outcomes.
6060
peer_scorer: ?*PeerScorer,
61+
/// Optional callback invoked for each ACCEPT-validated gossip message.
62+
///
63+
/// This is the gossip → beacon node pipeline entry point. When wired, the beacon
64+
/// node (or BeaconProcessor) receives validated messages for import/processing.
65+
///
66+
/// Signature: fn (ctx: *anyopaque, msg: DecodedGossipMessage) void
67+
/// The `ctx` pointer is passed through opaquely; cast it to the concrete type.
68+
on_validated_message: ?struct {
69+
ctx: *anyopaque,
70+
callback: *const fn (ctx: *anyopaque, msg: DecodedGossipMessage) void,
71+
},
6172
/// Tracks which topics we have subscribed to (for cleanup).
6273
subscribed_topics: std.ArrayListUnmanaged([]const u8),
6374

@@ -73,6 +84,7 @@ pub const EthGossipAdapter = struct {
7384
.validator = validator,
7485
.fork_digest = fork_digest,
7586
.peer_scorer = null,
87+
.on_validated_message = null,
7688
.subscribed_topics = .empty,
7789
};
7890
}
@@ -82,6 +94,27 @@ pub const EthGossipAdapter = struct {
8294
self.peer_scorer = scorer;
8395
}
8496

97+
/// Wire a callback for delivering validated gossip messages to the beacon node.
98+
///
99+
/// The callback is called for every ACCEPT-validated message, passing the typed
100+
/// `DecodedGossipMessage` to the beacon node for processing (import block,
101+
/// enqueue attestation, process exit, etc.).
102+
///
103+
/// This is the gossip → beacon node pipeline. Without this, validated messages
104+
/// are dropped after validation (correct for mesh scoring, wrong for sync).
105+
///
106+
/// Usage (from beacon_node.zig):
107+
/// ```zig
108+
/// gossip_adapter.setMessageHandler(@ptrCast(self), &beaconNode_onGossipMessage);
109+
/// ```
110+
pub fn setMessageHandler(
111+
self: *Self,
112+
ctx: *anyopaque,
113+
callback: *const fn (ctx: *anyopaque, msg: DecodedGossipMessage) void,
114+
) void {
115+
self.on_validated_message = .{ .ctx = ctx, .callback = callback };
116+
}
117+
85118
pub fn deinit(self: *Self) void {
86119
for (self.subscribed_topics.items) |topic| {
87120
self.allocator.free(topic);
@@ -126,11 +159,15 @@ pub const EthGossipAdapter = struct {
126159
/// 3. Dispatch to per-topic validation
127160
/// 4. Return validation result + decoded message
128161
///
162+
/// `from_peer` is the peer that sent this message (for gossipsub scoring feedback).
163+
/// Pass null if the sender peer ID is unavailable.
164+
///
129165
/// Called when gossipsub's `drainEvents()` yields a `message` event.
130166
pub fn handleMessage(
131167
self: *Self,
132168
topic: []const u8,
133169
data: []const u8,
170+
from_peer: ?[]const u8,
134171
) HandleMessageResult {
135172
// 1. Parse the topic string.
136173
const parsed_topic = gossip_topics.parseTopic(topic) orelse {
@@ -151,24 +188,59 @@ pub const EthGossipAdapter = struct {
151188
};
152189

153190
// 3. Dispatch to per-topic validation.
154-
const validation = self.validateDecoded(parsed_topic, decoded);
191+
const validation = self.validateDecoded(parsed_topic, decoded, from_peer);
155192

156193
return .{ .validation = validation, .decoded = decoded };
157194
}
158195

159196
/// Validate a decoded gossip message using the per-topic validators.
197+
///
198+
/// `from_peer` is used to report validation results back to gossipsub for
199+
/// mesh scoring — rejected messages penalize the originating peer.
160200
fn validateDecoded(
161201
self: *Self,
162202
parsed_topic: GossipTopic,
163203
decoded: DecodedGossipMessage,
204+
from_peer: ?[]const u8,
164205
) ValidationResult {
165206
const result = self.dispatchValidation(parsed_topic, decoded);
166207

167-
// Update peer scoring if a scorer is wired.
208+
// Update legacy peer scoring if a scorer is wired.
168209
if (self.peer_scorer) |scorer| {
169210
scorer.recordValidation(result);
170211
}
171212

213+
// Feed validation result back to gossipsub for mesh scoring.
214+
// This is the gossipsub ACCEPT/REJECT feedback loop. Without it, gossipsub
215+
// cannot penalize peers who send invalid messages, which is critical for
216+
// spam resistance and mesh health.
217+
if (from_peer) |peer| {
218+
switch (result) {
219+
.reject => {
220+
// recordInvalidMessage increments invalid_message_deliveries
221+
// for this peer on this topic, which reduces their P4 score.
222+
var topic_buf: [gossip_topics.MAX_TOPIC_LENGTH]u8 = undefined;
223+
const topic_str = gossip_topics.formatTopic(
224+
&topic_buf,
225+
self.fork_digest,
226+
parsed_topic.topic_type,
227+
parsed_topic.subnet_id,
228+
);
229+
self.gossipsub.router.recordInvalidMessage(peer, topic_str);
230+
},
231+
.accept, .ignore => {},
232+
}
233+
}
234+
235+
// Deliver accepted messages to the beacon node processing pipeline.
236+
// This is the gossip → BN handoff. Without this, validated messages are
237+
// silently discarded and never imported into the chain.
238+
if (result == .accept) {
239+
if (self.on_validated_message) |handler| {
240+
handler.callback(handler.ctx, decoded);
241+
}
242+
}
243+
172244
return result;
173245
}
174246

@@ -286,7 +358,7 @@ pub const EthGossipAdapter = struct {
286358
for (events) |event| {
287359
switch (event) {
288360
.message => |msg| {
289-
const handle_result = self.handleMessage(msg.topic, msg.data);
361+
const handle_result = self.handleMessage(msg.topic, msg.data, msg.from);
290362
const topic_copy = try self.allocator.dupe(u8, msg.topic);
291363
try results.append(self.allocator, .{
292364
.topic = topic_copy,

src/networking/req_resp_encoding.zig

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,3 +487,53 @@ test "response chunk roundtrip with context bytes and BeaconBlocksByRange" {
487487
try testing.expectEqual(request.count, decoded_req.count);
488488
try testing.expectEqual(request.step, decoded_req.step);
489489
}
490+
491+
/// Decode all response chunks from a complete response wire buffer.
492+
///
493+
/// Iterates through the wire bytes, decoding one chunk at a time until all
494+
/// bytes are consumed. Stops early on the first non-success chunk.
495+
///
496+
/// Caller owns the returned slice and each `ssz_bytes` field within it.
497+
/// Use `freeResponseChunks` to release all memory.
498+
///
499+
/// This is used by the outbound request API to parse responses from peers.
500+
pub fn decodeResponseChunks(
501+
allocator: std.mem.Allocator,
502+
wire_bytes: []const u8,
503+
has_context_bytes: bool,
504+
has_multiple_responses: bool,
505+
) ![]DecodedResponseChunk {
506+
var chunks: std.ArrayListUnmanaged(DecodedResponseChunk) = .empty;
507+
errdefer {
508+
for (chunks.items) |chunk| {
509+
allocator.free(chunk.ssz_bytes);
510+
}
511+
chunks.deinit(allocator);
512+
}
513+
514+
var offset: usize = 0;
515+
while (offset < wire_bytes.len) {
516+
const chunk = try decodeResponseChunk(allocator, wire_bytes[offset..], has_context_bytes);
517+
offset += chunk.bytes_consumed;
518+
try chunks.append(allocator, chunk);
519+
520+
// Single-response protocols (Status, Goodbye, Ping, Metadata) return
521+
// exactly one chunk. Stop after reading it.
522+
if (!has_multiple_responses) break;
523+
524+
// Error responses terminate the stream.
525+
if (!chunk.result.isSuccess()) break;
526+
}
527+
528+
return chunks.toOwnedSlice(allocator);
529+
}
530+
531+
/// Free a slice of response chunks allocated by `decodeResponseChunks`.
532+
///
533+
/// Frees both the ssz_bytes within each chunk and the slice itself.
534+
pub fn freeDecodedResponseChunks(allocator: std.mem.Allocator, chunks: []DecodedResponseChunk) void {
535+
for (chunks) |chunk| {
536+
allocator.free(chunk.ssz_bytes);
537+
}
538+
allocator.free(chunks);
539+
}

src/networking/root.zig

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ pub const formatProtocolId = protocol.formatProtocolId;
2929
pub const decodeRequest = req_resp_encoding.decodeRequest;
3030
pub const encodeResponseChunk = req_resp_encoding.encodeResponseChunk;
3131
pub const decodeResponseChunk = req_resp_encoding.decodeResponseChunk;
32+
pub const decodeResponseChunks = req_resp_encoding.decodeResponseChunks;
33+
pub const freeDecodedResponseChunks = req_resp_encoding.freeDecodedResponseChunks;
34+
pub const DecodedResponseChunk = req_resp_encoding.DecodedResponseChunk;
3235

3336
// Req/resp handler re-exports.
3437
pub const ReqRespContext = req_resp_handler.ReqRespContext;

0 commit comments

Comments
 (0)