Skip to content

Commit 81d3bfe

Browse files
authored
Merge pull request #110 from gregmedd/feature/up1.6.0/use-pubsub-only
Removing all modes other than pub/sub
2 parents 7e335a4 + fa62d3e commit 81d3bfe

2 files changed

Lines changed: 3 additions & 205 deletions

File tree

include/up-transport-zenoh-cpp/ZenohUTransport.h

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -113,35 +113,16 @@ struct ZenohUTransport : public UTransport {
113113
static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);
114114
static v1::UMessage queryToUMessage(const zenoh::Query& query);
115115

116-
v1::UStatus registerRequestListener_(const std::string& zenoh_key,
117-
CallableConn listener);
118-
119-
v1::UStatus registerResponseListener_(const std::string& zenoh_key,
120-
CallableConn listener);
121-
122116
v1::UStatus registerPublishNotificationListener_(
123117
const std::string& zenoh_key, CallableConn listener);
124118

125-
v1::UStatus sendRequest_(const std::string& zenoh_key,
126-
const std::string& payload,
127-
const v1::UAttributes& attributes);
128-
129-
v1::UStatus sendResponse_(const std::string& payload,
130-
const v1::UAttributes& attributes);
131-
132119
v1::UStatus sendPublishNotification_(const std::string& zenoh_key,
133120
const std::string& payload,
134121
const v1::UAttributes& attributes);
135122

136123
zenoh::Session session_;
137124

138-
ThreadSafeMap<UuriKey, CallableConn> rpc_callback_map_;
139-
140125
ThreadSafeMap<CallableConn, zenoh::Subscriber<void>> subscriber_map_;
141-
142-
ThreadSafeMap<CallableConn, zenoh::Queryable<void>> queryable_map_;
143-
144-
ThreadSafeMap<std::string, std::shared_ptr<zenoh::Query>> query_map_;
145126
};
146127

147128
} // namespace uprotocol::transport

src/ZenohUTransport.cpp

Lines changed: 3 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -185,46 +185,6 @@ ZenohUTransport::ZenohUTransport(const v1::UUri& defaultUri,
185185
spdlog::info("ZenohUTransport init");
186186
}
187187

188-
v1::UStatus ZenohUTransport::registerRequestListener_(
189-
const std::string& zenoh_key, CallableConn listener) {
190-
spdlog::info("registerRequestListener_: {}", zenoh_key);
191-
192-
// NOTE: listener is captured by copy here so that it does not go out
193-
// of scope when this function returns.
194-
auto on_query = [this, listener](const zenoh::Query& query) mutable {
195-
auto attributes = attachmentToUAttributes(query.get_attachment());
196-
auto id_str =
197-
datamodel::serializer::uuid::AsString().serialize(attributes.id());
198-
199-
// TODO(sashacmc): Replace this workaround with `query.clone()`
200-
// after zenohcpp 1.0.0-rc6 release
201-
auto cloned_query = std::make_shared<zenoh::Query>(nullptr);
202-
z_query_clone(zenoh::detail::as_owned_c_ptr(*cloned_query),
203-
zenoh::detail::loan(query));
204-
205-
query_map_.emplace(std::move(id_str), std::move(cloned_query));
206-
listener(queryToUMessage(query));
207-
};
208-
209-
auto on_drop = []() {};
210-
211-
auto queryable = session_.declare_queryable(zenoh_key, std::move(on_query),
212-
std::move(on_drop));
213-
214-
queryable_map_.emplace(listener, std::move(queryable));
215-
216-
return v1::UStatus();
217-
}
218-
219-
v1::UStatus ZenohUTransport::registerResponseListener_(
220-
const std::string& zenoh_key, CallableConn listener) {
221-
spdlog::info("registerResponseListener_: {}", zenoh_key);
222-
223-
rpc_callback_map_.emplace(zenoh_key, listener);
224-
225-
return v1::UStatus();
226-
}
227-
228188
v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
229189
const std::string& zenoh_key, CallableConn listener) {
230190
spdlog::info("registerPublishNotificationListener_: {}", zenoh_key);
@@ -243,93 +203,6 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
243203
return v1::UStatus();
244204
}
245205

246-
v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key,
247-
const std::string& payload,
248-
const v1::UAttributes& attributes) {
249-
spdlog::debug("sendRequest_: {}: {}", zenoh_key, payload);
250-
zenoh::KeyExpr ke(zenoh_key);
251-
auto ke_search = [&](const std::pair<std::string, CallableConn>& pair) {
252-
return zenoh::KeyExpr(pair.first).intersects(ke);
253-
};
254-
255-
CallableConn resp_callback;
256-
257-
if (auto resp_callback_opt = rpc_callback_map_.find_if(ke_search);
258-
resp_callback_opt) {
259-
spdlog::debug("sendRequest_: found callback for '{}'", zenoh_key);
260-
resp_callback = *resp_callback_opt;
261-
} else {
262-
spdlog::error("sendRequest_: failed to find response callback for '{}'",
263-
zenoh_key);
264-
return uError(v1::UCode::UNAVAILABLE,
265-
"failed to find response callback");
266-
}
267-
auto on_reply = [=](const zenoh::Reply& reply) mutable {
268-
spdlog::debug("on_reply for {}", zenoh_key);
269-
if (reply.is_ok()) {
270-
const auto& sample = reply.get_ok();
271-
spdlog::debug("resp_callback: {}",
272-
sample.get_payload().deserialize<std::string>());
273-
resp_callback(sampleToUMessage(sample));
274-
spdlog::debug("resp_callback: done");
275-
} else {
276-
spdlog::error(
277-
"on_reply got en error: {}",
278-
reply.get_err().get_payload().deserialize<std::string>());
279-
// TODO: error report
280-
}
281-
};
282-
283-
auto attachment = uattributesToAttachment(attributes);
284-
285-
auto on_done = []() {};
286-
287-
try {
288-
// -Wpedantic disallows named member initialization until C++20,
289-
// so GetOptions needs to be explicitly created and passed with
290-
// std::move()
291-
zenoh::Session::GetOptions options;
292-
options.target = Z_QUERY_TARGET_BEST_MATCHING;
293-
options.consolidation =
294-
zenoh::QueryConsolidation(Z_CONSOLIDATION_MODE_NONE);
295-
options.payload = zenoh::Bytes::serialize(payload);
296-
options.attachment = zenoh::Bytes::serialize(attachment);
297-
session_.get(zenoh_key, "", std::move(on_reply), std::move(on_done),
298-
std::move(options));
299-
} catch (const zenoh::ZException& e) {
300-
return uError(v1::UCode::INTERNAL, e.what());
301-
}
302-
303-
return v1::UStatus();
304-
}
305-
306-
v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload,
307-
const v1::UAttributes& attributes) {
308-
auto reqid_str =
309-
datamodel::serializer::uuid::AsString().serialize(attributes.reqid());
310-
spdlog::debug("sendResponse_: {}: {}", reqid_str, payload);
311-
std::shared_ptr<zenoh::Query> query(nullptr);
312-
if (auto query_opt = query_map_.find(reqid_str); query_opt) {
313-
query = *query_opt;
314-
} else {
315-
spdlog::error("sendResponse_: query doesn't exist");
316-
return uError(v1::UCode::INTERNAL, "query doesn't exist");
317-
}
318-
319-
spdlog::debug("sendResponse_ to query: {}",
320-
query->get_keyexpr().as_string_view());
321-
auto attachment = uattributesToAttachment(attributes);
322-
// -Wpedantic disallows named member initialization until C++20,
323-
// so PutOptions needs to be explicitly created and passed with
324-
// std::move()
325-
zenoh::Query::ReplyOptions options =
326-
zenoh::Query::ReplyOptions::create_default();
327-
options.attachment = zenoh::Bytes::serialize(attachment);
328-
query->reply(query->get_keyexpr(), payload, std::move(options));
329-
330-
return v1::UStatus();
331-
}
332-
333206
v1::UStatus ZenohUTransport::sendPublishNotification_(
334207
const std::string& zenoh_key, const std::string& payload,
335208
const v1::UAttributes& attributes) {
@@ -371,76 +244,20 @@ v1::UStatus ZenohUTransport::sendImpl(const v1::UMessage& message) {
371244
attributes.source(), attributes.sink());
372245
}
373246

374-
switch (attributes.type()) {
375-
case v1::UMessageType::UMESSAGE_TYPE_PUBLISH: {
376-
return sendPublishNotification_(zenoh_key, payload, attributes);
377-
}
378-
case v1::UMessageType::UMESSAGE_TYPE_NOTIFICATION: {
379-
return sendPublishNotification_(zenoh_key, payload, attributes);
380-
}
381-
case v1::UMessageType::UMESSAGE_TYPE_REQUEST: {
382-
return sendRequest_(zenoh_key, payload, attributes);
383-
}
384-
case v1::UMessageType::UMESSAGE_TYPE_RESPONSE: {
385-
return sendResponse_(payload, attributes);
386-
}
387-
// These sentinel values come from the protobuf compiler.
388-
// They are illegal for the enum, but cause linting problems.
389-
// In order to suppress the linting error, they need to
390-
// be included in the switch-case statement.
391-
// It is deemed acceptable to use an exception here because
392-
// it is in the sending code. An exception would not be
393-
// acceptable in receiving code. The correct strategy wopuld be
394-
// to drop the message.
395-
case v1::UMessageType::UMessageType_INT_MIN_SENTINEL_DO_NOT_USE_:
396-
case v1::UMessageType::UMessageType_INT_MAX_SENTINEL_DO_NOT_USE_:
397-
throw std::runtime_error(
398-
"Sentinel values detected in attribute type switch-case");
399-
case v1::UMessageType::UMESSAGE_TYPE_UNSPECIFIED:
400-
default: {
401-
return uError(v1::UCode::INVALID_ARGUMENT,
402-
"Wrong Message type in v1::UAttributes");
403-
}
404-
}
405-
return v1::UStatus();
247+
return sendPublishNotification_(zenoh_key, payload, attributes);
406248
}
407249

408250
v1::UStatus ZenohUTransport::registerListenerImpl(
409251
CallableConn&& listener, const v1::UUri& source_filter,
410252
std::optional<v1::UUri>&& sink_filter) {
411253
std::string zenoh_key = toZenohKeyString(getEntityUri().authority_name(),
412254
source_filter, sink_filter);
413-
if (!sink_filter) {
414-
// When only a single filter is provided, this signals that the
415-
// listener is for a pub/sub-like communication mode where then
416-
// messages are expected to only have a source address.
417-
registerPublishNotificationListener_(zenoh_key, listener);
418-
} else {
419-
// Otherwise, the filters could be for any communication mode.
420-
// We can't use the UUri validators to determine what mode they
421-
// are for because a) there is overlap in allowed values between
422-
// modes and b) any filter is allowed to have wildcards present.
423-
registerRequestListener_(zenoh_key, listener);
424-
registerPublishNotificationListener_(zenoh_key, listener);
425-
426-
if (sink_filter.has_value()) {
427-
// zenoh_key for response listener should be in revert order
428-
std::string zenoh_response_key = toZenohKeyString(
429-
getEntityUri().authority_name(), *sink_filter, source_filter);
430-
registerResponseListener_(zenoh_response_key, listener);
431-
}
432-
}
433255

434-
v1::UStatus status;
435-
status.set_code(v1::UCode::OK);
436-
return status;
256+
return registerPublishNotificationListener_(zenoh_key, listener);
437257
}
438258

439259
void ZenohUTransport::cleanupListener(CallableConn listener) {
440-
if (subscriber_map_.erase(listener) > 0) {
441-
return;
442-
}
443-
queryable_map_.erase(listener);
260+
subscriber_map_.erase(listener);
444261
}
445262

446263
} // namespace uprotocol::transport

0 commit comments

Comments
 (0)