
Commit 913bdf0

lquerel and utpilla authored
Second round setup (open-telemetry#343)
Key Changes:
- Definition of the `Exporter` trait
- Alignment of the `Processor` trait signature with other traits
- Creation of a testing infrastructure for the three node types and adaptation of existing tests to this new framework
- Added documentation on pipeline definition

---------

Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com>
1 parent a6898e9 commit 913bdf0

20 files changed

Lines changed: 1228 additions & 239 deletions


rust/otap-dataflow/.clippy.toml

Lines changed: 18 additions & 1 deletion
@@ -4,4 +4,21 @@ allow-expect-in-tests = true
 allow-unwrap-in-tests = true
 allow-dbg-in-tests = true
 allow-print-in-tests = true
-disallowed-methods = []
+
+# Disallow specific methods
+disallowed-methods = []
+
+# Disallow specific types
+disallowed-types = [
+    { path = "once_cell::sync::Lazy", reason = "Please use `std::sync::LazyLock` instead." },
+]
+
+# Disallow specific macros
+disallowed-macros = [
+    { path = "lazy_static::lazy_static", reason = "Please use `std::sync::LazyLock` instead." },
+]
+
+doc-valid-idents = [
+    "OTEL",
+    "..", # add the default list
+]
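
Note on the new lint configuration: both `once_cell::sync::Lazy` and `lazy_static!` are now rejected in favor of `std::sync::LazyLock`. A minimal sketch of what the suggested replacement can look like follows. The `DEFAULT_PORTS` table and the `default_port` helper are hypothetical names used only for illustration and are not part of this commit.

```rust
use std::collections::HashMap;
use std::sync::LazyLock;

// Hypothetical lookup table (not from the commit). It is built on first
// access and then shared for the lifetime of the program.
static DEFAULT_PORTS: LazyLock<HashMap<&'static str, u16>> = LazyLock::new(|| {
    HashMap::from([("otlp-grpc", 4317), ("otlp-http", 4318)])
});

/// Returns the default port for a protocol name, if the table knows it.
fn default_port(protocol: &str) -> Option<u16> {
    DEFAULT_PORTS.get(protocol).copied()
}

fn main() {
    println!("{:?}", default_port("otlp-grpc"));
    println!("{:?}", default_port("unknown"));
}
```

`LazyLock` lives in the standard library, so this pattern needs no extra dependency, which fits the stated goal of limiting external crates.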

rust/otap-dataflow/.cursor/rules/single-threaded-async-runtime.mdc

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ alwaysApply: true
 # Design Principles
 
 - Target a single-threaded async runtime
-- Favor ?Send futures whenever possible
+- Declare async traits as `?Send`, providing `!Send` implementations and futures whenever practical
 - Avoid synchronization primitives as much as possible
 - Optimize for performance
 - Avoid unbounded channels and data structures
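
Note on the reworded rule: a `!Send` future is one that holds non-thread-safe state, such as `Rc<RefCell<_>>`, across an `.await`. Such a future can only run on an executor that never moves it to another thread. The sketch below illustrates this with a hand-rolled, std-only `block_on_local` executor. That executor is an assumption made to keep the example self-contained; the crate itself depends on `tokio` and `async-trait`. The `record` and `count_messages` functions are hypothetical.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing. Sufficient here because the executor below
/// simply re-polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal single-threaded executor (illustrative only): polls the future
/// on the current thread until it completes. Note the absence of a `Send`
/// bound on `F`.
fn block_on_local<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical node state shared through `Rc<RefCell<_>>`, which is `!Send`
/// and needs no atomic operations or locks.
async fn record(counter: Rc<RefCell<u64>>, n: u64) -> u64 {
    *counter.borrow_mut() += n;
    *counter.borrow()
}

/// Runs two updates on the same thread and returns the final count.
fn count_messages() -> u64 {
    let counter = Rc::new(RefCell::new(0));
    block_on_local(async {
        record(counter.clone(), 2).await;
        record(counter.clone(), 3).await
    })
}

fn main() {
    println!("{}", count_messages());
}
```

Passing the same future to an API that requires `Send` (for example a multi-threaded spawn) would be rejected at compile time, which is why the rule asks for `?Send` trait declarations.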

rust/otap-dataflow/ROADMAP.md

Lines changed: 19 additions & 4 deletions
@@ -27,16 +27,31 @@ significant changes.
 - [ ] Continuous benchmarking and performance tracking
 - [ ] Comparison with Go Collector
 
-### Pipeline Engine Foundations
+### Pipeline Engine Foundations (phase 1)
 
 - Channels
 - [x] MPMC Channel
 - [x] MPSC Channel
 - [x] Receiver trait
-- [WIP] Processor trait
-- [ ] Exporter trait
+- [x] Processor trait
+- [x] Exporter trait
 - [ ] Pipeline Engine
-- [ ] Initial benchmarks and documentation
+- [ ] CPU & Memory Benchmarks
+- [ ] Documentation
+
+### Pipeline Engine Foundations (phase 2)
+
+- Channels
+- [ ] SPSC Channel
+- [ ] Broadcast Channel
+- [ ] Connector trait
+- [ ] Instrumentation
+- [ ] Thread pinning
+- Benchmarks
+- [ ] CPU benchmarks
+- [ ] Memory benchmarks
+- [ ] CPU & Memory Benchmarks
+- [ ] Documentation
 
 ### OTLP Pipeline
 
rust/otap-dataflow/crates/channel/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -13,5 +13,5 @@ rust-version.workspace = true
 workspace = true
 
 [dependencies]
-thiserror.workspace = true
-tokio.workspace = true
+thiserror = { workspace = true }
+tokio = { workspace = true }

rust/otap-dataflow/crates/config/Cargo.toml

Lines changed: 3 additions & 3 deletions
@@ -13,6 +13,6 @@ rust-version.workspace = true
 workspace = true
 
 [dependencies]
-thiserror.workspace = true
-serde.workspace = true
-serde_json.workspace = true
+thiserror = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }

rust/otap-dataflow/crates/engine/Cargo.toml

Lines changed: 4 additions & 4 deletions
@@ -16,9 +16,9 @@ workspace = true
 otap-df-channel = { path = "../channel" }
 otap-df-config = { path = "../config" }
 
-thiserror.workspace = true
-serde_json.workspace = true
-tokio.workspace = true
-async-trait.workspace = true
+thiserror = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+async-trait = { workspace = true }
 
 socket2 = "0.5.8"

rust/otap-dataflow/crates/engine/README.md

Lines changed: 80 additions & 43 deletions
@@ -2,46 +2,83 @@
 
 Status: **WIP**
 
-## Nodes
-
-The Pipeline Engine consists of three main node types:
-
-- Receiver: A node that receives or scrapes telemetry data from a telemetry
-  source.
-- Processor: A node that processes telemetry data.
-- Exporter: A node that exports telemetry data to a telemetry sink.
-
-Each node can receive control messages from the Pipeline Engine to manage its
-operation.
-
-```mermaid
-graph LR
-RecvControlIn[Control Channel] --> Receiver
-Receiver((Receiver))
-Receiver --> RecvControlOut[Control Channel]
-Receiver --> RecvDataOut[Data Channel]
-class Receiver node
-class RecvControlIn,RecvControlOut controlQueue
-class RecvDataOut dataQueue
-
-ProcControlIn[Control Channel] --> Processor
-ProcDataIn[Data Channel] --> Processor
-Processor((Processor))
-Processor --> ProcControlOut[Control Channel]
-Processor --> ProcDataOut[Data Channel]
-class Processor node
-class ProcControlIn,ProcControlOut controlQueue
-class ProcDataIn,ProcDataOut dataQueue
-
-ExControlIn[Control Channel] --> Exporter
-ExDataIn[Data Channel] --> Exporter
-Exporter((Exporter))
-Exporter --> ExControlOut[Control Channel]
-class Exporter node
-class ExControlIn,ExControlOut controlQueue
-class ExDataIn,ExDataOut dataQueue
-
-classDef controlQueue fill:#ffdddd,stroke:#cc0000,color:#000,stroke-width:0px,font-size:10px,padding:0px
-classDef dataQueue fill:#ddffdd,stroke:#009900,color:#000,stroke-width:0px,font-size:10px,padding:0px
-classDef node fill:#4a90e2,color:#ffffff,stroke-width:0px,font-size:12px
-```
+## Introduction
+
+The term pipeline is used here to represent an interconnection of nodes forming
+a Directed Acyclic Graph (DAG). The inputs of a pipeline are called receivers,
+the intermediate processing nodes are called processors, and the output nodes
+are referred to as exporters.
+
+Messages flowing through a pipeline are generically referred to as pdata (i.e.
+Pipeline Data). An OTLP message or an OTAP message are examples of pdata types.
+This pipeline framework is generic over pdata, which means:
+
+- It is possible to instantiate an OTLP pipeline, an OTAP pipeline, or even a
+  pipeline designed to support another type of pdata.
+- It is not possible to support multiple pdata types within a single pipeline.
+  However, a fourth type of component, called a connector, can be used to bridge
+  two pipelines and enable interoperability between different pdata types.
+
+This terminology aligns well with the concepts introduced in the OTEL Go
+Collector.
+
+## Architecture
+
+A list of the design principles followed by this project can be found
+[here](../../docs/design-principles.md). More specifically, the pipeline engine
+implemented in this crate follows a share-nothing, thread-per-core approach.
+In particular, one instance of the pipeline engine is created per core. This
+engine:
+
+- Is based on a single-threaded async runtime
+- Avoids synchronization mechanisms whenever possible
+- Declares async traits as `?Send`, providing `!Send` implementations and
+  futures whenever practical
+- Relies on listening sockets configured with the `SO_REUSEPORT` option,
+  allowing the OS to handle connection load balancing
+- May share immutable data between cores, but ideally only within a single NUMA
+  node
+
+These design principles focus on achieving high performance, predictability, and
+maintainability in an observability gateway implemented in Rust. Targeting a
+single-threaded async runtime reduces complexity, enhances cache locality, and
+lowers overhead. Favoring `!Send` futures and declaring async traits as `?Send`
+maximizes flexibility and allows performance gains by avoiding unnecessary
+synchronization. Minimizing synchronization primitives prevents contention and
+overhead, thus ensuring consistently low latency. Avoiding unbounded channels
+and data structures protects against unpredictable resource consumption,
+maintaining stable performance. Finally, limiting external dependencies reduces
+complexity, security risks, and maintenance effort, further streamlining the
+gateway’s operation and reliability.
+
+## Control Messages
+
+Each node in a pipeline can receive control messages, which must be handled with
+priority. These control messages are issued by a control entity (e.g. a pipeline
+engine) and are used to orchestrate the behavior of pipeline nodes. For example,
+configuring or reconfiguring nodes, coordinating acknowledgment mechanisms,
+stopping a pipeline, and more.
+
+## Testability
+
+All node types, as well as the pipeline engine itself, are designed for isolated
+testing. Practically, this means it's possible to test components like an OTLP
+Receiver independently, without needing to construct an entire pipeline. This
+approach facilitates rapid and precise identification of issues such as memory
+overconsumption, bottlenecks, or logical errors within individual nodes.
+
+The engine provides an extensive `testing` module containing utilities tailored
+to each pipeline component:
+
+- Defined test message types and control message counters for monitoring
+  component behavior.
+- Dedicated test contexts and runtimes specifically built for receivers,
+  processors, and exporters.
+- Single-threaded asynchronous runtime utilities aligned with the engine's
+  non-Send design philosophy.
+- Convenient helper functions for establishing and managing communication
+  channels between components.
+
+These utilities streamline the process of validating individual component
+behaviors, significantly reducing setup effort while enabling comprehensive
+testing.
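
Note on the new README text: it describes the pipeline as generic over pdata and states that control messages must be handled with priority, while the design principles call for bounded queues. The sketch below shows one way these three ideas could combine in a node inbox. It is an assumption-laden illustration, not the crate's API: `ControlMsg`, `Handled`, `Inbox`, and `drain_example` are hypothetical names, and plain `VecDeque`s stand in for the real channels.

```rust
use std::collections::VecDeque;

/// Hypothetical control messages; the real set is defined by the engine.
#[derive(Debug, PartialEq)]
enum ControlMsg {
    Reconfigure,
    Shutdown,
}

/// A message taken from the inbox, generic over the pdata type.
#[derive(Debug, PartialEq)]
enum Handled<PData> {
    Control(ControlMsg),
    Data(PData),
}

/// Hypothetical node inbox with separate bounded control and pdata queues.
struct Inbox<PData> {
    control: VecDeque<ControlMsg>,
    pdata: VecDeque<PData>,
    capacity: usize,
}

impl<PData> Inbox<PData> {
    fn new(capacity: usize) -> Self {
        Inbox {
            control: VecDeque::with_capacity(capacity),
            pdata: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Enqueues pdata, handing the message back when the queue is full.
    fn push_pdata(&mut self, msg: PData) -> Result<(), PData> {
        if self.pdata.len() >= self.capacity {
            return Err(msg);
        }
        self.pdata.push_back(msg);
        Ok(())
    }

    /// Enqueues a control message, handing it back when the queue is full.
    fn push_control(&mut self, msg: ControlMsg) -> Result<(), ControlMsg> {
        if self.control.len() >= self.capacity {
            return Err(msg);
        }
        self.control.push_back(msg);
        Ok(())
    }

    /// Pending control messages are always served before any pdata.
    fn next_message(&mut self) -> Option<Handled<PData>> {
        if let Some(ctrl) = self.control.pop_front() {
            return Some(Handled::Control(ctrl));
        }
        self.pdata.pop_front().map(Handled::Data)
    }
}

/// Queues pdata first, then control messages, and returns the drain order.
fn drain_example() -> Vec<Handled<String>> {
    let mut inbox = Inbox::new(2);
    inbox.push_pdata("span-1".to_string()).unwrap();
    inbox.push_pdata("span-2".to_string()).unwrap();
    inbox.push_control(ControlMsg::Reconfigure).unwrap();
    inbox.push_control(ControlMsg::Shutdown).unwrap();
    let mut out = Vec::new();
    while let Some(msg) = inbox.next_message() {
        out.push(msg);
    }
    out
}

fn main() {
    for msg in drain_example() {
        println!("{msg:?}");
    }
}
```

Because `Inbox` is parameterized by a single `PData` type, one instance cannot mix pdata types, which mirrors the README's point that bridging different pdata types requires a separate connector component.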

rust/otap-dataflow/crates/engine/src/error.rs

Lines changed: 33 additions & 0 deletions
@@ -24,4 +24,37 @@ pub enum Error<T> {
         /// The error that occurred.
         error: std::io::Error,
     },
+
+    /// A wrapper for the receiver errors.
+    #[error("A receiver error occurred in node {receiver}: {error}")]
+    ReceiverError {
+        /// The name of the receiver that encountered the error.
+        receiver: NodeName,
+
+        /// The error that occurred.
+        /// ToDo We probably need to use a more specific error type here (JSON Node?).
+        error: String,
+    },
+
+    /// A wrapper for the processor errors.
+    #[error("A processor error occurred in node {processor}: {error}")]
+    ProcessorError {
+        /// The name of the processor that encountered the error.
+        processor: NodeName,
+
+        /// The error that occurred.
+        /// ToDo We probably need to use a more specific error type here (JSON Node?).
+        error: String,
+    },
+
+    /// A wrapper for the exporter errors.
+    #[error("An exporter error occurred in node {exporter}: {error}")]
+    ExporterError {
+        /// The name of the exporter that encountered the error.
+        exporter: NodeName,
+
+        /// The error that occurred.
+        /// ToDo We probably need to use a more specific error type here (JSON Node?).
+        error: String,
+    },
 }
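
Note on the new error variants: each one pairs a node name with a stringified cause, and the `#[error(...)]` attribute supplies the display text. The sketch below reproduces that shape with the standard library only, hand-writing `Display` where the commit relies on `thiserror`. Several things are assumptions made for illustration: the simplified non-generic `NodeError` enum, `NodeName` modeled as a `String` alias, and the `exporter_failure` helper.

```rust
use std::fmt;

/// Assumption: `NodeName` is modeled as a plain `String` alias here.
type NodeName = String;

/// Simplified stand-in for the three variants added in this commit.
#[derive(Debug)]
enum NodeError {
    ReceiverError { receiver: NodeName, error: String },
    ProcessorError { processor: NodeName, error: String },
    ExporterError { exporter: NodeName, error: String },
}

impl fmt::Display for NodeError {
    // Same message templates as the `#[error(...)]` attributes in the diff.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            NodeError::ReceiverError { receiver, error } => {
                write!(f, "A receiver error occurred in node {receiver}: {error}")
            }
            NodeError::ProcessorError { processor, error } => {
                write!(f, "A processor error occurred in node {processor}: {error}")
            }
            NodeError::ExporterError { exporter, error } => {
                write!(f, "An exporter error occurred in node {exporter}: {error}")
            }
        }
    }
}

impl std::error::Error for NodeError {}

/// Hypothetical helper: wraps any displayable failure raised by an exporter.
fn exporter_failure(node: &str, cause: impl fmt::Display) -> NodeError {
    NodeError::ExporterError {
        exporter: node.to_owned(),
        error: cause.to_string(),
    }
}

fn main() {
    let err = exporter_failure("otlp_exporter", "connection refused");
    println!("{err}");
}
```

Storing the cause as a `String` discards the original error type, which is the limitation the `ToDo` comments in the diff already acknowledge.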
