
Commit e741888

Support benchmark for polling ingestion (#784)
Signed-off-by: xuxiong1 <[email protected]>
1 parent 3dc35c5 commit e741888

File tree

13 files changed: +879 -124 lines changed


create-notice.sh

+1
@@ -54,6 +54,7 @@ function main {
     add_license "ijson" "https://raw.githubusercontent.com/ICRAR/ijson/master/LICENSE.txt"
     add_license "google-resumable-media" "https://raw.githubusercontent.com/googleapis/google-resumable-media-python/master/LICENSE"
     add_license "google-auth" "https://raw.githubusercontent.com/googleapis/google-auth-library-python/master/LICENSE"
+    add_license "aiokafka" "https://raw.githubusercontent.com/aio-libs/aiokafka/master/LICENSE"
 
     # transitive dependencies
     # Jinja2 dependencies

osbenchmark/client.py

+18 -115
@@ -22,132 +22,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import contextvars
 import logging
 import time
 
 import certifi
 import urllib3
 from urllib3.util.ssl_ import is_ipaddress
+from osbenchmark.kafka_client import KafkaMessageProducer
 
 from osbenchmark import exceptions, doc_link
+from osbenchmark.context import RequestContextHolder
 from osbenchmark.utils import console, convert
 
 
-class RequestContextManager:
-    """
-    Ensures that request context span the defined scope and allow nesting of request contexts with proper propagation.
-    This means that we can span a top-level request context, open sub-request contexts that can be used to measure
-    individual timings and still measure the proper total time on the top-level request context.
-    """
-    def __init__(self, request_context_holder):
-        self.ctx_holder = request_context_holder
-        self.ctx = None
-        self.token = None
-
-    async def __aenter__(self):
-        self.ctx, self.token = self.ctx_holder.init_request_context()
-        return self
-
-    @property
-    def request_start(self):
-        return self.ctx["request_start"]
-
-    @property
-    def request_end(self):
-        return max((value for value in self.ctx["request_end_list"] if value < self.client_request_end))
-
-    @property
-    def client_request_start(self):
-        return self.ctx["client_request_start"]
-
-    @property
-    def client_request_end(self):
-        return self.ctx["client_request_end"]
-
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        # propagate earliest request start and most recent request end to parent
-        client_request_start = self.client_request_start
-        client_request_end = self.client_request_end
-        request_start = self.request_start
-        request_end = self.request_end
-        self.ctx_holder.restore_context(self.token)
-        # don't attempt to restore these values on the top-level context as they don't exist
-        if self.token.old_value != contextvars.Token.MISSING:
-            self.ctx_holder.update_request_start(request_start)
-            self.ctx_holder.update_request_end(request_end)
-            self.ctx_holder.update_client_request_start(client_request_start)
-            self.ctx_holder.update_client_request_end(client_request_end)
-        self.token = None
-        return False
-
-
-class RequestContextHolder:
-    """
-    Holds request context variables. This class is only meant to be used together with RequestContextManager.
-    """
-    request_context = contextvars.ContextVar("benchmark_request_context")
-
-    def new_request_context(self):
-        return RequestContextManager(self)
-
-    @classmethod
-    def init_request_context(cls):
-        ctx = {}
-        token = cls.request_context.set(ctx)
-        return ctx, token
-
-    @classmethod
-    def restore_context(cls, token):
-        cls.request_context.reset(token)
-
-    @classmethod
-    def update_request_start(cls, new_request_start):
-        meta = cls.request_context.get()
-        # this can happen if multiple requests are sent on the wire for one logical request (e.g. scrolls)
-        if "request_start" not in meta and "client_request_start" in meta:
-            meta["request_start"] = new_request_start
-
-    @classmethod
-    def update_request_end(cls, new_request_end):
-        meta = cls.request_context.get()
-        if "request_end_list" not in meta:
-            meta["request_end_list"] = []
-        meta["request_end_list"].append(new_request_end)
-
-    @classmethod
-    def update_client_request_start(cls, new_client_request_start):
-        meta = cls.request_context.get()
-        if "client_request_start" not in meta:
-            meta["client_request_start"] = new_client_request_start
-
-    @classmethod
-    def update_client_request_end(cls, new_client_request_end):
-        meta = cls.request_context.get()
-        meta["client_request_end"] = new_client_request_end
-
-    @classmethod
-    def on_client_request_start(cls):
-        cls.update_client_request_start(time.perf_counter())
-
-    @classmethod
-    def on_client_request_end(cls):
-        cls.update_client_request_end(time.perf_counter())
-
-    @classmethod
-    def on_request_start(cls):
-        cls.update_request_start(time.perf_counter())
-
-    @classmethod
-    def on_request_end(cls):
-        cls.update_request_end(time.perf_counter())
-
-    @classmethod
-    def return_raw_response(cls):
-        ctx = cls.request_context.get()
-        ctx["raw_response"] = True
-
 class OsClientFactory:
     """
     Abstracts how the OpenSearch client is created. Intended for testing.
@@ -430,3 +317,19 @@ def wait_for_rest_layer(opensearch, max_attempts=40):
                 logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt)
                 raise e
     return False
+
+
+class MessageProducerFactory:
+    @staticmethod
+    async def create(params):
+        """
+        Creates and returns a message producer based on the ingestion source.
+        Currently supports Kafka. Ingestion source should be a dict like:
+        {'type': 'kafka', 'param': {'topic': 'test', 'bootstrap-servers': 'localhost:34803'}}
+        """
+        ingestion_source = params.get("ingestion-source", {})
+        producer_type = ingestion_source.get("type", "kafka").lower()
+        if producer_type == "kafka":
+            return await KafkaMessageProducer.create(params)
+        else:
+            raise ValueError(f"Unsupported ingestion source type: {producer_type}")
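
A minimal usage sketch (not part of this diff) of the new factory. It assumes a Kafka broker reachable at localhost:9092 and an existing topic named "test"; the broker address, topic name, key, and payload are placeholders.

import asyncio

from osbenchmark.client import MessageProducerFactory


async def main():
    # shape of the workload params expected by MessageProducerFactory.create()
    params = {
        "ingestion-source": {
            "type": "kafka",
            "param": {"topic": "test", "bootstrap-servers": "localhost:9092"}
        }
    }
    producer = await MessageProducerFactory.create(params)  # starts an AIOKafkaProducer
    try:
        # key and value must be strings: the producer is configured with str.encode serializers
        await producer.send_message('{"field": "value"}', key="doc-1")
    finally:
        await producer.stop()


asyncio.run(main())

Any other "type" value raises ValueError, so unsupported ingestion sources fail fast before a connection is attempted.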

osbenchmark/context.py

+140
@@ -0,0 +1,140 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextvars
+import time
+
+
+class RequestContextManager:
+    """
+    Ensures that request context span the defined scope and allow nesting of request contexts with proper propagation.
+    This means that we can span a top-level request context, open sub-request contexts that can be used to measure
+    individual timings and still measure the proper total time on the top-level request context.
+    """
+    def __init__(self, request_context_holder):
+        self.ctx_holder = request_context_holder
+        self.ctx = None
+        self.token = None
+
+    async def __aenter__(self):
+        self.ctx, self.token = self.ctx_holder.init_request_context()
+        return self
+
+    @property
+    def request_start(self):
+        return self.ctx["request_start"]
+
+    @property
+    def request_end(self):
+        return max((value for value in self.ctx["request_end_list"] if value < self.client_request_end))
+
+    @property
+    def client_request_start(self):
+        return self.ctx["client_request_start"]
+
+    @property
+    def client_request_end(self):
+        return self.ctx["client_request_end"]
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        # propagate earliest request start and most recent request end to parent
+        client_request_start = self.client_request_start
+        client_request_end = self.client_request_end
+        request_start = self.request_start
+        request_end = self.request_end
+        self.ctx_holder.restore_context(self.token)
+        # don't attempt to restore these values on the top-level context as they don't exist
+        if self.token.old_value != contextvars.Token.MISSING:
+            self.ctx_holder.update_request_start(request_start)
+            self.ctx_holder.update_request_end(request_end)
+            self.ctx_holder.update_client_request_start(client_request_start)
+            self.ctx_holder.update_client_request_end(client_request_end)
+        self.token = None
+        return False
+
+
+class RequestContextHolder:
+    """
+    Holds request context variables. This class is only meant to be used together with RequestContextManager.
+    """
+    request_context = contextvars.ContextVar("benchmark_request_context")
+
+    def new_request_context(self):
+        return RequestContextManager(self)
+
+    @classmethod
+    def init_request_context(cls):
+        ctx = {}
+        token = cls.request_context.set(ctx)
+        return ctx, token
+
+    @classmethod
+    def restore_context(cls, token):
+        cls.request_context.reset(token)
+
+    @classmethod
+    def update_request_start(cls, new_request_start):
+        meta = cls.request_context.get()
+        # this can happen if multiple requests are sent on the wire for one logical request (e.g. scrolls)
+        if "request_start" not in meta and "client_request_start" in meta:
+            meta["request_start"] = new_request_start
+
+    @classmethod
+    def update_request_end(cls, new_request_end):
+        meta = cls.request_context.get()
+        if "request_end_list" not in meta:
+            meta["request_end_list"] = []
+        meta["request_end_list"].append(new_request_end)
+
+    @classmethod
+    def update_client_request_start(cls, new_client_request_start):
+        meta = cls.request_context.get()
+        if "client_request_start" not in meta:
+            meta["client_request_start"] = new_client_request_start
+
+    @classmethod
+    def update_client_request_end(cls, new_client_request_end):
+        meta = cls.request_context.get()
+        meta["client_request_end"] = new_client_request_end
+
+    @classmethod
+    def on_client_request_start(cls):
+        cls.update_client_request_start(time.perf_counter())
+
+    @classmethod
+    def on_client_request_end(cls):
+        cls.update_client_request_end(time.perf_counter())
+
+    @classmethod
+    def on_request_start(cls):
+        cls.update_request_start(time.perf_counter())
+
+    @classmethod
+    def on_request_end(cls):
+        cls.update_request_end(time.perf_counter())
+
+    @classmethod
+    def return_raw_response(cls):
+        ctx = cls.request_context.get()
+        ctx["raw_response"] = True

osbenchmark/kafka_client.py

+70
@@ -0,0 +1,70 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from aiokafka import AIOKafkaProducer
+from osbenchmark.context import RequestContextHolder
+
+class KafkaMessageProducer:
+    def __init__(self, producer, topic):
+        self._producer = producer
+        self._topic = topic
+        self._ctx_holder = RequestContextHolder()
+
+    @classmethod
+    async def create(cls, params):
+        """
+        Creates a Kafka producer based on parameters in the ingestion source.
+        """
+
+        ingestion_source = params.get("ingestion-source", {})
+        kafka_params = ingestion_source.get("param", {})
+        topic = kafka_params.get("topic")
+        if not topic:
+            raise ValueError("No 'topic' specified in ingestion source parameters.")
+        bootstrap_servers = kafka_params.get("bootstrap-servers", "")
+
+        producer = AIOKafkaProducer(
+            bootstrap_servers=bootstrap_servers,
+            key_serializer=str.encode,
+            value_serializer=str.encode
+        )
+        await producer.start()
+        return cls(producer, topic)
+
+    async def send_message(self, message, key=""):
+        """
+        Sends a message to the producer's topic.
+        """
+        await self._producer.send_and_wait(self._topic, message, key=key)
+
+    async def stop(self):
+        """
+        Stops the underlying producer.
+        """
+        await self._producer.stop()
+
+    @property
+    def new_request_context(self):
+        # Delegate to the internal holder
+        return self._ctx_holder.new_request_context
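
A hedged usage sketch (not part of the commit) that drives KafkaMessageProducer directly and times a single send through the delegated request context. It assumes a reachable broker at localhost:9092 and an existing topic "test"; broker address, topic, key, and payload are placeholders.

import asyncio

from osbenchmark.context import RequestContextHolder
from osbenchmark.kafka_client import KafkaMessageProducer


async def main():
    params = {
        "ingestion-source": {
            "type": "kafka",
            "param": {"topic": "test", "bootstrap-servers": "localhost:9092"}
        }
    }
    producer = await KafkaMessageProducer.create(params)
    try:
        # producer.new_request_context delegates to the internal RequestContextHolder,
        # whose update methods are classmethods, so they can be called on the class itself
        async with producer.new_request_context() as ctx:
            RequestContextHolder.on_client_request_start()
            RequestContextHolder.on_request_start()
            await producer.send_message('{"field": "value"}', key="doc-1")
            RequestContextHolder.on_request_end()
            RequestContextHolder.on_client_request_end()
            print("client-side latency:", ctx.client_request_end - ctx.client_request_start)
    finally:
        await producer.stop()


asyncio.run(main())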
