Skip to content

Commit 974b082

Browse files
authored
Added margo_migrate_progress_loop (#304)
* added margo_migrate_progress_loop * updated version number
1 parent 339611b commit 974b082

File tree

5 files changed

+215
-4
lines changed

5 files changed

+215
-4
lines changed

configure.ac

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ AC_PREREQ([2.63])
55

66
# IMPORTANT: when editing the version number, edit both the
77
# MARGO_VERSION_ variables and the argument provided to AC_INIT
8-
AC_INIT([margo], [0.19.0], [],[],[])
8+
AC_INIT([margo], [0.19.1], [],[],[])
99

1010
MARGO_VERSION_MAJOR=0
1111
MARGO_VERSION_MINOR=19
12-
MARGO_VERSION_PATCH=0
12+
MARGO_VERSION_PATCH=1
1313
MARGO_VERSION="$MARGO_VERSION_MAJOR.$MARGO_VERSION_MINOR.$MARGO_VERSION_PATCH"
1414
MARGO_VERSION_NUM=$((MARGO_VERSION_MAJOR*100000+MARGO_VERSION_MINOR*100+MARGO_VERSION_PATCH))
1515

include/margo.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1902,6 +1902,16 @@ int margo_get_progress_timeout_ub_msec(margo_instance_id mid,
19021902
*/
19031903
int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed);
19041904

1905+
/**
1906+
* @brief Migrate the margo instance's progress ULT to the specified pool.
1907+
*
1908+
* @param mid Margo instance.
1909+
* @param pool_idx index of the pool.
1910+
*
1911+
* @return 0 in case of success, an ABT error otherwise.
1912+
*/
1913+
int margo_migrate_progress_loop(margo_instance_id mid, unsigned pool_idx);
1914+
19051915
/**
19061916
* @brief Sets configurable parameters/hints.
19071917
*

src/margo-core.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2445,3 +2445,13 @@ int margo_set_progress_when_needed(margo_instance_id mid, bool when_needed)
24452445
}
24462446
return 0;
24472447
}
2448+
2449+
int margo_migrate_progress_loop(margo_instance_id mid, unsigned pool_idx)
2450+
{
2451+
if (mid == MARGO_INSTANCE_NULL) return ABT_ERR_INV_ARG;
2452+
if (pool_idx >= mid->abt.pools_len) return ABT_ERR_INV_ARG;
2453+
if (pool_idx == mid->progress_pool_idx) return 0;
2454+
mid->progress_pool_idx = pool_idx;
2455+
ABT_pool target_pool = mid->abt.pools[pool_idx].pool;
2456+
return ABT_thread_migrate_to_pool(mid->hg_progress_tid, target_pool);
2457+
}

tests/unit-tests/Makefile.subdir

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ check_PROGRAMS += \
1818
tests/unit-tests/margo-comm-finalize \
1919
tests/unit-tests/margo-forward \
2020
tests/unit-tests/margo-monitoring \
21-
tests/unit-tests/margo-sanity-warnings
21+
tests/unit-tests/margo-sanity-warnings \
22+
tests/unit-tests/margo-migrate-progress
2223

2324
TESTS += \
2425
tests/unit-tests/margo-addr \
@@ -38,7 +39,8 @@ TESTS += \
3839
tests/unit-tests/margo-comm-finalize \
3940
tests/unit-tests/margo-forward \
4041
tests/unit-tests/margo-monitoring \
41-
tests/unit-tests/margo-sanity-warnings
42+
tests/unit-tests/margo-sanity-warnings \
43+
tests/unit-tests/margo-migrate-progress
4244

4345
tests_unit_tests_margo_addr_SOURCES = \
4446
tests/unit-tests/munit/munit.c \
@@ -126,6 +128,11 @@ tests_unit_tests_margo_sanity_warnings_SOURCES = \
126128
tests/unit-tests/munit/munit.c \
127129
tests/unit-tests/margo-sanity-warnings.c
128130

131+
tests_unit_tests_margo_migrate_progress_SOURCES = \
132+
tests/unit-tests/munit/munit.c \
133+
tests/unit-tests/margo-migrate-progress.c \
134+
tests/unit-tests/helper-server.c
135+
129136
noinst_HEADERS += tests/unit-tests/munit/munit.h \
130137
tests/unit-tests/helper-server.h
131138
endif
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* (C) 2020 The University of Chicago
3+
*
4+
* See COPYRIGHT in top-level directory.
5+
*/
6+
#include <stdio.h>
7+
#include <margo.h>
8+
#include <margo-hg-shim.h>
9+
#include <mercury_proc_string.h>
10+
#include <mercury_macros.h>
11+
#include "helper-server.h"
12+
#include "munit/munit.h"
13+
#include "munit/munit-goto.h"
14+
15+
#define P(__msg__) printf("%s\n", __msg__); fflush(stdout)
16+
17+
DECLARE_MARGO_RPC_HANDLER(rpc_ult)
18+
static void rpc_ult(hg_handle_t handle)
19+
{
20+
margo_respond(handle, NULL);
21+
margo_destroy(handle);
22+
return;
23+
}
24+
DEFINE_MARGO_RPC_HANDLER(rpc_ult)
25+
26+
static int svr_init_fn(margo_instance_id mid, void* arg)
27+
{
28+
(void)arg;
29+
MARGO_REGISTER(mid, "rpc", void, void, rpc_ult);
30+
return (0);
31+
}
32+
33+
struct test_context {
34+
margo_instance_id mid;
35+
int remote_pid;
36+
char remote_addr[256];
37+
};
38+
39+
static void* test_context_setup(const MunitParameter params[], void* user_data)
40+
{
41+
(void)params;
42+
(void)user_data;
43+
struct test_context* ctx = calloc(1, sizeof(*ctx));
44+
45+
const char* protocol = munit_parameters_get(params, "protocol");
46+
hg_size_t remote_addr_size = 256;
47+
48+
const char* config = "{"
49+
"\"rpc_pool\":\"my_rpc_pool\","
50+
"\"progress_pool\":\"my_progress_pool\","
51+
"\"argobots\": {"
52+
"\"pools\": ["
53+
"{ \"name\":\"my_rpc_pool\", \"kind\":\"fifo_wait\" },"
54+
"{ \"name\":\"my_progress_pool\", \"kind\":\"fifo_wait\" }"
55+
"],"
56+
"\"xstreams\": ["
57+
"{ \"name\":\"my_progress_xstream\","
58+
"\"scheduler\": {\"type\":\"basic_wait\", \"pools\":[\"my_progress_pool\"]}"
59+
"},"
60+
"{ \"name\":\"my_rpc_xstream\","
61+
"\"scheduler\": {\"type\":\"basic_wait\", \"pools\":[\"my_rpc_pool\"]}"
62+
"}"
63+
"],"
64+
"}"
65+
"}";
66+
67+
struct margo_init_info init_info = {0};
68+
init_info.json_config = config;
69+
ctx->remote_pid = HS_start(protocol, &init_info, svr_init_fn, NULL, NULL,
70+
&(ctx->remote_addr[0]), &remote_addr_size);
71+
munit_assert_int(ctx->remote_pid, >, 0);
72+
73+
ctx->mid = margo_init_ext(protocol, MARGO_SERVER_MODE, &init_info);
74+
if(!ctx->mid) {
75+
HS_stop(ctx->remote_pid, 0);
76+
}
77+
munit_assert_not_null(ctx->mid);
78+
79+
return ctx;
80+
}
81+
82+
static void test_context_tear_down(void* fixture)
83+
{
84+
struct test_context* ctx = (struct test_context*)fixture;
85+
86+
hg_addr_t remote_addr = HG_ADDR_NULL;
87+
margo_addr_lookup(ctx->mid, ctx->remote_addr, &remote_addr);
88+
margo_shutdown_remote_instance(ctx->mid, remote_addr);
89+
margo_addr_free(ctx->mid, remote_addr);
90+
HS_stop(ctx->remote_pid, 0);
91+
margo_finalize(ctx->mid);
92+
93+
free(ctx);
94+
}
95+
96+
static MunitResult test_migrate_progress_and_forward(const MunitParameter params[],
97+
void* data)
98+
{
99+
(void)params;
100+
(void)data;
101+
hg_return_t hret[5] = {0,0,0,0,0};
102+
int ret[5] = {0};
103+
hg_handle_t handle = HG_HANDLE_NULL;
104+
hg_addr_t addr = HG_ADDR_NULL;
105+
106+
struct test_context* ctx = (struct test_context*)data;
107+
108+
// "rpc" is registered on the server, everything should be fine
109+
hg_id_t rpc_id = MARGO_REGISTER(ctx->mid, "rpc", void, void, NULL);
110+
111+
hret[0] = margo_addr_lookup(ctx->mid, ctx->remote_addr, &addr);
112+
if(hret[0] != HG_SUCCESS) goto cleanup;
113+
hret[1] = margo_create(ctx->mid, addr, rpc_id, &handle);
114+
if(hret[1] != HG_SUCCESS) goto cleanup;
115+
hret[2] = margo_forward(handle, NULL);
116+
if(hret[2] != HG_SUCCESS) goto cleanup;
117+
hret[3] = margo_destroy(handle);
118+
if(hret[3] != HG_SUCCESS) goto cleanup;
119+
120+
// create new pool and ES
121+
struct margo_pool_info pool_info = {0};
122+
ret[0] = margo_add_pool_from_json(ctx->mid,
123+
"{ \"name\":\"my_new_progress_pool\", \"kind\":\"fifo_wait\" }",
124+
&pool_info);
125+
struct margo_xstream_info es_info = {0};
126+
ret[1] = margo_add_xstream_from_json(ctx->mid,
127+
"{ \"name\":\"my_new_progress_xstream\","
128+
"\"scheduler\": {\"type\":\"basic_wait\", \"pools\":[\"my_new_progress_pool\"]}"
129+
"}",
130+
&es_info);
131+
// migrate the progress loop
132+
ret[2] = margo_migrate_progress_loop(ctx->mid, pool_info.index);
133+
// erase old pool and ES
134+
ret[3] = margo_remove_xstream_by_name(ctx->mid, "my_progress_xstream");
135+
ret[4] = margo_remove_pool_by_name(ctx->mid, "my_progress_pool");
136+
137+
// send another RPC
138+
hret[1] = margo_create(ctx->mid, addr, rpc_id, &handle);
139+
if(hret[1] != HG_SUCCESS) goto cleanup;
140+
hret[2] = margo_forward(handle, NULL);
141+
if(hret[2] != HG_SUCCESS) goto cleanup;
142+
143+
cleanup:
144+
hret[3] = margo_destroy(handle);
145+
hret[4] = margo_addr_free(ctx->mid, addr);
146+
147+
munit_assert_int_goto(hret[0], ==, HG_SUCCESS, error);
148+
munit_assert_int_goto(hret[1], ==, HG_SUCCESS, error);
149+
munit_assert_int_goto(hret[2], ==, HG_SUCCESS, error);
150+
munit_assert_int_goto(hret[3], ==, HG_SUCCESS, error);
151+
munit_assert_int_goto(hret[4], ==, HG_SUCCESS, error);
152+
munit_assert_int_goto(ret[0], ==, 0, error);
153+
munit_assert_int_goto(ret[1], ==, 0, error);
154+
munit_assert_int_goto(ret[2], ==, 0, error);
155+
munit_assert_int_goto(ret[3], ==, 0, error);
156+
munit_assert_int_goto(ret[4], ==, 0, error);
157+
return MUNIT_OK;
158+
159+
error:
160+
return MUNIT_FAIL;
161+
}
162+
163+
static char* protocol_params[] = {"na+sm", NULL};
164+
165+
static MunitParameterEnum test_params[]
166+
= {{"protocol", protocol_params},
167+
{NULL, NULL}};
168+
169+
static MunitParameterEnum test_params2[]
170+
= {{"protocol", protocol_params},
171+
{NULL, NULL}};
172+
173+
static MunitTest test_suite_tests[] = {
174+
{(char*)"/forward", test_migrate_progress_and_forward, test_context_setup,
175+
test_context_tear_down, MUNIT_TEST_OPTION_NONE, test_params},
176+
{NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL}};
177+
178+
static const MunitSuite test_suite
179+
= {(char*)"/margo", test_suite_tests, NULL, 1, MUNIT_SUITE_OPTION_NONE};
180+
181+
int main(int argc, char* argv[MUNIT_ARRAY_PARAM(argc + 1)])
182+
{
183+
return munit_suite_main(&test_suite, NULL, argc, argv);
184+
}

0 commit comments

Comments
 (0)