Skip to content

Commit 92f31dd

Browse files
committed
Provide callback function for client_connected2
The client_connected2 upcall provides a callback function, but the current code passed a NULL for it, thereby disabling that feature. Implement and provide a callback function for it. Signed-off-by: Ralph Castain <[email protected]> (cherry picked from commit 611567a)
1 parent 1d76dfd commit 92f31dd

File tree

3 files changed

+145
-78
lines changed

3 files changed

+145
-78
lines changed

examples/client2.c

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
1616
* Copyright (c) 2013-2020 Intel, Inc. All rights reserved.
1717
* Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved.
18-
* Copyright (c) 2021 Nanook Consulting. All rights reserved.
18+
* Copyright (c) 2021-2025 Nanook Consulting All rights reserved.
1919
* $COPYRIGHT$
2020
*
2121
* Additional copyrights may follow
@@ -76,9 +76,10 @@ int main(int argc, char **argv)
7676
{
7777
pmix_status_t rc;
7878
pmix_value_t value;
79-
pmix_value_t *val, *vptr;
79+
pmix_value_t *val;
8080
pmix_proc_t proc;
8181
uint32_t nprocs, n, k;
82+
uint64_t *u64;
8283
pmix_info_t *info;
8384
bool flag;
8485
mylock_t mylock;
@@ -130,14 +131,13 @@ int main(int argc, char **argv)
130131
fprintf(stderr, "Client %s:%d job size %d\n", myproc.nspace, myproc.rank, nprocs);
131132

132133
/* put a data array of pmix_value's */
133-
val = (pmix_value_t *) malloc(32 * sizeof(pmix_value_t));
134+
u64 = (uint64_t *) malloc(32 * sizeof(uint64_t));
134135
for (n = 0; n < 32; n++) {
135-
val[n].type = PMIX_UINT64;
136-
val[n].data.uint64 = 2 * n;
136+
u64[n] = 2 * n;
137137
}
138-
da.type = PMIX_VALUE;
138+
da.type = PMIX_UINT64;
139139
da.size = 32;
140-
da.array = val;
140+
da.array = u64;
141141
value.type = PMIX_DATA_ARRAY;
142142
value.data.darray = &da;
143143
rc = PMIx_Put(PMIX_GLOBAL, "test-key", &value);
@@ -189,7 +189,7 @@ int main(int argc, char **argv)
189189
PMIX_VALUE_RELEASE(val);
190190
goto done;
191191
}
192-
if (PMIX_VALUE != dptr->type) {
192+
if (PMIX_UINT64 != dptr->type) {
193193
fprintf(stderr,
194194
"Client ns %s rank %d: PMIx_Get %d returned wrong array value type %d\n",
195195
myproc.nspace, myproc.rank, proc.rank, dptr->type);
@@ -203,17 +203,12 @@ int main(int argc, char **argv)
203203
PMIX_VALUE_RELEASE(val);
204204
goto done;
205205
}
206-
vptr = (pmix_value_t *) dptr->array;
206+
u64 = (uint64_t *) dptr->array;
207207
for (k = 0; k < 32; k++) {
208-
if (PMIX_UINT64 != vptr[k].type) {
209-
fprintf(stderr, "Client ns %s rank %d: PMIx_Get %d returned wrong type: %d\n",
210-
myproc.nspace, myproc.rank, proc.rank, vptr[k].type);
211-
PMIX_VALUE_RELEASE(val);
212-
goto done;
213-
}
214-
if (2 * k != vptr[k].data.uint64) {
215-
fprintf(stderr, "Client ns %s rank %d: PMIx_Get %d returned wrong value: %lu\n",
216-
myproc.nspace, myproc.rank, proc.rank, (unsigned long) vptr[k].data.uint64);
208+
if ((2 * k) != u64[k]) {
209+
fprintf(stderr,
210+
"Client ns %s rank %d: PMIx_Get %d returned wrong value: %lu\n",
211+
myproc.nspace, myproc.rank, proc.rank, (unsigned long) u64[k]);
217212
PMIX_VALUE_RELEASE(val);
218213
goto done;
219214
}

src/mca/psec/psec.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* Copyright (c) 2015 Research Organization for Information Science
77
* and Technology (RIST). All rights reserved.
88
* Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved.
9-
* Copyright (c) 2021-2022 Nanook Consulting. All rights reserved.
9+
* Copyright (c) 2021-2025 Nanook Consulting All rights reserved.
1010
* $COPYRIGHT$
1111
*
1212
* Additional copyrights may follow
@@ -164,7 +164,7 @@ PMIX_EXPORT pmix_psec_module_t *pmix_psec_base_assign_module(const char *options
164164
} \
165165
} while (0)
166166

167-
#define PMIX_PSEC_SERVER_HANDSHAKE_IFNEED(r, p, d, nd, in, nin, c) \
167+
#define PMIX_PSEC_SERVER_HANDSHAKE_IFNEED(r, p) \
168168
if (PMIX_ERR_READY_FOR_HANDSHAKE == r) { \
169169
int _r; \
170170
/* execute the handshake if the security mode calls for it */ \

src/mca/ptl/base/ptl_base_connection_hdlr.c

Lines changed: 130 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,113 @@ static void cnct_cbfunc(pmix_status_t status, pmix_proc_t *proc, void *cbdata);
5151
static void _check_cached_events(pmix_peer_t *peer);
5252
static pmix_status_t process_tool_request(pmix_pending_connection_t *pnd, char *mg, size_t cnt);
5353

54+
// Local objects
55+
typedef struct {
56+
pmix_object_t super;
57+
pmix_event_t ev;
58+
pmix_status_t status;
59+
pmix_status_t reply;
60+
pmix_pending_connection_t *pnd;
61+
char *blob;
62+
pmix_peer_t *peer;
63+
} cnct_hdlr_t;
64+
static void chcon(cnct_hdlr_t *p)
65+
{
66+
memset(&p->ev, 0, sizeof(pmix_event_t));
67+
p->pnd = NULL;
68+
p->blob = NULL;
69+
p->peer = NULL;
70+
}
71+
static void chdes(cnct_hdlr_t *p)
72+
{
73+
if (NULL != p->pnd) {
74+
PMIX_RELEASE(p->pnd);
75+
}
76+
if (NULL != p->blob) {
77+
free(p->blob);
78+
}
79+
}
80+
static PMIX_CLASS_INSTANCE(cnct_hdlr_t,
81+
pmix_object_t,
82+
chcon, chdes);
83+
84+
static void _cnct_complete(int sd, short args, void *cbdata)
85+
{
86+
cnct_hdlr_t *ch = (cnct_hdlr_t *) cbdata;
87+
uint32_t u32;
88+
pmix_status_t rc;
89+
PMIX_HIDE_UNUSED_PARAMS(sd, args);
90+
91+
/* tell the client all is good */
92+
u32 = htonl(ch->reply);
93+
rc = pmix_ptl_base_send_blocking(ch->pnd->sd, (char *) &u32, sizeof(uint32_t));
94+
if (PMIX_SUCCESS != rc) {
95+
PMIX_ERROR_LOG(rc);
96+
goto error;
97+
}
98+
/* If needed, perform the handshake. The macro will update reply */
99+
PMIX_PSEC_SERVER_HANDSHAKE_IFNEED(ch->reply, ch->peer);
100+
101+
/* It is possible that connection validation failed */
102+
if (PMIX_SUCCESS != ch->reply) {
103+
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
104+
"validation of client connection failed");
105+
goto error;
106+
}
107+
108+
/* send the client's array index */
109+
u32 = htonl(ch->peer->index);
110+
rc = pmix_ptl_base_send_blocking(ch->pnd->sd, (char *) &u32, sizeof(uint32_t));
111+
if (PMIX_SUCCESS != rc) {
112+
PMIX_ERROR_LOG(rc);
113+
goto error;
114+
}
115+
116+
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
117+
"connect-ack from client completed");
118+
119+
pmix_ptl_base_set_nonblocking(ch->pnd->sd);
120+
121+
/* start the events for this client */
122+
pmix_event_assign(&ch->peer->recv_event, pmix_globals.evbase, ch->pnd->sd, EV_READ | EV_PERSIST,
123+
pmix_ptl_base_recv_handler, ch->peer);
124+
pmix_event_add(&ch->peer->recv_event, NULL);
125+
ch->peer->recv_ev_active = true;
126+
pmix_event_assign(&ch->peer->send_event, pmix_globals.evbase, ch->pnd->sd, EV_WRITE | EV_PERSIST,
127+
pmix_ptl_base_send_handler, ch->peer);
128+
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
129+
"pmix:server client %s:%u has connected on socket %d",
130+
ch->peer->info->pname.nspace, ch->peer->info->pname.rank, ch->peer->sd);
131+
132+
/* check the cached events and update the client */
133+
_check_cached_events(ch->peer);
134+
PMIX_RELEASE(ch);
135+
return;
136+
137+
error:
138+
if (NULL != ch->peer) {
139+
pmix_pointer_array_set_item(&pmix_server_globals.clients, ch->peer->index, NULL);
140+
PMIX_RELEASE(ch->peer);
141+
}
142+
CLOSE_THE_SOCKET(ch->pnd->sd);
143+
PMIX_RELEASE(ch);
144+
}
145+
146+
static void _connect_complete(pmix_status_t status, void *cbdata)
147+
{
148+
cnct_hdlr_t *ch = (cnct_hdlr_t *) cbdata;
149+
/* need to thread-shift this response */
150+
ch->status = status;
151+
PMIX_THREADSHIFT(ch, _cnct_complete);
152+
}
153+
54154
void pmix_ptl_base_connection_handler(int sd, short args, void *cbdata)
55155
{
56156
pmix_pending_connection_t *pnd = (pmix_pending_connection_t *) cbdata;
57157
pmix_ptl_hdr_t hdr;
58158
pmix_peer_t *peer = NULL;
59159
pmix_status_t rc, reply;
60160
char *msg = NULL, *mg, *p, *blob = NULL;
61-
uint32_t u32;
62161
size_t cnt;
63162
size_t len = 0;
64163
pmix_namespace_t *nptr, *tmp;
@@ -67,6 +166,7 @@ void pmix_ptl_base_connection_handler(int sd, short args, void *cbdata)
67166
pmix_info_t ginfo;
68167
pmix_byte_object_t cred;
69168
uint8_t major, minor, release;
169+
cnct_hdlr_t *ch;
70170

71171
/* acquire the object */
72172
PMIX_ACQUIRE_OBJECT(pnd);
@@ -370,72 +470,47 @@ void pmix_ptl_base_connection_handler(int sd, short args, void *cbdata)
370470
goto error;
371471
}
372472

373-
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, "client connection validated");
473+
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
474+
"client connection validated");
374475

375-
/* tell the client all is good */
376-
u32 = htonl(reply);
377-
if (PMIX_SUCCESS
378-
!= (rc = pmix_ptl_base_send_blocking(pnd->sd, (char *) &u32, sizeof(uint32_t)))) {
379-
PMIX_ERROR_LOG(rc);
380-
goto error;
381-
}
382-
/* If needed, perform the handshake. The macro will update reply */
383-
PMIX_PSEC_SERVER_HANDSHAKE_IFNEED(reply, peer, NULL, 0, NULL, NULL, &cred);
384476

385-
/* It is possible that connection validation failed */
386-
if (PMIX_SUCCESS != reply) {
387-
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
388-
"validation of client connection failed");
389-
goto error;
390-
}
391-
392-
/* send the client's array index */
393-
u32 = htonl(peer->index);
394-
if (PMIX_SUCCESS
395-
!= (rc = pmix_ptl_base_send_blocking(pnd->sd, (char *) &u32, sizeof(uint32_t)))) {
396-
PMIX_ERROR_LOG(rc);
397-
goto error;
398-
}
399-
400-
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
401-
"connect-ack from client completed");
477+
// prep for processing
478+
ch = PMIX_NEW(cnct_hdlr_t);
479+
ch->peer = peer;
480+
ch->pnd = pnd;
481+
ch->reply = reply;
482+
ch->blob = blob;
402483

403484
/* let the host server know that this client has connected */
404485
if (NULL != pmix_host_server.client_connected2) {
405486
PMIX_LOAD_PROCID(&proc, peer->info->pname.nspace, peer->info->pname.rank);
406-
rc = pmix_host_server.client_connected2(&proc, peer->info->server_object, NULL, 0, NULL,
407-
NULL);
408-
if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
487+
rc = pmix_host_server.client_connected2(&proc, peer->info->server_object, NULL, 0,
488+
_connect_complete, ch);
489+
if (PMIX_OPERATION_SUCCEEDED == rc) {
490+
ch->status = PMIX_SUCCESS;
491+
_cnct_complete(0, 0, ch);
492+
return;
493+
}
494+
if (PMIX_SUCCESS != rc) {
409495
PMIX_ERROR_LOG(rc);
496+
goto error;
410497
}
411498
} else if (NULL != pmix_host_server.client_connected) {
412499
PMIX_LOAD_PROCID(&proc, peer->info->pname.nspace, peer->info->pname.rank);
413-
rc = pmix_host_server.client_connected(&proc, peer->info->server_object, NULL, NULL);
414-
if (PMIX_SUCCESS != rc && PMIX_OPERATION_SUCCEEDED != rc) {
500+
rc = pmix_host_server.client_connected(&proc, peer->info->server_object, _connect_complete, ch);
501+
if (PMIX_OPERATION_SUCCEEDED == rc) {
502+
ch->status = PMIX_SUCCESS;
503+
_cnct_complete(0, 0, ch);
504+
return;
505+
}
506+
if (PMIX_SUCCESS != rc) {
415507
PMIX_ERROR_LOG(rc);
416508
goto error;
417509
}
418-
}
419-
420-
pmix_ptl_base_set_nonblocking(pnd->sd);
421-
422-
/* start the events for this client */
423-
pmix_event_assign(&peer->recv_event, pmix_globals.evbase, pnd->sd, EV_READ | EV_PERSIST,
424-
pmix_ptl_base_recv_handler, peer);
425-
pmix_event_add(&peer->recv_event, NULL);
426-
peer->recv_ev_active = true;
427-
pmix_event_assign(&peer->send_event, pmix_globals.evbase, pnd->sd, EV_WRITE | EV_PERSIST,
428-
pmix_ptl_base_send_handler, peer);
429-
pmix_output_verbose(2, pmix_ptl_base_framework.framework_output,
430-
"pmix:server client %s:%u has connected on socket %d",
431-
peer->info->pname.nspace, peer->info->pname.rank, peer->sd);
432-
PMIX_RELEASE(pnd);
433-
434-
/* check the cached events and update the client */
435-
_check_cached_events(peer);
436-
if (NULL != blob) {
437-
free(blob);
438-
blob = NULL;
510+
} else {
511+
// if neither of those conditions are met, then we simply assume the host is ready
512+
ch->status = PMIX_SUCCESS;
513+
_cnct_complete(0, 0, ch);
439514
}
440515

441516
return;
@@ -448,9 +523,6 @@ void pmix_ptl_base_connection_handler(int sd, short args, void *cbdata)
448523
if (NULL != msg) {
449524
free(msg);
450525
}
451-
if (NULL != blob) {
452-
free(blob);
453-
}
454526
if (NULL != peer) {
455527
pmix_pointer_array_set_item(&pmix_server_globals.clients, peer->index, NULL);
456528
PMIX_RELEASE(peer);
@@ -610,7 +682,7 @@ static void process_cbfunc(int sd, short args, void *cbdata)
610682
}
611683

612684
/* If needed perform the handshake. The macro will update reply */
613-
PMIX_PSEC_SERVER_HANDSHAKE_IFNEED(reply, peer, NULL, 0, NULL, NULL, &cred);
685+
PMIX_PSEC_SERVER_HANDSHAKE_IFNEED(reply, peer);
614686

615687
/* If verification wasn't successful - stop here */
616688
if (PMIX_SUCCESS != reply) {

0 commit comments

Comments
 (0)