Skip to content

Commit 4a810ec

Browse files
committed
add multiprocess support to sync
Signed-off-by: Lior Sventitzky <[email protected]>
1 parent 4af4d20 commit 4a810ec

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

python/python/glide/sync/glide_client.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
22

3+
import os
34
import sys
45
from pathlib import Path
56
from typing import List, Optional, Union
@@ -48,8 +49,19 @@ def create(cls, config: BaseClientConfiguration) -> Self:
4849
self._init_ffi()
4950
self.config = config
5051
self._is_closed = False
51-
conn_req = config._create_a_protobuf_conn_request(
52-
cluster_mode=type(config) is GlideClusterClientConfiguration
52+
53+
os.register_at_fork(after_in_child=self._reset_client_connection)
54+
55+
self._create_new_core_client()
56+
57+
return self
58+
59+
def _reset_client_connection(self):
60+
self._create_new_core_client()
61+
62+
def _create_new_core_client(self):
63+
conn_req = self.config._create_a_protobuf_conn_request(
64+
cluster_mode=type(self.config) is GlideClusterClientConfiguration
5365
)
5466
conn_req_bytes = conn_req.SerializeToString()
5567
client_type = self.ffi.new(
@@ -82,7 +94,6 @@ def create(cls, config: BaseClientConfiguration) -> Self:
8294
self.lib.free_connection_response(client_response_ptr)
8395
else:
8496
raise ClosingError("Failed to create client, response pointer is NULL.")
85-
return self
8697

8798
def _init_ffi(self):
8899
self.ffi = FFI()

python/tests/sync_tests/test_sync_client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
import math
7+
import os
78
import threading
89
import time
910
from datetime import date, datetime, timedelta, timezone
@@ -380,6 +381,26 @@ def connect_to_client():
380381
# Clean up the main client
381382
client.close()
382383

384+
@pytest.mark.parametrize("cluster_mode", [True, False])
385+
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
386+
def test_sync_fork(self, glide_sync_client: TGlideClient):
387+
try:
388+
pid = os.fork()
389+
except OSError as e:
390+
pytest.fail(f"Fork failed: {e}")
391+
392+
if pid == 0:
393+
# Child process
394+
glide_sync_client.set("key", "value")
395+
assert glide_sync_client.get("key") == "value".encode()
396+
os._exit(0)
397+
else:
398+
# Parent process
399+
glide_sync_client.set("key", "value")
400+
assert glide_sync_client.get("key") == "value".encode()
401+
_, status = os.waitpid(pid, 0)
402+
if not os.WIFEXITED(status) or os.WEXITSTATUS(status) != 0:
403+
pytest.fail(f"Child process failed with status {status}")
383404

384405
class TestCommands:
385406
@pytest.mark.smoke_test

python/tests/utils/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ async def delete_acl_username_and_password(client: TGlideClient, username: str):
446446
["ACL", "DELUSER", username], route=AllNodes()
447447
)
448448

449+
449450
def create_client_config(
450451
request,
451452
cluster_mode: bool,
@@ -509,6 +510,7 @@ def create_client_config(
509510
)
510511
return config
511512

513+
512514
def run_sync_func_with_timeout_in_thread(
513515
func: Callable, timeout: float, on_timeout=None
514516
):

0 commit comments

Comments
 (0)