Skip to content

Commit 4170971

Browse files
committed
Fix issues
1 parent b24f0db commit 4170971

File tree

6 files changed

+184
-96
lines changed

6 files changed

+184
-96
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

build_autogenerated.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/core/event_engine/test_suite/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ grpc_cc_test(
5757
"//test/core/event_engine/test_suite/posix:oracle_event_engine_posix",
5858
"//test/core/event_engine/test_suite/tests:client",
5959
"//test/core/event_engine/test_suite/tests:dns",
60+
"//test/core/event_engine/test_suite/tests:endpoint",
6061
"//test/core/event_engine/test_suite/tests:server",
6162
"//test/core/event_engine/test_suite/tests:timer",
6263
],
@@ -94,6 +95,7 @@ grpc_cc_test(
9495
"//test/core/event_engine:event_engine_test_utils",
9596
"//test/core/event_engine/test_suite/posix:oracle_event_engine_posix",
9697
"//test/core/event_engine/test_suite/tests:client",
98+
"//test/core/event_engine/test_suite/tests:endpoint",
9799
"//test/core/event_engine/test_suite/tests:server",
98100
"//test/core/event_engine/test_suite/tests:timer",
99101
],
@@ -113,6 +115,7 @@ grpc_cc_test(
113115
"//test/core/event_engine:event_engine_test_utils",
114116
"//test/core/event_engine/test_suite/tests:client",
115117
"//test/core/event_engine/test_suite/tests:dns",
118+
"//test/core/event_engine/test_suite/tests:endpoint",
116119
"//test/core/event_engine/test_suite/tests:server",
117120
"//test/core/event_engine/test_suite/tests:timer",
118121
],
@@ -145,6 +148,7 @@ grpc_cc_test(
145148
deps = [
146149
"//test/core/event_engine/fuzzing_event_engine",
147150
"//test/core/event_engine/test_suite/tests:client",
151+
"//test/core/event_engine/test_suite/tests:endpoint",
148152
"//test/core/event_engine/test_suite/tests:server",
149153
"//test/core/event_engine/test_suite/tests:timer",
150154
],

test/core/event_engine/test_suite/tests/BUILD

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,19 @@ grpc_cc_library(
9999
alwayslink = 1,
100100
)
101101

102+
grpc_cc_library(
103+
name = "endpoint",
104+
testonly = True,
105+
srcs = ["endpoint_test.cc"],
106+
external_deps = ["absl/log:check"],
107+
deps = [
108+
"//src/core:channel_args",
109+
"//test/core/event_engine:event_engine_test_utils",
110+
"//test/core/event_engine/test_suite:event_engine_test_framework",
111+
],
112+
alwayslink = 1,
113+
)
114+
102115
grpc_cc_library(
103116
name = "server",
104117
testonly = True,

test/core/event_engine/test_suite/tests/client_test.cc

Lines changed: 0 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -302,101 +302,5 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
302302
server_endpoint.reset();
303303
}
304304

305-
// Create a connection using the test EventEngine to a listener created by the
306-
// test EventEngine and exchange bi-di data over the connection. Each endpoint
307-
// gets reset as soon as the write is done. This test checks that EventEngine
308-
// implementations handle lifetimes around endpoints correctly.
309-
310-
TEST_F(EventEngineClientTest, WriteEventCallbackEndpointValidityTest) {
311-
grpc_core::ExecCtx ctx;
312-
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
313-
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
314-
std::string target_addr = absl::StrCat(
315-
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
316-
auto resolved_addr = URIToResolvedAddress(target_addr);
317-
CHECK_OK(resolved_addr);
318-
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
319-
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
320-
std::unique_ptr<grpc_core::Notification> server_signal;
321-
322-
Listener::AcceptCallback accept_cb =
323-
[&server_endpoint, &server_signal](
324-
std::unique_ptr<Endpoint> ep,
325-
grpc_core::MemoryAllocator /*memory_allocator*/) {
326-
server_endpoint = std::move(ep);
327-
server_signal->Notify();
328-
};
329-
330-
grpc_core::ChannelArgs args;
331-
auto quota = grpc_core::ResourceQuota::Default();
332-
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
333-
ChannelArgsEndpointConfig config(args);
334-
auto listener = *test_ee->CreateListener(
335-
std::move(accept_cb),
336-
[](absl::Status status) {
337-
ASSERT_TRUE(status.ok()) << status.ToString();
338-
},
339-
config, std::make_unique<grpc_core::MemoryQuota>("foo"));
340-
341-
ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
342-
ASSERT_TRUE(listener->Start().ok());
343-
344-
constexpr size_t n_iterations = 100;
345-
for (int i = 0; i < n_iterations; ++i) {
346-
server_signal = std::make_unique<grpc_core::Notification>();
347-
grpc_core::Notification client_signal;
348-
test_ee->Connect(
349-
[&client_endpoint,
350-
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
351-
ASSERT_TRUE(endpoint.ok());
352-
client_endpoint = std::move(*endpoint);
353-
client_signal.Notify();
354-
},
355-
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
356-
24h);
357-
358-
client_signal.WaitForNotification();
359-
server_signal->WaitForNotification();
360-
ASSERT_NE(client_endpoint.get(), nullptr);
361-
ASSERT_NE(server_endpoint.get(), nullptr);
362-
363-
// Start writes with WriteEventCallbacks from the client endpoint and server
364-
// endpoint and reset both endpoints immediately. It doesn't matter if the
365-
// callbacks don't get invoked as long as there is no use-after-free
366-
// behavior.
367-
auto event_cb = [](EventEngine::Endpoint* ee_ep, WriteEvent /*event*/,
368-
absl::Time /*time*/,
369-
std::vector<WriteMetric> /*metrics*/) {
370-
// some operation on the endpoint to ensure validity
371-
ASSERT_NE(ee_ep->GetPeerAddress().address(), nullptr);
372-
};
373-
SliceBuffer client_write_slice_buf;
374-
SliceBuffer server_write_slice_buf;
375-
WriteArgs client_write_args;
376-
client_write_args.set_metrics_sink(WriteEventSink(
377-
client_endpoint->AllWriteMetrics(),
378-
{WriteEvent::kSendMsg, WriteEvent::kScheduled, WriteEvent::kSent,
379-
WriteEvent::kAcked, WriteEvent::kClosed},
380-
event_cb));
381-
WriteArgs server_write_args;
382-
server_write_args.set_metrics_sink(WriteEventSink(
383-
client_endpoint->AllWriteMetrics(),
384-
{WriteEvent::kSendMsg, WriteEvent::kScheduled, WriteEvent::kSent,
385-
WriteEvent::kAcked, WriteEvent::kClosed},
386-
event_cb));
387-
AppendStringToSliceBuffer(&client_write_slice_buf, GetNextSendMessage());
388-
AppendStringToSliceBuffer(&server_write_slice_buf, GetNextSendMessage());
389-
client_endpoint->Write([&](absl::Status /*status*/) {},
390-
&client_write_slice_buf,
391-
std::move(client_write_args));
392-
server_endpoint->Write([&](absl::Status /*status*/) {},
393-
&server_write_slice_buf,
394-
std::move(server_write_args));
395-
client_endpoint.reset();
396-
server_endpoint.reset();
397-
}
398-
listener.reset();
399-
}
400-
401305
// TODO(vigneshbabu): Add more tests which create listeners bound to a mix
402306
// Ipv6 and other type of addresses (UDS) in the same test.
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright 2025 gRPC authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <grpc/event_engine/event_engine.h>
16+
#include <grpc/event_engine/memory_allocator.h>
17+
#include <grpc/impl/channel_arg_names.h>
18+
19+
#include <chrono>
20+
#include <memory>
21+
#include <string>
22+
#include <utility>
23+
#include <vector>
24+
25+
#include "absl/log/check.h"
26+
#include "absl/status/status.h"
27+
#include "absl/status/statusor.h"
28+
#include "absl/strings/str_cat.h"
29+
#include "absl/time/time.h"
30+
#include "gtest/gtest.h"
31+
#include "src/core/lib/channel/channel_args.h"
32+
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
33+
#include "src/core/lib/event_engine/tcp_socket_utils.h"
34+
#include "src/core/lib/iomgr/exec_ctx.h"
35+
#include "src/core/lib/resource_quota/memory_quota.h"
36+
#include "src/core/lib/resource_quota/resource_quota.h"
37+
#include "src/core/util/notification.h"
38+
#include "test/core/event_engine/event_engine_test_utils.h"
39+
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
40+
#include "test/core/test_util/port.h"
41+
42+
class EventEngineEndpointTest : public EventEngineTest {};
43+
44+
namespace {
45+
46+
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
47+
using ::grpc_event_engine::experimental::EventEngine;
48+
using ::grpc_event_engine::experimental::URIToResolvedAddress;
49+
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
50+
using WriteArgs =
51+
::grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs;
52+
using WriteEvent =
53+
::grpc_event_engine::experimental::EventEngine::Endpoint::WriteEvent;
54+
using WriteMetric =
55+
::grpc_event_engine::experimental::EventEngine::Endpoint::WriteMetric;
56+
using WriteEventSink =
57+
::grpc_event_engine::experimental::EventEngine::Endpoint::WriteEventSink;
58+
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
59+
using ::grpc_event_engine::experimental::GetNextSendMessage;
60+
using ::grpc_event_engine::experimental::NotifyOnDelete;
61+
using ::grpc_event_engine::experimental::SliceBuffer;
62+
63+
using namespace std::chrono_literals;
64+
65+
// Create a connection using the test EventEngine to a listener created by the
66+
// test EventEngine and exchange bi-di data over the connection. Each endpoint
67+
// gets reset as soon as the write is done. This test checks that EventEngine
68+
// implementations handle lifetimes around endpoints correctly.
69+
70+
TEST_F(EventEngineEndpointTest, WriteEventCallbackEndpointValidityTest) {
71+
grpc_core::ExecCtx ctx;
72+
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
73+
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
74+
std::string target_addr = absl::StrCat(
75+
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
76+
auto resolved_addr = URIToResolvedAddress(target_addr);
77+
CHECK_OK(resolved_addr);
78+
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
79+
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
80+
std::unique_ptr<grpc_core::Notification> server_signal;
81+
82+
Listener::AcceptCallback accept_cb =
83+
[&server_endpoint, &server_signal](
84+
std::unique_ptr<Endpoint> ep,
85+
grpc_core::MemoryAllocator /*memory_allocator*/) {
86+
server_endpoint = std::move(ep);
87+
server_signal->Notify();
88+
};
89+
90+
grpc_core::ChannelArgs args;
91+
auto quota = grpc_core::ResourceQuota::Default();
92+
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
93+
ChannelArgsEndpointConfig config(args);
94+
auto listener = *test_ee->CreateListener(
95+
std::move(accept_cb),
96+
[](absl::Status status) {
97+
ASSERT_TRUE(status.ok()) << status.ToString();
98+
},
99+
config, std::make_unique<grpc_core::MemoryQuota>("foo"));
100+
101+
ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
102+
ASSERT_TRUE(listener->Start().ok());
103+
104+
constexpr int n_iterations = 100;
105+
for (int i = 0; i < n_iterations; ++i) {
106+
server_signal = std::make_unique<grpc_core::Notification>();
107+
grpc_core::Notification client_signal;
108+
test_ee->Connect(
109+
[&client_endpoint,
110+
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
111+
ASSERT_TRUE(endpoint.ok());
112+
client_endpoint = std::move(*endpoint);
113+
client_signal.Notify();
114+
},
115+
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
116+
24h);
117+
118+
client_signal.WaitForNotification();
119+
server_signal->WaitForNotification();
120+
ASSERT_NE(client_endpoint.get(), nullptr);
121+
ASSERT_NE(server_endpoint.get(), nullptr);
122+
123+
// Start writes with WriteEventCallbacks from the client endpoint and server
124+
// endpoint and reset both endpoints immediately. It doesn't matter if the
125+
// callbacks don't get invoked as long as there is no use-after-free
126+
// behavior.
127+
auto event_cb = [](EventEngine::Endpoint* ee_ep, WriteEvent /*event*/,
128+
absl::Time /*time*/,
129+
std::vector<WriteMetric> /*metrics*/) {
130+
// some operation on the endpoint to ensure validity
131+
ASSERT_NE(ee_ep->GetPeerAddress().address(), nullptr);
132+
};
133+
SliceBuffer client_write_slice_buf;
134+
SliceBuffer server_write_slice_buf;
135+
WriteArgs client_write_args;
136+
client_write_args.set_metrics_sink(WriteEventSink(
137+
client_endpoint->AllWriteMetrics(),
138+
{WriteEvent::kSendMsg, WriteEvent::kScheduled, WriteEvent::kSent,
139+
WriteEvent::kAcked, WriteEvent::kClosed},
140+
event_cb));
141+
WriteArgs server_write_args;
142+
server_write_args.set_metrics_sink(WriteEventSink(
143+
client_endpoint->AllWriteMetrics(),
144+
{WriteEvent::kSendMsg, WriteEvent::kScheduled, WriteEvent::kSent,
145+
WriteEvent::kAcked, WriteEvent::kClosed},
146+
event_cb));
147+
AppendStringToSliceBuffer(&client_write_slice_buf, GetNextSendMessage());
148+
AppendStringToSliceBuffer(&server_write_slice_buf, GetNextSendMessage());
149+
client_endpoint->Write([&](absl::Status /*status*/) {},
150+
&client_write_slice_buf,
151+
std::move(client_write_args));
152+
server_endpoint->Write([&](absl::Status /*status*/) {},
153+
&server_write_slice_buf,
154+
std::move(server_write_args));
155+
client_endpoint.reset();
156+
server_endpoint.reset();
157+
}
158+
listener.reset();
159+
}
160+
161+
} // namespace

0 commit comments

Comments
 (0)