Skip to content

Commit 8f0927f

Browse files
committed
[notify] Introduce StreamAllTransactions stream
Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent 549bf5c commit 8f0927f

22 files changed

Lines changed: 750 additions & 151 deletions

cmd/config/app_config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func TestReadConfigSidecar(t *testing.T) {
7171
MaxTimeout: sidecar.DefaultNotificationMaxTimeout,
7272
MaxActiveTxIDs: sidecar.DefaultMaxActiveTxIDs,
7373
MaxTxIDsPerRequest: sidecar.DefaultMaxTxIDsPerRequest,
74+
StreamWriteTimeout: sidecar.DefaultStreamWriteTimeout,
7475
},
7576
LastCommittedBlockSetInterval: sidecar.DefaultLastCommittedBlockSetInterval,
7677
WaitingTxsLimit: sidecar.DefaultWaitingTxsLimit,
@@ -120,6 +121,7 @@ func TestReadConfigSidecar(t *testing.T) {
120121
MaxTimeout: 10 * time.Minute,
121122
MaxActiveTxIDs: 100_000,
122123
MaxTxIDsPerRequest: 1000,
124+
StreamWriteTimeout: 30 * time.Second,
123125
},
124126
LastCommittedBlockSetInterval: sidecar.DefaultLastCommittedBlockSetInterval,
125127
WaitingTxsLimit: 20_000_000,

cmd/config/samples/sidecar.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ notification:
124124
max-active-tx-ids: 100_000
125125
# Default: 1,000
126126
max-tx-ids-per-request: 1000
127+
# Timeout for writing notifications to the stream.
128+
# If a client cannot consume notifications within this timeout, it will be disconnected
129+
# to prevent blocking the notification service.
130+
# Default: 30s
131+
stream-write-timeout: 30s
127132

128133
# How often the sidecar updates the coordinator with the last committed block number.
129134
# More frequent updates improve coordinator accuracy for dependency resolution but

cmd/config/viper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func NewViperWithSidecarDefaults() *viper.Viper {
4545
v.SetDefault("notification.max-timeout", sidecar.DefaultNotificationMaxTimeout)
4646
v.SetDefault("notification.max-active-tx-ids", sidecar.DefaultMaxActiveTxIDs)
4747
v.SetDefault("notification.max-tx-ids-per-request", sidecar.DefaultMaxTxIDsPerRequest)
48+
v.SetDefault("notification.stream-write-timeout", sidecar.DefaultStreamWriteTimeout)
4849
v.SetDefault("last-committed-block-set-interval", sidecar.DefaultLastCommittedBlockSetInterval)
4950
v.SetDefault("waiting-txs-limit", sidecar.DefaultWaitingTxsLimit)
5051
v.SetDefault("channel-buffer-size", sidecar.DefaultBufferSize)

docs/architecture.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ The **Query Service** operates independently of the commit pipeline. Clients and
9393

9494
The Sidecar acts as the critical bridge between the Hyperledger Fabric-X Ordering Service and the Committer's internal processing pipeline. It is responsible for fetching blocks sequentially from the Ordering Service, performing initial transaction validation to filter out malformed transactions, and maintaining a durable local block store on the file system.
9595

96-
Beyond block ingestion, the Sidecar serves as the delivery endpoint for clients, providing committed blocks to registered applications and offering a notification service for transaction status updates. It also exposes query APIs that allow clients to fetch historical blocks and transactions directly from the block store.
96+
Beyond block ingestion, the Sidecar serves as the delivery endpoint for clients, providing committed blocks to registered applications and offering two notification mechanisms: transaction ID subscription for tracking specific transactions, and an all-transactions stream for monitoring all blockchain activity with optional filtering. It also exposes query APIs that allow clients to fetch historical blocks and transactions directly from the block store.
9797

9898
**Key Characteristics:**
9999

docs/notification-service.md

Lines changed: 123 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,42 @@ SPDX-License-Identifier: Apache-2.0
66

77
# Notification Service — Client Usage Guide
88

9-
The Sidecar exposes a Notification Service that allows clients to subscribe to transaction status
10-
updates and receive asynchronous notifications when transactions are committed, rejected, or aborted.
11-
This is the primary mechanism for clients that submit transactions asynchronously to the Ordering
12-
Service to learn the outcome of their transactions, without polling or scanning the entire block stream.
9+
The Sidecar exposes a Notification Service that provides two mechanisms for receiving transaction status updates:
1310

14-
The Notification Service uses a bidirectional gRPC stream: clients send subscription requests
15-
containing transaction IDs of interest, and the server pushes status responses as transactions
16-
complete. Multiple subscription requests can be sent on the same stream.
11+
1. **Transaction ID Subscription** — Allows clients to subscribe to transaction status
12+
updates and receive asynchronous notifications when transactions are committed, rejected, or aborted.
13+
This is the primary mechanism for clients that submit transactions asynchronously to the Ordering
14+
Service to learn the outcome of their transactions, without polling or scanning the entire block stream.
15+
It uses a bidirectional gRPC stream: clients send subscription requests
16+
containing transaction IDs of interest, and the server pushes status responses as transactions
17+
complete. Multiple subscription requests can be sent on the same stream.
18+
19+
2. **All Transactions Stream** — Allows clients to subscribe to a stream of all committed transactions in block order,
20+
with optional filtering by namespace and status. This is useful for audit, monitoring, event-driven applications, and
21+
replication systems.
1722

1823
For internal architecture details, see [sidecar.md — Section 6](sidecar.md#6-notification-service).
1924

2025
## 1. API Definition
2126

22-
The Notification Service is defined as a bidirectional streaming RPC:
27+
The Notification Service provides two streaming RPCs:
2328

2429
From [fabric-x-common/api/committerpb](https://github.com/hyperledger/fabric-x-common)
2530

2631
```protobuf
2732
service Notifier {
33+
// Subscribe to specific transaction IDs
2834
rpc OpenNotificationStream (stream NotificationRequest) returns (stream NotificationResponse);
35+
36+
// Subscribe to all committed transactions
37+
rpc StreamAllTransactions (StreamAllRequest) returns (stream TxBatch);
2938
}
3039
```
3140

41+
## 2. Transaction ID Subscription API
42+
43+
### 2.1. API Definition
44+
3245
The client sends `NotificationRequest` messages, each containing a batch of transaction IDs to watch
3346
and a timeout:
3447

@@ -70,7 +83,7 @@ message RejectedTxIds {
7083
}
7184
```
7285

73-
## 2. Subscribing to Transaction Status Updates
86+
### 2.2. Subscribing to Transaction Status Updates
7487

7588
Create a gRPC connection to the Sidecar, open a notification stream, and send a
7689
`NotificationRequest` with the transaction IDs to watch:
@@ -103,7 +116,7 @@ err = stream.Send(&committerpb.NotificationRequest{
103116
Multiple `NotificationRequest` messages can be sent on the same stream. Each request is
104117
tracked independently with its own timeout.
105118

106-
## 3. Receiving Notifications
119+
## 2.3. Receiving Notifications
107120

108121
The client receives `NotificationResponse` messages by calling `Recv()` on the stream.
109122
Each response contains one of the following payloads:
@@ -156,7 +169,7 @@ Responses are batched per stream for efficiency — if multiple subscribed trans
156169
complete in the same coordinator status update, they are grouped into a single
157170
`NotificationResponse`.
158171

159-
## 4. Recommended Client Pattern
172+
### 2.4. Recommended Client Pattern
160173

161174
To avoid missing notifications, clients should follow this sequence:
162175

@@ -170,27 +183,119 @@ If the notification stream breaks (e.g., sidecar restart) or the timeout expires
170183
the transaction completes, the client should fall back to the Block Query API to check
171184
the transaction status.
172185

173-
## 5. Concurrency Limits
186+
## 3. All Transactions Stream API
187+
188+
### 3.1. API Definition
189+
190+
The `StreamAllTransactions` RPC provides a server-streaming interface that delivers all committed transactions in block
191+
order. Unlike the transaction ID subscription API, this stream does not require clients to know transaction IDs in
192+
advance.
193+
194+
```protobuf
195+
message StreamAllRequest {
196+
repeated string filter_namespaces = 1; // Optional: filter by namespace(s)
197+
repeated Status filter_status = 2; // Optional: filter by status
198+
bool include_read_write_sets = 3; // Optional: include read/write sets
199+
bool include_endorsements = 4; // Optional: include endorsements
200+
}
201+
202+
message TxBatch {
203+
repeated TxEvent events = 1;
204+
}
205+
206+
message TxEvent {
207+
TxRef ref = 1; // Transaction reference
208+
Status status = 2; // Transaction status
209+
repeated TxNamespace namespaces = 3; // Namespaces (if filtering enabled)
210+
repeated Endorsement endorsements = 4; // Endorsements (if requested)
211+
}
212+
```
213+
214+
### 3.2. Opening a Stream
215+
216+
Create a gRPC connection to the Sidecar and call `StreamAllTransactions`:
217+
218+
```go
219+
client := committerpb.NewNotifierClient(conn)
220+
stream, err := client.StreamAllTransactions(ctx, &committerpb.StreamAllRequest{
221+
FilterNamespaces: []string{"namespace-1", "namespace-2"},
222+
FilterStatus: []committerpb.Status{committerpb.Status_COMMITTED},
223+
IncludeReadWriteSets: true,
224+
IncludeEndorsements: false,
225+
})
226+
```
227+
228+
**Request Parameters:**
229+
230+
- **`filter_namespaces`** (optional): If specified, only transactions touching these namespaces are included. Empty
231+
means all namespaces. Multiple namespaces use OR logic: a transaction is included if it touches any of the specified
232+
namespaces.
233+
234+
- **`filter_status`** (optional): If specified, only transactions with these status codes are included. Empty means all
235+
statuses.
236+
237+
- **`include_read_write_sets`** (optional): If true, the `namespaces` field in each `TxEvent`
238+
includes the read/write sets for each namespace. Default: false.
239+
240+
- **`include_endorsements`** (optional): If true, the `endorsements` field in each `TxEvent`
241+
includes the transaction endorsements. Default: false.
242+
243+
### 3.3. Receiving Transaction Events
244+
245+
The server sends `TxBatch` messages containing one or more `TxEvent` entries. Transactions are batched by block. All
246+
transactions from the same block are delivered in a single `TxBatch`.
247+
248+
```go
249+
for {
250+
batch, err := stream.Recv()
251+
...
252+
253+
for _, event := range batch.Events {
254+
...
255+
}
256+
}
257+
```
258+
259+
### 3.4. Stream Behavior
260+
261+
**Block Order Guarantee:**
262+
Transactions are streamed in deterministic block order. All transactions from block N are delivered before any
263+
transactions from block N+1.
264+
265+
**Starting Point:**
266+
The stream starts from the currently processed block when the client connects. Historical transactions are not included.
267+
268+
**No Recovery:**
269+
If the stream disconnects, the client must reconnect and will resume from the current block. Transactions from missed
270+
blocks are not replayed. For guaranteed delivery, use the Block Delivery API instead.
271+
272+
**Backpressure:**
273+
If the client cannot keep up with the transaction rate, the server will block sending to that client. The server uses a
274+
configurable write timeout (default: 30 seconds) to prevent slow clients from blocking the system.
275+
276+
## 4. Concurrency Limits
174277

175278
The Notification Service shares the server's `max-concurrent-streams` limit (default: 10)
176279
with the Block Delivery streams ([Section 5 of sidecar.md](sidecar.md#5-block-delivery-service)).
177-
Both stream types compete for the same pool of stream slots.
280+
All stream types compete for the same pool of stream slots.
178281

179282
When the limit is reached, new stream requests are rejected with a gRPC `ResourceExhausted`
180283
status code. Clients should handle this error with appropriate backoff and retry logic.
181284

182-
## 6. Configuration
285+
## 5. Configuration
183286

184287
The following configuration options in `sidecar.yaml` control notification behavior:
185288

186-
| Setting | Default | Description |
187-
|---------|---------|-------------|
188-
| `notification.max-timeout` | `1m` | Upper limit on per-request timeout. Client timeouts are capped to this value. |
189-
| `server.max-concurrent-streams` | `10` | Maximum concurrent streaming RPCs across all stream types (Deliver + Notification). |
289+
| Setting | Default | Description |
290+
|-------------------------------------|---------|-------------------------------------------------------------------------------------------------|
291+
| `notification.max-timeout` | `1m` | Upper limit on per-request timeout for transaction ID subscriptions. |
292+
| `notification.stream-write-timeout` | `30s` | Write timeout for all transactions stream. Prevents slow clients from blocking. |
293+
| `server.max-concurrent-streams` | `10` | Maximum concurrent streaming RPCs across all stream types (Deliver + Notification + StreamAll). |
190294

191295
Sample configuration:
192296

193297
```yaml
194298
notification:
195299
max-timeout: 10m
300+
stream-write-timeout: 10s
196301
```

docs/sidecar.md

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ The Sidecar performs four main tasks:
3838
4. **Persist Committed Blocks:** Stores blocks confirmed as committed by the Coordinator in a local, append-only file
3939
store for durability and auditability.
4040
5. **Deliver to Clients:** Delivers the committed blocks to registered client applications.
41-
6. **Notification:** Notifies subscribers when transactions are committed or aborted, on a per-transaction ID basis.
41+
6. **Notification:** Provides two notification mechanisms:
42+
- **Transaction ID Subscription:** Notifies subscribers when specific transactions (by ID) are committed or aborted.
43+
- **All Transactions Stream:** Streams all committed transactions in block order with optional filtering.
4244

4345
Note that the fourth task is executed only when users/clients creates a stream with the sidecar.
4446

@@ -287,6 +289,8 @@ the status updates for all transactions within that block using `blockWithStatus
287289
```go
288290
blockWithStatus struct {
289291
block *common.Block
292+
blockNumber uint64
293+
txs []*protoblocktx.Tx
290294
txStatus []validationCode
291295
txIDToTxIndex map[string]int
292296
pendingCount int
@@ -303,9 +307,14 @@ A block is only considered fully processed and ready for commitment when two con
303307

304308
**g. Enqueueing Committed Blocks**:
305309

306-
When a block satisfies the criteria outlined in step f, the relay component first appends the collected
307-
transaction statuses within the metadata of the original Fabric block (sourced from `blocksToBeCommitted`).
308-
Subsequently, this modified block is enqueued onto the `committedBlocks` output channel.
310+
When a block satisfies the criteria outlined in step f, the relay component performs two actions:
311+
312+
1. Appends the collected transaction statuses within the metadata of the original Fabric block
313+
(sourced from `blocksToBeCommitted`) and enqueues it onto the `committedBlocks` output channel
314+
for persistence.
315+
316+
2. Creates a `committedBlockWithTxs` structure containing the block number, transaction list, and
317+
status information, and sends it to the notification service for distribution to subscribers.
309318

310319
### Task 3. Persisting Committed Block in the File System
311320

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ require (
2121
github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0
2222
github.com/hyperledger/fabric-lib-go v1.1.3
2323
github.com/hyperledger/fabric-protos-go-apiv2 v0.3.7
24-
github.com/hyperledger/fabric-x-common v0.2.5-0.20260526151648-855f12b2d438
24+
github.com/hyperledger/fabric-x-common v0.2.5
2525
github.com/jackc/puddle/v2 v2.2.2
2626
github.com/mitchellh/mapstructure v1.5.0
2727
github.com/prometheus/client_golang v1.23.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ github.com/hyperledger/fabric-lib-go v1.1.3 h1:alvgtBlbm373P3BLIxdHKVT4I64mORDwQ
188188
github.com/hyperledger/fabric-lib-go v1.1.3/go.mod h1:zwnGaLwmQ/usgC6Xbzh8jCnOniViRHSsg7WYErxbr30=
189189
github.com/hyperledger/fabric-protos-go-apiv2 v0.3.7 h1:sQ5qv8vQQfwewa1JlCiSCC8dLElmaU2/frLolpgibEY=
190190
github.com/hyperledger/fabric-protos-go-apiv2 v0.3.7/go.mod h1:bJnwzfv03oZQeCc863pdGTDgf5nmCy6Za3RAE7d2XsQ=
191-
github.com/hyperledger/fabric-x-common v0.2.5-0.20260526151648-855f12b2d438 h1:yVjqCihBd0YJ58XyldjlrGc+7+87fIftQE+eVXwrPlE=
192-
github.com/hyperledger/fabric-x-common v0.2.5-0.20260526151648-855f12b2d438/go.mod h1:EdyBG6jVFYYxJ5DgjxGVLE/Lav3GPhdftABZv5j+ttg=
191+
github.com/hyperledger/fabric-x-common v0.2.5 h1:AhyeIfAzLvLGIjf7RjDW5JPcnks2iNhpuek5rA1dmII=
192+
github.com/hyperledger/fabric-x-common v0.2.5/go.mod h1:EdyBG6jVFYYxJ5DgjxGVLE/Lav3GPhdftABZv5j+ttg=
193193
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
194194
github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk=
195195
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=

integration/runner/runtime.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type (
6060
SidecarClientConfig *connection.ClientConfig
6161
NotifyClient committerpb.NotifierClient
6262
NotifyStream committerpb.Notifier_OpenNotificationStreamClient
63+
StreamAllTxStream committerpb.Notifier_StreamAllTransactionsClient
6364

6465
CommittedBlock chan *common.Block
6566
TxBuilder *workload.TxBuilder
@@ -303,6 +304,8 @@ func (c *CommitterRuntime) OpenNotificationStream(ctx context.Context, t *testin
303304
var err error
304305
c.NotifyStream, err = c.NotifyClient.OpenNotificationStream(ctx)
305306
require.NoError(t, err)
307+
c.StreamAllTxStream, err = c.NotifyClient.StreamAllTransactions(ctx, nil)
308+
require.NoError(t, err)
306309
}
307310

308311
// Start runs all services and load generator as configured by the serviceFlags.
@@ -549,7 +552,6 @@ func (c *CommitterRuntime) ValidateExpectedResultsInCommittedBlock(t *testing.T,
549552
persistedTxIDsStatus := make([]*committerpb.TxStatus, 0, len(expected.TxIDs))
550553
duplicateTxIDsStatus := make([]*committerpb.TxStatus, 0, len(expected.TxIDs))
551554
for i, tID := range expected.TxIDs {
552-
//nolint:gosec // int -> uint32.
553555
s := committerpb.NewTxStatus(expected.Statuses[i], tID, blk.Header.Number, uint32(i))
554556
if s.Status == committerpb.Status_REJECTED_DUPLICATE_TX_ID {
555557
duplicateTxIDsStatus = append(duplicateTxIDsStatus, s)
@@ -575,6 +577,7 @@ func (c *CommitterRuntime) ValidateExpectedResultsInCommittedBlock(t *testing.T,
575577
}
576578

577579
sidecar.RequireNotifications(t, c.NotifyStream, blk.Header.Number, expected.TxIDs, expected.Statuses)
580+
sidecar.RequireStreamAllTransactions(t, c.StreamAllTxStream, blk.Header.Number, expected.TxIDs, expected.Statuses)
578581
}
579582

580583
// CountStatus returns the number of transactions with a given tx status.

0 commit comments

Comments
 (0)