-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtypes.go
More file actions
84 lines (69 loc) · 2.81 KB
/
types.go
File metadata and controls
84 lines (69 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package xray
import wiretappb "github.com/ethp2p/xray/proto/wiretap"
// Sink receives envelopes from the emitter.
type Sink interface {
// Write delivers an envelope to the sink.
// Returns false if the sink cannot accept (backpressure).
Write(env *wiretappb.Envelope) bool
// Close shuts down the sink.
Close() error
}
// Direction represents the direction of traffic flow.
type Direction int
const (
DirectionUnknown Direction = iota
DirectionIn
DirectionOut
)
// Tag represents a key with one or more values for message categorization.
type Tag struct {
Name string
Values []string
}
// DecodedMessage is produced by the async decode pipeline for each
// complete protocol message. Handlers receive these off the hot path.
type DecodedMessage struct {
StreamID uint32
ConnID uint32
Protocol string
Direction Direction
WireBytes int
Tags []Tag
Parsed any
}
// OnMessage processes a single decoded message. Called from a per-stream
// goroutine; never concurrent for the same stream, but multiple streams
// may call different OnMessage instances concurrently.
type OnMessage func(msg DecodedMessage)
// OnMessageFactory creates a per-stream OnMessage handler. Called once when
// a decoder matches the stream's protocol. Return nil to skip this stream.
type OnMessageFactory func(streamID uint32, protocol string) OnMessage
// EmitFunc is called once per decoded message.
// wireBytes is the number of bytes attributable to the message on the stream
// (including any framing bytes). parsed carries the decoder's native object
// (e.g. *pubsubpb.RPC) for downstream handlers.
type EmitFunc func(wireBytes int, tags []Tag, parsed any)
// StreamDecoder is a stateful decoder bound to one stream.
//
// ObserveRead and ObserveWrite may be called concurrently (read/write goroutines),
// but instrumentation may assume at most one goroutine calls each method at a time.
type StreamDecoder interface {
// ObserveRead consumes bytes read from the stream (remote -> local).
// Implementations must not retain data after returning.
ObserveRead(data []byte, emit EmitFunc) error
// ObserveWrite consumes bytes written to the stream (local -> remote).
// Implementations must not retain data after returning.
ObserveWrite(data []byte, emit EmitFunc) error
// BufferedRead returns bytes buffered waiting for a full message.
//
// This is used on stream close/reset to account for bytes that were read
// but never completed a full framed message (e.g., stream closed mid-message).
BufferedRead() int
// BufferedWrite returns bytes buffered waiting for a full message.
//
// This is used on stream close/reset to account for bytes that were written
// but never completed a full framed message (e.g., stream closed mid-message).
BufferedWrite() int
// Reset clears internal buffers (called on stream close/reset).
Reset()
}