Skip to content

Commit 4300302

Browse files
committed
update mosaic helpers and add fn tests
1 parent 060ce5f commit 4300302

10 files changed

+743
-421
lines changed

functional-tests/common/mosaic_e2e.py

Lines changed: 361 additions & 48 deletions
Large diffs are not rendered by default.

functional-tests/entry.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ def disabled_tests() -> frozenset[str]:
3838
[
3939
# only used in keep-alive mode
4040
"keepalive_stub_test",
41-
# enable after issues are resolved
42-
"fn_mosaic_setup_concurrent",
43-
"fn_mosaic_setup_staggered",
4441
]
4542
)
4643

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import time
2+
3+
import flexitest
4+
5+
from common.base_test import BaseTest
6+
from common.mosaic_e2e import (
7+
PreparedDeposit,
8+
handle_all_setups,
9+
init_evaluator_deposit,
10+
init_garbler_deposit,
11+
prepare_deposit,
12+
wait_all_deposits_ready,
13+
)
14+
from envs.mosaic_env import MosaicEnv
15+
16+
# Number of mosaic nodes started by MosaicEnv for this test.
NETWORK_SIZE = 3
# Deposits prepared per ordered (garbler, evaluator) node pair.
DEPOSIT_COUNT = 2
19+
20+
@flexitest.register
class MosaicConcurrentEvaluatorFirstTest(BaseTest):
    """
    Runs multiple mosaic deposits concurrently across multiple nodes.

    All evaluator-side deposit initializations are issued first; after a
    fixed delay the garbler side of every deposit is initialized, and the
    test then waits for every deposit to become ready.
    """

    def __init__(self, ctx: flexitest.InitContext):
        ctx.set_env(MosaicEnv(NETWORK_SIZE))

    def main(self, ctx: flexitest.RunContext):
        # One RPC client per node, then each node's peer id (same call
        # order as creating all clients first, then querying each).
        rpcs = {}
        for node in range(NETWORK_SIZE):
            rpcs[node] = ctx.get_service(f"mosaic_{node}").create_rpc()
        peer_ids = {node: rpc.mosaic_getRpcPeerId() for node, rpc in rpcs.items()}

        tsid_map = handle_all_setups(self.logger, rpcs, peer_ids, NETWORK_SIZE)
        self.logger.info("*** ALL SETUPS COMPLETE ***")

        # Prepare one deposit per deposit index for every ordered
        # (garbler, evaluator) pair of distinct nodes.
        prepared: list[tuple[str, PreparedDeposit]] = []
        for deposit_idx in range(DEPOSIT_COUNT):
            for garbler in range(NETWORK_SIZE):
                for evaluator in range(NETWORK_SIZE):
                    if garbler != evaluator:
                        tsids = tsid_map[(garbler, evaluator)]
                        name = f"deposit_{deposit_idx}_g{garbler}_e{evaluator}"
                        dep = prepare_deposit(
                            rpcs[garbler],
                            rpcs[evaluator],
                            tsids.garbler_tsid,
                            tsids.evaluator_tsid,
                            deposit_idx,
                        )
                        prepared.append((name, dep))

        # Evaluator side first for every deposit...
        for name, dep in prepared:
            init_evaluator_deposit(self.logger, dep, name)

        # ...then, after a delay, the garbler side.
        time.sleep(10)

        for name, dep in prepared:
            init_garbler_deposit(self.logger, dep, name)

        wait_all_deposits_ready(self.logger, prepared)
        self.logger.info("*** ALL DEPOSITS COMPLETE ***")

        return True
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import time
2+
3+
import flexitest
4+
5+
from common.base_test import BaseTest
6+
from common.mosaic_e2e import (
7+
PreparedDeposit,
8+
handle_all_setups,
9+
init_evaluator_deposit,
10+
init_garbler_deposit,
11+
prepare_deposit,
12+
wait_all_deposits_ready,
13+
)
14+
from envs.mosaic_env import MosaicEnv
15+
16+
# Number of mosaic nodes started by MosaicEnv for this test.
NETWORK_SIZE = 3
# Deposits prepared per ordered (garbler, evaluator) node pair.
DEPOSIT_COUNT = 2
19+
20+
@flexitest.register
class MosaicConcurrentDepositGarblerFirstTest(BaseTest):
    """
    Runs multiple mosaic deposits concurrently across multiple nodes.

    All garbler-side deposit initializations are issued first; after a
    fixed delay the evaluator side of every deposit is initialized, and
    the test then waits for every deposit to become ready.
    """

    def __init__(self, ctx: flexitest.InitContext):
        ctx.set_env(MosaicEnv(NETWORK_SIZE))

    def main(self, ctx: flexitest.RunContext):
        # One RPC client per node, then each node's peer id (same call
        # order as creating all clients first, then querying each).
        rpcs = {}
        for node in range(NETWORK_SIZE):
            rpcs[node] = ctx.get_service(f"mosaic_{node}").create_rpc()
        peer_ids = {node: rpc.mosaic_getRpcPeerId() for node, rpc in rpcs.items()}

        tsid_map = handle_all_setups(self.logger, rpcs, peer_ids, NETWORK_SIZE)
        self.logger.info("*** ALL SETUPS COMPLETE ***")

        # Prepare one deposit per deposit index for every ordered
        # (garbler, evaluator) pair of distinct nodes.
        prepared: list[tuple[str, PreparedDeposit]] = []
        for deposit_idx in range(DEPOSIT_COUNT):
            for garbler in range(NETWORK_SIZE):
                for evaluator in range(NETWORK_SIZE):
                    if garbler != evaluator:
                        tsids = tsid_map[(garbler, evaluator)]
                        name = f"deposit_{deposit_idx}_g{garbler}_e{evaluator}"
                        dep = prepare_deposit(
                            rpcs[garbler],
                            rpcs[evaluator],
                            tsids.garbler_tsid,
                            tsids.evaluator_tsid,
                            deposit_idx,
                        )
                        prepared.append((name, dep))

        # Garbler side first for every deposit...
        for name, dep in prepared:
            init_garbler_deposit(self.logger, dep, name)

        # ...then, after a delay, the evaluator side.
        time.sleep(10)

        for name, dep in prepared:
            init_evaluator_deposit(self.logger, dep, name)

        wait_all_deposits_ready(self.logger, prepared)
        self.logger.info("*** ALL DEPOSITS COMPLETE ***")

        return True

functional-tests/tests/setup/fn_mosaic_setup_concurrent.py

Lines changed: 0 additions & 124 deletions
This file was deleted.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import secrets
2+
import time
3+
4+
import flexitest
5+
6+
from common.base_test import BaseTest
7+
from common.mosaic_e2e import TablesetPair, TsidMap, wait_all_setup_complete
8+
from envs.mosaic_env import MosaicEnv
9+
10+
# Number of mosaic nodes started by MosaicEnv for this test.
NETWORK_SIZE = 3
11+
12+
13+
@flexitest.register
class MosaicSetupGerblerFirstTest(BaseTest):
    """
    Tests mosaic setup across multiple nodes where all evaluator setups
    are issued first, then after a 10s delay, all garbler setups are issued.
    """

    # NOTE(review): the class name suggests "garbler first", but the code
    # below issues the evaluator-role setups in phase 1 and garbler-role
    # setups in phase 2. Docstring, comments, and log messages have been
    # corrected to match the code — confirm the intended ordering.

    def __init__(self, ctx: flexitest.InitContext):
        ctx.set_env(MosaicEnv(NETWORK_SIZE))

    def main(self, ctx: flexitest.RunContext):
        # One RPC client and peer id per node in the network.
        rpcs = {i: ctx.get_service(f"mosaic_{i}").create_rpc() for i in range(NETWORK_SIZE)}
        peer_ids = {i: rpcs[i].mosaic_getRpcPeerId() for i in range(NETWORK_SIZE)}

        instance_id = "0" * 64
        # (name, rpc, tsid) triples for every setup issued, consumed by
        # wait_all_setup_complete at the end.
        setups = []
        tsid_map: TsidMap = {}

        # Phase 1: issue all evaluator setups. The shared setup inputs for
        # each (garbler, evaluator) pair are generated here and reused by
        # the matching garbler setup in phase 2.
        setup_inputs_map: dict[tuple[int, int], str] = {}
        for garbler in range(NETWORK_SIZE):
            for evaluator in range(NETWORK_SIZE):
                if garbler == evaluator:
                    continue

                setup_inputs = secrets.token_hex(32)
                setup_inputs_map[(garbler, evaluator)] = setup_inputs

                tsid_e = rpcs[evaluator].mosaic_setupTableset(
                    {
                        "role": "evaluator",
                        "peer_info": {"peer_id": peer_ids[garbler]},
                        "setup_inputs": setup_inputs,
                        "instance_id": instance_id,
                    }
                )
                name_e = f"node{evaluator}_evaluator_to_node{garbler}"
                self.logger.info(f"{name_e}: {tsid_e}")
                setups.append((name_e, rpcs[evaluator], tsid_e))
                # Garbler tsid is not known yet; filled in during phase 2.
                tsid_map[(garbler, evaluator)] = TablesetPair("", tsid_e)

        self.logger.info("*** ALL EVALUATOR SETUPS ISSUED, waiting 10s ***")
        time.sleep(10)

        # Phase 2: issue all garbler setups, reusing the inputs from phase 1.
        for garbler in range(NETWORK_SIZE):
            for evaluator in range(NETWORK_SIZE):
                if garbler == evaluator:
                    continue

                setup_inputs = setup_inputs_map[(garbler, evaluator)]

                tsid_g = rpcs[garbler].mosaic_setupTableset(
                    {
                        "role": "garbler",
                        "peer_info": {"peer_id": peer_ids[evaluator]},
                        "setup_inputs": setup_inputs,
                        "instance_id": instance_id,
                    }
                )
                name_g = f"node{garbler}_garbler_to_node{evaluator}"
                self.logger.info(f"{name_g}: {tsid_g}")
                setups.append((name_g, rpcs[garbler], tsid_g))

                # Complete the pair with the now-known garbler tsid.
                tp = tsid_map[(garbler, evaluator)]
                tsid_map[(garbler, evaluator)] = TablesetPair(tsid_g, tp.evaluator_tsid)

        self.logger.info("*** ALL GARBLER SETUPS ISSUED ***")

        wait_all_setup_complete(self.logger, setups)
        self.logger.info("*** ALL SETUPS COMPLETE ***")

        return True

0 commit comments

Comments
 (0)