Commit 0d9e838

Per-process manifest for targeted snapshot requests (#13)
1 parent 758ea4f commit 0d9e838

15 files changed

Lines changed: 321 additions & 94 deletions

docs/api.md

Lines changed: 16 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,13 +5,24 @@ The full public API is declared in [`include/bacaro.h`](../include/bacaro.h).
55
## Lifecycle
66

77
```c
8-
bacaro_t *bacaro_new (const char *name);
8+
bacaro_t *bacaro_new (const char *name, const char **published_domains);
99
void bacaro_destroy(bacaro_t **self);
1010
```
1111
1212
`bacaro_new` creates a new instance with the given process name, binds its sockets, and starts peer discovery. Returns `NULL` on failure.
1313
14-
`bacaro_destroy` disconnects all peers, removes IPC files, and frees all resources. Sets `*self` to `NULL`.
14+
`published_domains` is an optional NULL-terminated array of domain strings that this process will publish. When provided, a `.manifest` file is written alongside the `.pub` and `.rep` files so that peers can skip snapshot requests for non-overlapping domains. Pass `NULL` if you don't want to declare a manifest — the bus works identically without one.
15+
16+
```c
17+
// Declare published domains (optional optimisation)
18+
const char *domains[] = {"sensors", "power", NULL};
19+
bacaro_t *b = bacaro_new("powerd", domains);
20+
21+
// No manifest — backwards compatible
22+
bacaro_t *mon = bacaro_new("monitor", NULL);
23+
```
24+
25+
`bacaro_destroy` disconnects all peers, removes IPC files (including the `.manifest` if one was written), and frees all resources. Sets `*self` to `NULL`.
1526

1627
## Subscriptions
1728

@@ -108,7 +119,8 @@ typedef void (*bacaro_fn)(bacaro_t *self, const char *path,
108119
#include <msgpack.hpp>
109120
110121
// Publisher
111-
bacaro_t *pub = bacaro_new("powerd");
122+
const char *domains[] = {"system", "power", nullptr};
123+
bacaro_t *pub = bacaro_new("powerd", domains);
112124
113125
msgpack::sbuffer buf;
114126
msgpack::pack(buf, 87.3);
@@ -124,7 +136,7 @@ static void on_update(bacaro_t *self, const char *path,
124136
// called on every received update
125137
}
126138
127-
bacaro_t *sub = bacaro_new("monitor");
139+
bacaro_t *sub = bacaro_new("monitor", nullptr);
128140
bacaro_subscribe(sub, "system");
129141
bacaro_on_update(sub, on_update, NULL);
130142

docs/architecture.md

Lines changed: 13 additions & 10 deletions
@@ -29,17 +29,18 @@ Each process uses a **single shared SUB socket** that connects to every peer's P
2929
1. Generate a UUID for this instance.
3030
2. Resolve the runtime directory (`BACARO_RUNTIME_DIR`, default `/tmp/bacaro`).
3131
3. Create the runtime directory if it does not exist.
32-
4. Bind a `ZMQ_PUB` socket → IPC file `<name>.<uuid>.pub`
33-
5. Bind a `ZMQ_ROUTER` socket → IPC file `<name>.<uuid>.rep`
34-
6. Call `discovery_init` (see Discovery below).
32+
4. If `published_domains` is non-NULL, write a `.manifest` file (`<name>.<uuid>.manifest`) listing one domain per line — written **before** discovery so peers see it when they find the `.pub` file.
33+
5. Bind a `ZMQ_PUB` socket → IPC file `<name>.<uuid>.pub`
34+
6. Bind a `ZMQ_ROUTER` socket → IPC file `<name>.<uuid>.rep`
35+
7. Call `discovery_init` (see Discovery below).
3536

3637
The UUID in the filename prevents collisions when a process restarts quickly under the same name.
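
Step 4 of the startup sequence can be sketched in isolation. This is a minimal illustration rather than the code in `src/bacaro.cpp`: the helper names `ipc_file`, `write_manifest_file`, and `read_manifest_file` are hypothetical, and the only details taken from the text above are the `<name>.<uuid>.manifest` naming and the one-domain-per-line layout. Unlike the committed helper, this sketch reports a failed write to the caller.

```cpp
#include <filesystem>
#include <fstream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical helper: build "<dir>/<name>.<uuid><suffix>".
fs::path ipc_file(const fs::path &dir, const std::string &name,
                  const std::string &uuid, const std::string &suffix)
{
    return dir / (name + "." + uuid + suffix);
}

// Write one domain per line. Returns false if the file could not be written.
// A null `domains` pointer means "no manifest requested" and is not an error.
bool write_manifest_file(const fs::path &path, const char *const *domains)
{
    if (!domains)
        return true;
    std::ofstream out(path, std::ios::trunc);
    if (!out)
        return false;
    for (const char *const *p = domains; *p; ++p)
        out << *p << '\n';
    out.flush();
    return static_cast<bool>(out);
}

// Read a manifest back, skipping empty lines. A missing file yields an
// empty vector; a real reader would also track whether the file existed.
std::vector<std::string> read_manifest_file(const fs::path &path)
{
    std::vector<std::string> domains;
    std::ifstream in(path);
    for (std::string line; std::getline(in, line);)
        if (!line.empty())
            domains.push_back(line);
    return domains;
}
```

Writing the manifest before the PUB socket is bound matters because peers react to the `.pub` file appearing; if the manifest were written afterwards, a fast peer could look for it too early and treat the process as having no manifest.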
3738

3839
### Shutdown (`bacaro_destroy`)
3940

4041
1. `discovery_cleanup`: disconnect all peers, close epoll and inotify fds.
4142
2. Set `ZMQ_LINGER = 200ms` on the PUB socket before closing, so any recently published messages have time to flush to connected peers (see [Slow-Joiner Note](#slow-joiner-note)).
42-
3. Close and remove the `.pub` and `.rep` IPC files.
43+
3. Close and remove the `.pub`, `.rep`, and `.manifest` IPC files.
4344
4. Destroy the ZMQ context.
4445

4546
---
@@ -69,10 +70,9 @@ Triggered by a new `.pub` file appearing (either from the initial scan or from a
6970
1. Extract the peer name from the filename (everything before the first `.`).
7071
2. Derive the `.rep` filename by replacing the `.pub` suffix.
7172
3. Call `zmq_connect` on the **shared SUB socket** to the peer's PUB endpoint.
72-
4. Create a per-peer `ZMQ_DEALER` socket, connect to `ipc://<runtime_dir>/<rep_filename>`.
73-
5. Register the DEALER socket's ZMQ fd with epoll.
74-
6. Record the peer in `peers` and `dealer_fd_to_filename` maps. Store the PUB endpoint string for later `zmq_disconnect`.
75-
7. Send a snapshot request for each currently subscribed domain prefix.
73+
4. Read the peer's `.manifest` file if present; store declared domains and `has_manifest` flag in `PeerInfo`.
74+
5. Record the peer in the `peers` map, storing both the PUB and REP endpoints.
75+
6. **Lazy DEALER**: if the process has active subscriptions, iterate them. For each prefix that overlaps with the peer's manifest (or if the peer has no manifest), create a per-peer `ZMQ_DEALER` socket, connect to the REP endpoint, register its fd with epoll, and send a snapshot request. Peers with a manifest that doesn't overlap any active subscription get no DEALER socket at all.
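
Steps 1, 2, and 4 all derive names from the discovered `.pub` filename. The sketch below shows one way to do that derivation with the suffix validated first. `PeerFiles` and `derive_peer_files` are hypothetical names introduced for illustration, and the validation is an addition of this sketch, not something the commit is known to do.

```cpp
#include <optional>
#include <string>

struct PeerFiles {
    std::string name;      // peer process name (text before the first '.')
    std::string rep;       // "<name>.<uuid>.rep"
    std::string manifest;  // "<name>.<uuid>.manifest"
};

// Returns std::nullopt unless `pub_filename` ends in ".pub" and has a
// non-empty process name in front of the first '.'.
std::optional<PeerFiles> derive_peer_files(const std::string &pub_filename)
{
    static const std::string suffix = ".pub";
    if (pub_filename.size() <= suffix.size()
        || pub_filename.compare(pub_filename.size() - suffix.size(),
                                suffix.size(), suffix) != 0)
        return std::nullopt;

    // The suffix check above guarantees at least one '.' is present.
    const std::string::size_type dot = pub_filename.find('.');
    if (dot == 0)
        return std::nullopt;  // empty process name

    const std::string stem =
        pub_filename.substr(0, pub_filename.size() - suffix.size());
    return PeerFiles{pub_filename.substr(0, dot), stem + ".rep", stem + ".manifest"};
}
```

Checking the suffix before slicing avoids an unsigned underflow in `size() - 4` when a filename shorter than `.pub` reaches the function.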
7676

7777
### Peer disconnect (`discovery_peer_disconnect`)
7878

@@ -257,7 +257,10 @@ The main handle. One per process. Contains:
257257
### `PeerInfo`
258258

259259
Per-peer connection state:
260-
- `dealer_sock` — per-peer ZMQ DEALER socket for snapshot protocol
260+
- `dealer_sock` — per-peer ZMQ DEALER socket for snapshot protocol (created lazily, may be `nullptr`)
261261
- `name` — peer process name (extracted from filename)
262262
- `pub_endpoint` — stored for `zmq_disconnect` on the shared SUB socket
263-
- `dealer_fd` — ZMQ fd registered with epoll
263+
- `rep_endpoint` — stored for lazy DEALER connect
264+
- `dealer_fd` — ZMQ fd registered with epoll (`-1` until DEALER is created)
265+
- `manifest` — declared published domains read from the peer's `.manifest` file
266+
- `has_manifest` — `true` if a `.manifest` file was found at discovery time

docs/concepts.md

Lines changed: 24 additions & 7 deletions
@@ -22,8 +22,9 @@ There is no central broker. Each process:
2222

2323
1. Binds a `ZMQ_PUB` socket and a `ZMQ_ROUTER` socket, creating IPC files in the runtime directory.
2424
2. Watches the runtime directory with **inotify** for new peers.
25-
3. Connects a `ZMQ_SUB` socket and a `ZMQ_DEALER` socket to each peer on discovery.
26-
4. Requests a **snapshot** of current state from each peer on connect, seeding its local cache.
25+
3. Connects a single shared `ZMQ_SUB` socket to each peer's PUB endpoint on discovery.
26+
4. Creates a per-peer `ZMQ_DEALER` socket lazily — only when a subscription overlaps with the peer's manifest (or when the peer has no manifest).
27+
5. Requests a **snapshot** of current state from each relevant peer on connect, seeding its local cache.
2728

2829
After the initial snapshot, live `PUB/SUB` updates keep every subscriber's cache current.
2930

@@ -33,7 +34,7 @@ Any process can publish on any path at any time. There is no ownership enforceme
3334

3435
## Publisher identity
3536

36-
No identity frame is added to the wire format. The publisher's name is inferred from which IPC socket the message arrived on — zero overhead.
37+
The publisher's name is carried explicitly as a frame in the wire format. This is necessary because all messages from all peers arrive on a single shared SUB socket — there is no per-peer socket to infer identity from.
3738

3839
## Late join / snapshot protocol
3940

@@ -43,14 +44,30 @@ Bacaro solves this automatically: every time a new peer is discovered or a new s
4344

4445
This means `bacaro_get` always returns the latest known value immediately, without any blocking network call.
4546

47+
## Manifest
48+
49+
The manifest is an optional optimisation for deployments where the set of processes and their published domains is known at build time.
50+
51+
When calling `bacaro_new`, pass a NULL-terminated array of domain strings that the process will publish:
52+
53+
```c
54+
const char *domains[] = {"sensors", "power", NULL};
55+
bacaro_t *b = bacaro_new("powerd", domains);
56+
```
57+
58+
Bacaro writes this list to a `.manifest` file alongside the `.pub` file. When peers discover the process, they read the manifest and only open a DEALER socket (and request a snapshot) if their subscriptions overlap with the declared domains. Peers with no overlapping subscriptions skip the handshake entirely, reducing boot-time connection overhead.
59+
60+
Passing `NULL` disables the manifest — the bus works identically without one. The manifest is an optimisation hint, not access control: a process can still publish on any path regardless of what it declared.
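
What "overlap" means is easiest to see with concrete inputs. The following is a standalone sketch of the matching rule, assuming subscription prefixes and declared domains are plain dotted paths with no trailing `.`; the function names are hypothetical and it works on plain strings, not on the library's `PeerInfo`.

```cpp
#include <string>
#include <vector>

// True if `a` equals `b` or is a dotted ancestor of it,
// e.g. "power" vs "power.battery" but not "power" vs "powerful".
static bool is_same_or_ancestor(const std::string &a, const std::string &b)
{
    if (a.size() > b.size() || b.compare(0, a.size(), a) != 0)
        return false;
    return a.size() == b.size() || b[a.size()] == '.';
}

// Decide whether a snapshot request is worth sending to a peer.
bool should_request_snapshot(bool has_manifest,
                             const std::vector<std::string> &manifest,
                             const std::string &prefix)
{
    if (!has_manifest)
        return true;  // no manifest: always ask, as before the change
    for (const auto &domain : manifest)
        if (prefix.empty()
            || is_same_or_ancestor(domain, prefix)
            || is_same_or_ancestor(prefix, domain))
            return true;
    return false;
}
```

The check is symmetric on purpose: a subscription to `power.battery` matches a peer that declared `power`, and a subscription to `power` matches a peer that declared `power.battery`. Matching on the `.` boundary keeps `power` from matching `powerful`.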
61+
4662
## Wire format
4763
48-
Every published message is a three-frame ZMQ multipart message:
64+
Every published message is a four-frame ZMQ multipart message:
4965
5066
```
51-
Frame 0 — topic : "domain.sub.property" (ZMQ SUB prefix filter)
52-
Frame 1 — header : version(1) | flags(1) | sequence(8) | timestamp(8)
53-
Frame 2 — payload : MessagePack bytes
67+
Frame 0 — topic : "domain.sub.property" (ZMQ SUB prefix filter)
68+
Frame 1 — publisher : publisher process name
69+
Frame 2 — header : version(1) | flags(1) | sequence(8) | timestamp(8)
70+
Frame 3 — payload : MessagePack bytes
5471
```
5572
5673
Snapshot request/reply uses the same framing over the `DEALER/ROUTER` pair, with a flag byte indicating request, reply, or end-of-snapshot.
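
The header frame layout can be illustrated with a small pack/unpack pair. This is a sketch under two assumptions that the text above does not confirm: the numbers in parentheses are field widths in bytes (18 bytes in total), and multi-byte fields are big-endian. The `Header` struct and function names are hypothetical.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

struct Header {
    std::uint8_t  version;
    std::uint8_t  flags;
    std::uint64_t sequence;
    std::uint64_t timestamp;
};

// version(1) | flags(1) | sequence(8) | timestamp(8)
constexpr std::size_t kHeaderSize = 1 + 1 + 8 + 8;

// Assumption: big-endian encoding; the real byte order is not documented here.
static void put_u64_be(std::uint8_t *dst, std::uint64_t v)
{
    for (int i = 0; i < 8; ++i)
        dst[i] = static_cast<std::uint8_t>(v >> (56 - 8 * i));
}

static std::uint64_t get_u64_be(const std::uint8_t *src)
{
    std::uint64_t v = 0;
    for (int i = 0; i < 8; ++i)
        v = (v << 8) | src[i];
    return v;
}

std::array<std::uint8_t, kHeaderSize> pack_header(const Header &h)
{
    std::array<std::uint8_t, kHeaderSize> out{};
    out[0] = h.version;
    out[1] = h.flags;
    put_u64_be(out.data() + 2, h.sequence);
    put_u64_be(out.data() + 10, h.timestamp);
    return out;
}

Header unpack_header(const std::array<std::uint8_t, kHeaderSize> &in)
{
    return Header{in[0], in[1], get_u64_be(in.data() + 2), get_u64_be(in.data() + 10)};
}
```

Encoding field by field, instead of copying the struct's memory, keeps the frame free of compiler padding and independent of host byte order.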

include/bacaro.h

Lines changed: 6 additions & 2 deletions
@@ -25,8 +25,12 @@ typedef void (*bacaro_fn)(bacaro_t *self,
2525
const uint8_t *data, size_t len,
2626
void *userdata);
2727

28-
/* Lifecycle */
29-
bacaro_t *bacaro_new (const char *name);
28+
/* Lifecycle.
29+
published_domains is an optional NULL-terminated array of domain prefixes
30+
this process will publish. If provided, a manifest file is written so peers
31+
can skip snapshot requests for non-overlapping domains. Pass NULL to opt out
32+
(all peers will snapshot as usual). */
33+
bacaro_t *bacaro_new (const char *name, const char **published_domains);
3034
void bacaro_destroy(bacaro_t **self);
3135

3236
/* Subscriptions */

llms.txt

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ Optional build flags (CMake option or env variable):
4242

4343
```c
4444
// Lifecycle
45-
bacaro_t *bacaro_new(const char *name);
45+
bacaro_t *bacaro_new(const char *name, const char **published_domains);
4646
void bacaro_destroy(bacaro_t **self);
4747

4848
// Subscriptions

src/bacaro.cpp

Lines changed: 23 additions & 2 deletions
@@ -6,6 +6,7 @@
66
#include <sys/epoll.h>
77
#include <cstdlib>
88
#include <filesystem>
9+
#include <fstream>
910
#include <iomanip>
1011
#include <random>
1112
#include <sstream>
@@ -77,7 +78,18 @@ static void drain_zmq(void *sock, const std::function<void(void *)> &handler)
7778

7879
// ── Lifecycle ─────────────────────────────────────────────────────────────────
7980

80-
bacaro_t *bacaro_new(const char *name)
81+
static void write_manifest(bacaro_t *self, const char **published_domains)
82+
{
83+
if (!published_domains)
84+
return;
85+
86+
std::string path = ipc_path(self, ".manifest");
87+
std::ofstream out(path);
88+
for (const char **p = published_domains; *p; ++p)
89+
out << *p << '\n';
90+
}
91+
92+
bacaro_t *bacaro_new(const char *name, const char **published_domains)
8193
{
8294
if (!name || name[0] == '\0')
8395
return nullptr;
@@ -96,6 +108,9 @@ bacaro_t *bacaro_new(const char *name)
96108
if (ec)
97109
goto fail;
98110

111+
// Write manifest before discovery so peers see it when they find our .pub
112+
write_manifest(self, published_domains);
113+
99114
self->zmq_ctx = zmq_ctx_new();
100115
if (!self->zmq_ctx)
101116
goto fail;
@@ -155,6 +170,9 @@ void bacaro_destroy(bacaro_t **self_ptr)
155170
close_and_remove(self->pub_sock, ".pub");
156171
close_and_remove(self->router_sock, ".rep");
157172

173+
// Remove manifest file if it exists
174+
{ std::error_code ec; fs::remove(ipc_path(self, ".manifest"), ec); }
175+
158176
if (self->zmq_ctx) {
159177
zmq_ctx_destroy(self->zmq_ctx);
160178
self->zmq_ctx = nullptr;
@@ -181,8 +199,11 @@ int bacaro_subscribe(bacaro_t *self, const char *domain)
181199
// Apply to the shared SUB socket (covers all connected peers)
182200
zmq_setsockopt(self->sub_sock, ZMQ_SUBSCRIBE, prefix.c_str(), prefix.size());
183201

184-
// Ensure each peer has a DEALER socket and request snapshot
202+
// Ensure each peer has a DEALER socket and request snapshot.
203+
// Skip peers whose manifest doesn't overlap with this subscription.
185204
for (auto &[filename, peer] : self->peers) {
205+
if (!manifest_overlaps(peer, prefix))
206+
continue;
186207
if (discovery_ensure_dealer(self, filename, peer) == BACARO_OK)
187208
snapshot_send_request(peer.dealer_sock, prefix);
188209
}

src/discovery.cpp

Lines changed: 45 additions & 9 deletions
@@ -5,6 +5,7 @@
55
#include <sys/inotify.h>
66
#include <unistd.h>
77
#include <filesystem>
8+
#include <fstream>
89
#include <cstring>
910

1011
namespace fs = std::filesystem;
@@ -73,6 +74,27 @@ void discovery_epoll_remove_zmq(bacaro_t *self, void *sock)
7374
epoll_modify(self->epoll_fd, get_zmq_fd(sock), EPOLL_CTL_DEL);
7475
}
7576

77+
static void read_manifest(const bacaro_t *self, const std::string &pub_filename,
78+
std::vector<std::string> &out, bool &found)
79+
{
80+
// Replace .pub suffix with .manifest
81+
std::string manifest_file = self->runtime_dir + "/"
82+
+ pub_filename.substr(0, pub_filename.size() - 4) + ".manifest";
83+
84+
std::ifstream in(manifest_file);
85+
if (!in.is_open()) {
86+
found = false;
87+
return;
88+
}
89+
90+
found = true;
91+
std::string line;
92+
while (std::getline(in, line)) {
93+
if (!line.empty())
94+
out.push_back(std::move(line));
95+
}
96+
}
97+
7698
static std::string ipc_endpoint(const bacaro_t *self, const std::string &filename)
7799
{
78100
return "ipc://" + self->runtime_dir + "/" + filename;
@@ -103,18 +125,32 @@ int discovery_peer_connect(bacaro_t *self, const std::string &filename)
103125
return BACARO_EZMQ;
104126

105127
std::string rep_ep = ipc_endpoint(self, rep_filename);
106-
self->peers[filename] = { nullptr, peer_name, pub_ep, rep_ep, -1 };
107128

108-
// ── Lazy DEALER: only create if we have active subscriptions ─────────
129+
PeerInfo peer_info;
130+
peer_info.name = peer_name;
131+
peer_info.pub_endpoint = pub_ep;
132+
peer_info.rep_endpoint = rep_ep;
133+
134+
// Read manifest if available
135+
read_manifest(self, filename, peer_info.manifest, peer_info.has_manifest);
136+
137+
self->peers[filename] = std::move(peer_info);
138+
139+
// ── Lazy DEALER: only create if subscriptions overlap with manifest ─
109140
if (!self->subscriptions.empty()) {
110-
if (discovery_ensure_dealer(self, filename, self->peers[filename]) != BACARO_OK) {
111-
zmq_disconnect(self->sub_sock, pub_ep.c_str());
112-
self->peers.erase(filename);
113-
return BACARO_EZMQ;
141+
auto &peer = self->peers[filename];
142+
for (const auto &prefix : self->subscriptions) {
143+
if (!manifest_overlaps(peer, prefix))
144+
continue;
145+
if (!peer.dealer_sock) {
146+
if (discovery_ensure_dealer(self, filename, peer) != BACARO_OK) {
147+
zmq_disconnect(self->sub_sock, pub_ep.c_str());
148+
self->peers.erase(filename);
149+
return BACARO_EZMQ;
150+
}
151+
}
152+
snapshot_send_request(peer.dealer_sock, prefix);
114153
}
115-
116-
for (const auto &prefix : self->subscriptions)
117-
snapshot_send_request(self->peers[filename].dealer_sock, prefix);
118154
}
119155

120156
return BACARO_OK;

src/internal.h

Lines changed: 19 additions & 0 deletions
@@ -21,8 +21,27 @@ struct PeerInfo {
2121
std::string pub_endpoint; // "ipc://..." for zmq_disconnect on the shared SUB
2222
std::string rep_endpoint; // "ipc://..." for lazy DEALER connect
2323
int dealer_fd = -1; // ZMQ_FD of dealer_sock (epoll)
24+
std::vector<std::string> manifest; // declared published domains (empty if no manifest)
25+
bool has_manifest = false; // true if peer had a .manifest file at discovery time
2426
};
2527

28+
// Check if a subscription prefix overlaps with a peer's manifest.
29+
// Returns true if the peer has no manifest or if any declared domain matches.
30+
static inline bool manifest_overlaps(const PeerInfo &peer, const std::string &prefix)
31+
{
32+
if (!peer.has_manifest)
33+
return true;
34+
for (const auto &domain : peer.manifest) {
35+
if (prefix.empty() || domain == prefix
36+
|| (domain.size() > prefix.size() && domain[prefix.size()] == '.'
37+
&& domain.compare(0, prefix.size(), prefix) == 0)
38+
|| (prefix.size() > domain.size() && prefix[domain.size()] == '.'
39+
&& prefix.compare(0, domain.size(), domain) == 0))
40+
return true;
41+
}
42+
return false;
43+
}
44+
2645
struct bacaro_s {
2746
std::string name;
2847
std::string runtime_dir;
