Skip to content

Commit e0b54de

Browse files
committed
Rename metadata to headers
1 parent 1c2a7c2 commit e0b54de

File tree

8 files changed

+270
-102
lines changed

8 files changed

+270
-102
lines changed

jina/constants.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
import datetime as _datetime
12
import os as _os
23
import sys as _sys
34
from pathlib import Path as _Path
4-
import datetime as _datetime
55

66
__windows__ = _sys.platform == 'win32'
77
__uptime__ = _datetime.datetime.now().isoformat()
@@ -53,6 +53,7 @@
5353
__args_executor_func__ = {
5454
'docs',
5555
'parameters',
56+
'headers',
5657
'docs_matrix',
5758
}
5859
__args_executor_init__ = {'metas', 'requests', 'runtime_args'}

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,10 @@ def add_post_route(
187187
)
188188
app_kwargs['response_class'] = DocArrayResponse
189189

190+
from fastapi import Request
191+
190192
@app.api_route(**app_kwargs)
191-
async def post(body: input_model, response: Response):
193+
async def post(body: input_model, response: Response, request: Request):
192194
target_executor = None
193195
req_id = None
194196
if body.header is not None:
@@ -208,6 +210,7 @@ async def post(body: input_model, response: Response):
208210
docs,
209211
exec_endpoint=endpoint_path,
210212
parameters=body.parameters,
213+
headers=request.headers,
211214
target_executor=target_executor,
212215
request_id=req_id,
213216
return_results=True,

jina/serve/runtimes/gateway/request_handling.py

Lines changed: 31 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -157,48 +157,37 @@ async def _load_balance(self, request):
157157
try:
158158
async with aiohttp.ClientSession() as session:
159159

160-
if request.method == 'GET':
161-
request_kwargs = {}
162-
try:
163-
payload = await request.json()
164-
if payload:
165-
request_kwargs['json'] = payload
166-
except Exception:
167-
self.logger.debug('No JSON payload found in request')
168-
169-
async with session.get(
170-
url=target_url, **request_kwargs
171-
) as response:
172-
# Create a StreamResponse with the same headers and status as the target response
173-
stream_response = web.StreamResponse(
174-
status=response.status,
175-
headers=response.headers,
176-
)
177-
178-
# Prepare the response to send headers
179-
await stream_response.prepare(request)
180-
181-
# Stream the response from the target server to the client
182-
async for chunk in response.content.iter_any():
183-
await stream_response.write(chunk)
184-
185-
# Close the stream response once all chunks are sent
186-
await stream_response.write_eof()
187-
return stream_response
188-
189-
elif request.method == 'POST':
190-
d = await request.read()
191-
import json
192-
193-
async with session.post(
194-
url=target_url, json=json.loads(d.decode())
195-
) as response:
196-
content = await response.read()
197-
return web.Response(
198-
body=content,
199-
status=response.status,
200-
content_type=response.content_type,
201-
)
160+
request_kwargs = {}
161+
try:
162+
payload = await request.json()
163+
if payload:
164+
request_kwargs['json'] = payload
165+
except Exception:
166+
self.logger.debug('No JSON payload found in request')
167+
168+
async with session.request(
169+
request.method,
170+
url=target_url,
171+
auto_decompress=False,
172+
**request_kwargs,
173+
) as response:
174+
# Create a StreamResponse with the same headers and status as the target response
175+
stream_response = web.StreamResponse(
176+
status=response.status,
177+
headers=response.headers,
178+
)
179+
180+
# Prepare the response to send headers
181+
await stream_response.prepare(request)
182+
183+
# Stream the response from the target server to the client
184+
async for chunk in response.content.iter_any():
185+
await stream_response.write(chunk)
186+
187+
# Close the stream response once all chunks are sent
188+
await stream_response.write_eof()
189+
return stream_response
190+
202191
except aiohttp.ClientError as e:
203192
return web.Response(text=f'Error: {str(e)}', status=500)
204193

jina/serve/runtimes/gateway/streamer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
AsyncIterator,
77
Dict,
88
List,
9+
Mapping,
910
Optional,
1011
Sequence,
1112
Tuple,
@@ -209,6 +210,7 @@ async def stream(
209210
exec_endpoint: Optional[str] = None,
210211
target_executor: Optional[str] = None,
211212
parameters: Optional[Dict] = None,
213+
headers: Optional[Mapping[str, str]] = None,
212214
results_in_order: bool = False,
213215
return_type: Type[DocumentArray] = DocumentArray,
214216
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
@@ -221,6 +223,7 @@ async def stream(
221223
:param exec_endpoint: The Executor endpoint to which to send the Documents
222224
:param target_executor: A regex expression indicating the Executors that should receive the Request
223225
:param parameters: Parameters to be attached to the Requests
226+
:param headers: Http request headers
224227
:param results_in_order: return the results in the same order as the request_iterator
225228
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
226229
:yield: tuple of Documents or Responses and unpacked error from Executors if any
@@ -232,6 +235,7 @@ async def stream(
232235
exec_endpoint=exec_endpoint,
233236
target_executor=target_executor,
234237
parameters=parameters,
238+
headers=headers,
235239
results_in_order=results_in_order,
236240
return_type=return_type,
237241
):
@@ -256,6 +260,7 @@ async def stream_doc(
256260
exec_endpoint: Optional[str] = None,
257261
target_executor: Optional[str] = None,
258262
parameters: Optional[Dict] = None,
263+
headers: Optional[Mapping[str, str]] = None,
259264
request_id: Optional[str] = None,
260265
return_type: Type[DocumentArray] = DocumentArray,
261266
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
@@ -267,6 +272,7 @@ async def stream_doc(
267272
:param exec_endpoint: The Executor endpoint to which to send the Documents
268273
:param target_executor: A regex expression indicating the Executors that should receive the Request
269274
:param parameters: Parameters to be attached to the Requests
275+
:param headers: Http request headers
270276
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
271277
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
272278
:yield: tuple of Documents or Responses and unpacked error from Executors if any
@@ -282,6 +288,8 @@ async def stream_doc(
282288
req.header.target_executor = target_executor
283289
if parameters:
284290
req.parameters = parameters
291+
if headers:
292+
req.headers = headers
285293

286294
async for result in self.rpc_stream_doc(request=req, return_type=return_type):
287295
error = None
@@ -306,6 +314,7 @@ async def stream_docs(
306314
exec_endpoint: Optional[str] = None,
307315
target_executor: Optional[str] = None,
308316
parameters: Optional[Dict] = None,
317+
headers: Optional[Mapping[str, str]] = None,
309318
results_in_order: bool = False,
310319
request_id: Optional[str] = None,
311320
return_type: Type[DocumentArray] = DocumentArray,
@@ -319,6 +328,7 @@ async def stream_docs(
319328
:param exec_endpoint: The Executor endpoint to which to send the Documents
320329
:param target_executor: A regex expression indicating the Executors that should receive the Request
321330
:param parameters: Parameters to be attached to the Requests
331+
:param headers: Http request headers
322332
:param results_in_order: return the results in the same order as the request_iterator
323333
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
324334
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
@@ -339,6 +349,8 @@ def _req_generator():
339349
req.header.target_executor = target_executor
340350
if parameters:
341351
req.parameters = parameters
352+
if headers:
353+
req.headers = headers
342354
yield req
343355
else:
344356
from docarray import BaseDoc
@@ -361,6 +373,8 @@ def batch(iterable, n=1):
361373
req.header.target_executor = target_executor
362374
if parameters:
363375
req.parameters = parameters
376+
if headers:
377+
req.headers = headers
364378
yield req
365379
else:
366380
req = DataRequest()
@@ -374,6 +388,8 @@ def batch(iterable, n=1):
374388
req.header.target_executor = target_executor
375389
if parameters:
376390
req.parameters = parameters
391+
if headers:
392+
req.headers = headers
377393
yield req
378394

379395
async for resp in self.rpc_stream(
@@ -438,6 +454,7 @@ async def post(
438454
request_size: int = 100,
439455
on: Optional[str] = None,
440456
parameters: Optional[Dict] = None,
457+
headers: Optional[Mapping[str, str]] = None,
441458
return_type: Type[DocumentArray] = DocumentArray,
442459
**kwargs,
443460
):
@@ -505,6 +522,7 @@ async def stream_doc(
505522
inputs: 'Document',
506523
on: Optional[str] = None,
507524
parameters: Optional[Dict] = None,
525+
headers: Optional[Mapping[str, str]] = None,
508526
**kwargs,
509527
):
510528
req: SingleDocumentRequest = SingleDocumentRequest(inputs.to_protobuf())

jina/serve/runtimes/worker/http_fastapi_app.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,18 @@ def add_post_route(
8686

8787
app_kwargs['response_class'] = DocArrayResponse
8888

89+
from fastapi import Request
90+
8991
@app.api_route(**app_kwargs)
90-
async def post(body: input_model, response: Response):
92+
async def post(body: input_model, response: Response, request: Request):
9193

9294
req = DataRequest()
9395
if body.header is not None:
9496
req.header.request_id = body.header.request_id
9597

9698
if body.parameters is not None:
9799
req.parameters = body.parameters
100+
req.headers = request.headers
98101
req.header.exec_endpoint = endpoint_path
99102
data = body.data
100103
if isinstance(data, list):
@@ -149,6 +152,7 @@ async def streaming_get(request: Request = None, body: input_doc_model = None):
149152
body = Document.from_pydantic_model(body)
150153
req = DataRequest()
151154
req.header.exec_endpoint = endpoint_path
155+
req.headers = request.headers
152156
if not docarray_v2:
153157
req.data.docs = DocumentArray([body])
154158
else:

0 commit comments

Comments
 (0)