Skip to content

Commit 028f754

Browse files
kishanpsdivyagayathri-hcl
authored andcommitted
[P4RT] Use zmq to send msgs to OrchAgent instead of redis notifications.
1 parent 2f8012b commit 028f754

26 files changed

+448
-119
lines changed

p4rt_app/BUILD.bazel

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ cc_binary(
3232
"@com_github_gflags_gflags//:gflags",
3333
"@com_github_grpc_grpc//:grpc++_authorization_provider",
3434
"@com_google_absl//absl/functional:bind_front",
35+
"@com_google_absl//absl/memory",
3536
"@com_google_absl//absl/status",
3637
"@com_google_absl//absl/status:statusor",
3738
"@com_google_absl//absl/strings",
@@ -51,11 +52,13 @@ cc_binary(
5152
"//p4rt_app/event_monitoring:state_event_monitor",
5253
"//p4rt_app/event_monitoring:state_verification_events",
5354
"//p4rt_app/p4runtime:p4runtime_impl",
55+
"//p4rt_app/sonic/adapters:subscriber_state_table_adapter",
56+
"//p4rt_app/sonic/adapters:producer_state_table_adapter",
57+
"//p4rt_app/sonic/adapters:zmq_producer_state_table_adapter",
5458
"//p4rt_app/sonic:packetio_impl",
5559
"//p4rt_app/sonic:redis_connections",
5660
"//p4rt_app/sonic/adapters:consumer_notifier_adapter",
5761
"//p4rt_app/sonic/adapters:notification_producer_adapter",
58-
"//p4rt_app/sonic/adapters:producer_state_table_adapter",
5962
"//p4rt_app/sonic/adapters:system_call_adapter",
6063
"//p4rt_app/sonic/adapters:table_adapter",
6164
"//p4rt_app/sonic/adapters:warm_boot_state_adapter",

p4rt_app/p4rt.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "absl/flags/parse.h"
2323
#include "absl/functional/bind_front.h"
24+
#include "absl/memory/memory.h"
2425
#include "absl/status/status.h"
2526
#include "absl/status/statusor.h"
2627
#include "absl/strings/str_cat.h"
@@ -54,13 +55,16 @@
5455
#include "p4rt_app/sonic/adapters/system_call_adapter.h"
5556
#include "p4rt_app/sonic/adapters/table_adapter.h"
5657
#include "p4rt_app/sonic/adapters/warm_boot_state_adapter.h"
58+
#include "p4rt_app/sonic/adapters/zmq_producer_state_table_adapter.h"
5759
#include "p4rt_app/sonic/packetio_impl.h"
5860
//TODO(PINS):
5961
//#include "swss/component_state_helper.h"
6062
//#include "swss/component_state_helper_interface.h"
6163
#include "p4rt_app/sonic/redis_connections.h"
6264
#include "swss/dbconnector.h"
6365
#include "swss/schema.h"
66+
#include "swss/warm_restart.h"
67+
#include "swss/zmqclient.h"
6468

6569
using ::grpc::Server;
6670
using ::grpc::ServerBuilder;
@@ -183,17 +187,11 @@ namespace p4rt_app {
183187
namespace {
184188

185189
sonic::P4rtTable CreateP4rtTable(swss::DBConnector* app_db,
186-
swss::DBConnector* counters_db) {
187-
const std::string kP4rtResponseChannel =
188-
std::string("APPL_DB_") + APP_P4RT_TABLE_NAME + "_RESPONSE_CHANNEL";
189-
190+
swss::DBConnector* counters_db,
191+
swss::ZmqClient* zmq_client) {
190192
return sonic::P4rtTable{
191-
.notification_producer =
192-
absl::make_unique<sonic::NotificationProducerAdapter>(
193-
app_db, APP_P4RT_CHANNEL_NAME),
194-
.notification_consumer =
195-
absl::make_unique<sonic::ConsumerNotifierAdapter>(
196-
kP4rtResponseChannel, app_db),
193+
.producer = absl::make_unique<sonic::ZmqProducerStateTableAdapter>(
194+
app_db, APP_P4RT_TABLE_NAME, *zmq_client),
197195
.app_db = absl::make_unique<p4rt_app::sonic::TableAdapter>(
198196
app_db, APP_P4RT_TABLE_NAME),
199197
.counter_db = absl::make_unique<p4rt_app::sonic::TableAdapter>(
@@ -428,9 +426,13 @@ int main(int argc, char** argv) {
428426
swss::DBConnector counters_db("COUNTERS_DB", /*timeout=*/0);
429427
swss::DBConnector state_db("STATE_DB", /*timeout=*/0);
430428

429+
// Zmq request-reply one-to-one connection with the swss server.
430+
swss::ZmqClient zmq_client("ipc:///zmq/zmq_swss_ep",
431+
/*waitTimeMs=*/10 * 60 * 1000);
432+
431433
// Create interfaces to interact with the P4RT_TABLE entries.
432434
p4rt_app::sonic::P4rtTable p4rt_table =
433-
p4rt_app::CreateP4rtTable(&app_db, &counters_db);
435+
p4rt_app::CreateP4rtTable(&app_db, &counters_db, &zmq_client);
434436
p4rt_app::sonic::VrfTable vrf_table =
435437
p4rt_app::CreateVrfTable(&app_db, &app_state_db);
436438
p4rt_app::sonic::VlanTable vlan_table =

p4rt_app/p4runtime/p4runtime_impl.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1597,7 +1597,7 @@ absl::Status P4RuntimeImpl::ConfigureAppDbTables(
15971597
ASSIGN_OR_RETURN(
15981598
pdpi::IrUpdateStatus status,
15991599
sonic::GetAndProcessResponseNotificationWithoutRevertingState(
1600-
*p4rt_table_.notification_consumer, acl_key));
1600+
*p4rt_table_.producer, acl_key));
16011601

16021602
// Any issue with the forwarding config should be sent back to the
16031603
// controller as an INVALID_ARGUMENT.
@@ -1614,9 +1614,9 @@ absl::Status P4RuntimeImpl::ConfigureAppDbTables(
16141614
_ << "Could not publish Table Definition Set to APPDB");
16151615

16161616
ASSIGN_OR_RETURN(
1617-
pdpi::IrUpdateStatus status,
1618-
sonic::GetAndProcessResponseNotificationWithoutRevertingState(
1619-
*p4rt_table_.notification_consumer, acl_key));
1617+
pdpi::IrUpdateStatus status,
1618+
sonic::GetAndProcessResponseNotificationWithoutRevertingState(
1619+
*p4rt_table_.producer, acl_key));
16201620

16211621
// Any issue with the forwarding config should be sent back to the
16221622
// controller as an INVALID_ARGUMENT.

p4rt_app/sonic/BUILD.bazel

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ cc_test(
4343
deps = [
4444
":app_db_acl_def_table_manager",
4545
":redis_connections",
46-
"//p4rt_app/sonic/adapters:fake_notification_producer_adapter",
4746
"//gutil/gutil:status_matchers",
4847
"//p4rt_app/sonic/adapters:fake_sonic_db_table",
48+
"//p4rt_app/sonic/adapters:fake_zmq_producer_state_table_adapter",
4949
"//p4rt_app/utils:ir_builder",
5050
"@com_github_google_glog//:glog",
5151
"@com_github_nlohmann_json//:nlohmann_json",
@@ -98,9 +98,9 @@ cc_test(
9898
"//gutil/gutil:status_matchers",
9999
"//p4_pdpi:ir_cc_proto",
100100
"//p4rt_app/sonic/adapters:mock_consumer_notifier_adapter",
101-
"//p4rt_app/sonic/adapters:mock_notification_producer_adapter",
102101
"//p4rt_app/sonic/adapters:mock_producer_state_table_adapter",
103102
"//p4rt_app/sonic/adapters:mock_table_adapter",
103+
"//p4rt_app/sonic/adapters:mock_zmq_producer_state_table_adapter",
104104
"//p4rt_app/tests/lib:app_db_entry_builder",
105105
"//sai_p4/instantiations/google:instantiations",
106106
"//sai_p4/instantiations/google:sai_p4info_cc",
@@ -284,9 +284,9 @@ cc_library(
284284
hdrs = ["redis_connections.h"],
285285
deps = [
286286
"//p4rt_app/sonic/adapters:consumer_notifier_adapter",
287-
"//p4rt_app/sonic/adapters:notification_producer_adapter",
288287
"//p4rt_app/sonic/adapters:producer_state_table_adapter",
289288
"//p4rt_app/sonic/adapters:table_adapter",
289+
"//p4rt_app/sonic/adapters:zmq_producer_state_table_adapter",
290290
],
291291
)
292292

@@ -300,6 +300,7 @@ cc_library(
300300
"//p4_pdpi/utils:ir",
301301
"//p4rt_app/sonic/adapters:consumer_notifier_adapter",
302302
"//p4rt_app/sonic/adapters:table_adapter",
303+
"//p4rt_app/sonic/adapters:zmq_producer_state_table_adapter",
303304
"@com_github_google_glog//:glog",
304305
"@com_google_absl//absl/container:btree",
305306
"@com_google_absl//absl/status",
@@ -477,8 +478,6 @@ cc_test(
477478
"//gutil/gutil:proto_matchers",
478479
"//gutil/gutil:status_matchers",
479480
"//p4_pdpi:ir_cc_proto",
480-
"//p4rt_app/sonic/adapters:mock_consumer_notifier_adapter",
481-
"//p4rt_app/sonic/adapters:mock_notification_producer_adapter",
482481
"//p4rt_app/sonic/adapters:mock_table_adapter",
483482
"@com_github_p4lang_p4runtime//:p4runtime_cc_proto",
484483
"@com_google_googleapis//google/rpc:code_cc_proto",

p4rt_app/sonic/adapters/BUILD.bazel

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,3 +252,34 @@ cc_library(
252252
"@sonic_swss_common//:common",
253253
],
254254
)
255+
256+
cc_library(
257+
name = "zmq_producer_state_table_adapter",
258+
srcs = ["zmq_producer_state_table_adapter.cc"],
259+
hdrs = ["zmq_producer_state_table_adapter.h"],
260+
deps = ["@sonic_swss_common//:libswsscommon"],
261+
)
262+
263+
cc_library(
264+
name = "fake_zmq_producer_state_table_adapter",
265+
testonly = True,
266+
srcs = ["fake_zmq_producer_state_table_adapter.cc"],
267+
hdrs = ["fake_zmq_producer_state_table_adapter.h"],
268+
deps = [
269+
":fake_sonic_db_table",
270+
":zmq_producer_state_table_adapter",
271+
"@com_google_absl//absl/status",
272+
"@sonic_swss_common//:common",
273+
],
274+
)
275+
276+
cc_library(
277+
name = "mock_zmq_producer_state_table_adapter",
278+
testonly = True,
279+
hdrs = ["mock_zmq_producer_state_table_adapter.h"],
280+
deps = [
281+
":zmq_producer_state_table_adapter",
282+
"@com_google_googletest//:gtest",
283+
"@sonic_swss_common//:common",
284+
],
285+
)

p4rt_app/sonic/adapters/fake_consumer_notifier_adapter.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ FakeConsumerNotifierAdapter::FakeConsumerNotifierAdapter(
3333
bool FakeConsumerNotifierAdapter::WaitForNotificationAndPop(
3434
std::string &op, std::string &data, SonicDbEntryList &values,
3535
int64_t timeout_ms) {
36-
sonic_db_table_->GetNextNotification(op, data, values);
37-
return true;
36+
return sonic_db_table_->GetNextNotification(op, data, values).ok();
3837
}
3938

4039
} // namespace sonic

p4rt_app/sonic/adapters/fake_sonic_db_table.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,11 @@ bool FakeSonicDbTable::PushNotification(const std::string &key,
101101
return true;
102102
}
103103

104-
void FakeSonicDbTable::GetNextNotification(std::string &op, std::string &data,
105-
SonicDbEntryList &values) {
104+
absl::Status FakeSonicDbTable::GetNextNotification(std::string& op,
105+
std::string& data,
106+
SonicDbEntryList& values) {
106107
if (notifications_.empty()) {
107-
// TODO: we probably want to return a timeout error if we never
108-
// get a notification?
109-
LOG(FATAL) << "Could not find a notification.";
108+
return absl::DeadlineExceededError("No active notification to send");
110109
}
111110

112111
VLOG(1) << absl::StreamFormat("'%s' get notification: %s", debug_table_name_,
@@ -124,6 +123,7 @@ void FakeSonicDbTable::GetNextNotification(std::string &op, std::string &data,
124123
op = "SWSS_RC_SUCCESS";
125124
values.push_back({"err_str", "SWSS_RC_SUCCESS"});
126125
}
126+
return absl::OkStatus();
127127
}
128128

129129
absl::StatusOr<SonicDbEntryMap> FakeSonicDbTable::ReadTableEntry(

p4rt_app/sonic/adapters/fake_sonic_db_table.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "absl/base/thread_annotations.h"
2424
#include "absl/container/flat_hash_map.h"
25+
#include "absl/status/status.h"
2526
#include "absl/status/statusor.h"
2627
#include "absl/synchronization/mutex.h"
2728

@@ -67,8 +68,8 @@ class FakeSonicDbTable {
6768
bool PushNotification(const std::string &key);
6869
bool PushNotification(const std::string &key, const std::string &op,
6970
const SonicDbEntryMap &values);
70-
void GetNextNotification(std::string &op, std::string &data,
71-
SonicDbEntryList &values);
71+
absl::Status GetNextNotification(std::string& op, std::string& data,
72+
SonicDbEntryList& values);
7273

7374
absl::StatusOr<SonicDbEntryMap> ReadTableEntry(const std::string &key) const
7475
ABSL_LOCKS_EXCLUDED(entries_mutex_);

p4rt_app/sonic/adapters/fake_sonic_db_table_test.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,13 @@ TEST(FakeSonicDbTest, DeleteEntry) {
101101
EXPECT_TRUE(table.GetAllKeys().empty());
102102
}
103103

104-
TEST(FakeSonicDbDeathTest, GetNotificationDiesIfNoNotificationExists) {
104+
TEST(FakeSonicDbDeathTest, GetNotificationFailsIfNoNotificationExists) {
105105
FakeSonicDbTable table;
106106
std::string op;
107107
std::string data;
108108
SonicDbEntryList value;
109109

110-
EXPECT_DEATH(table.GetNextNotification(op, data, value),
111-
"Could not find a notification");
110+
EXPECT_FALSE(table.GetNextNotification(op, data, value).ok());
112111
}
113112

114113
TEST(FakeSonicDbTest, DefaultNotificationResponseIsSuccess) {
@@ -119,7 +118,7 @@ TEST(FakeSonicDbTest, DefaultNotificationResponseIsSuccess) {
119118

120119
// First insert.
121120
EXPECT_TRUE(table.PushNotification("entry"));
122-
table.GetNextNotification(op, data, values);
121+
EXPECT_OK(table.GetNextNotification(op, data, values));
123122
EXPECT_EQ(op, "SWSS_RC_SUCCESS");
124123
EXPECT_EQ(data, "entry");
125124
EXPECT_THAT(values,
@@ -135,7 +134,7 @@ TEST(FakeSonicDbTest, SetNotificationResponseForKey) {
135134
table.SetResponseForKey(/*key=*/"entry", /*code=*/"SWSS_RC_UNKNOWN",
136135
/*message=*/"my error message");
137136
EXPECT_FALSE(table.PushNotification("entry"));
138-
table.GetNextNotification(op, data, values);
137+
EXPECT_OK(table.GetNextNotification(op, data, values));
139138
EXPECT_EQ(op, "SWSS_RC_UNKNOWN");
140139
EXPECT_EQ(data, "entry");
141140
EXPECT_THAT(values,
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
#include "p4rt_app/sonic/adapters/fake_zmq_producer_state_table_adapter.h"
15+
16+
#include <memory>
17+
#include <string>
18+
#include <utility>
19+
#include <vector>
20+
21+
#include "absl/status/status.h"
22+
#include "p4rt_app/sonic/adapters/fake_sonic_db_table.h"
23+
#include "swss/rediscommand.h"
24+
#include "swss/schema.h"
25+
#include "swss/table.h"
26+
27+
namespace p4rt_app {
28+
namespace sonic {
29+
30+
FakeZmqProducerStateTableAdapter::FakeZmqProducerStateTableAdapter(
31+
FakeSonicDbTable* sonic_db_table)
32+
: sonic_db_table_(*sonic_db_table) {}
33+
34+
void FakeZmqProducerStateTableAdapter::send(
35+
const std::vector<swss::KeyOpFieldsValuesTuple>& kcos) {
36+
for (const auto& kfv : kcos) {
37+
SonicDbEntryMap entry_map;
38+
SonicDbEntryList entry_list;
39+
for (const auto& [field, value] : kfvFieldsValues(kfv)) {
40+
entry_map[field] = value;
41+
entry_list.push_back(std::make_pair(field, value));
42+
}
43+
44+
// Only if the request succeeds should we update AppDb state.
45+
if (sonic_db_table_.PushNotification(kfvKey(kfv), kfvOp(kfv), entry_map)) {
46+
// Notifications to the OA can only "SET" or "DEL" an entry. So "SET" is
47+
// used for both inserting and modifying an entry. Therefore, we always
48+
// delete the current entry and only then re-insert it on "SET".
49+
sonic_db_table_.DeleteTableEntry(kfvKey(kfv));
50+
if (kfvOp(kfv) == "SET") {
51+
sonic_db_table_.InsertTableEntry(kfvKey(kfv), entry_list);
52+
}
53+
}
54+
}
55+
}
56+
57+
bool FakeZmqProducerStateTableAdapter::wait(
58+
std::string& db, std::string& table_name,
59+
std::vector<std::shared_ptr<swss::KeyOpFieldsValuesTuple>>& kcos) {
60+
// db & table_name is fixed now since only p4rt tables use zmq.
61+
db = "APPL_DB";
62+
table_name = APP_P4RT_TABLE_NAME;
63+
std::string op, key;
64+
SonicDbEntryList values;
65+
while (sonic_db_table_.GetNextNotification(op, key, values).code() !=
66+
absl::StatusCode::kDeadlineExceeded) {
67+
// Zmq response consists of key, op(empty) and field value tuples of
68+
// <status, error_message>. Replace the field in values tuple(originally has
69+
// `err_str`) with the actual response code string.
70+
values[0].first = op;
71+
kcos.push_back(std::make_shared<swss::KeyOpFieldsValuesTuple>(
72+
key, "", std::vector<std::pair<std::string, std::string>>(values)));
73+
values.clear();
74+
}
75+
if (kcos.empty()) {
76+
return false;
77+
}
78+
79+
return true;
80+
}
81+
82+
} // namespace sonic
83+
} // namespace p4rt_app

0 commit comments

Comments
 (0)