Skip to content

Commit 07133e5

Browse files
authored
Merge pull request #853 from openziti/ziti-accept-v2
Ziti accept v2
2 parents 0faa75f + 60276f7 commit 07133e5

File tree

5 files changed

+58
-26
lines changed

5 files changed

+58
-26
lines changed

inc_internal/zt_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ struct ziti_conn {
162162
char *service;
163163
char *source_identity;
164164
uint32_t conn_id;
165+
uint32_t rt_conn_id;
165166
void *data;
166167

167168
int (*disposer)(struct ziti_conn *self);

library/bind.c

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,10 @@ static void process_dial(struct binding_s *b, message *msg) {
403403
size_t peer_key_len, marker_len;
404404
const uint8_t *peer_key;
405405
const uint8_t *marker;
406+
uint32_t rt_conn_id;
406407
bool peer_key_sent = message_get_bytes_header(msg, PublicKeyHeader, &peer_key, &peer_key_len);
407408
bool marker_sent = message_get_bytes_header(msg, ConnectionMarkerHeader, &marker, &marker_len);
409+
bool rt_conn_id_sent = message_get_int32_header(msg, RouterProvidedConnId, (int32_t*)&rt_conn_id);
408410

409411
if (!peer_key_sent && conn->encrypted) {
410412
ZITI_LOG(ERROR, "failed to establish crypto for encrypted service: did not receive peer key");
@@ -414,6 +416,10 @@ static void process_dial(struct binding_s *b, message *msg) {
414416

415417
ziti_connection client;
416418
ziti_conn_init(conn->ziti_ctx, &client, NULL);
419+
if (rt_conn_id_sent) {
420+
ZITI_LOG(DEBUG, "conn[%u] using router provided conn_id[%u]", client->conn_id, rt_conn_id);
421+
client->rt_conn_id = rt_conn_id;
422+
}
417423
init_transport_conn(client);
418424
if (marker_sent) {
419425
snprintf(client->marker, sizeof(client->marker), "%.*s", (int) marker_len, marker);
@@ -495,12 +501,12 @@ static void bind_reply_cb(void *ctx, message *msg, int code) {
495501
if (code == ZITI_OK && msg->header.content == ContentTypeStateConnected) {
496502
CONN_LOG(TRACE, "received msg ct[%s] code[%d]", content_type_id(msg->header.content), code);
497503
CONN_LOG(DEBUG, "bound successfully on router[%s]", b->ch->name);
498-
ziti_channel_add_receiver(b->ch, conn->conn_id, b,
504+
ziti_channel_add_receiver(b->ch, b->conn_id, b,
499505
(void (*)(void *, message *, int)) on_message);
500506
b->state = st_bound;
501507
} else {
502508
CONN_LOG(DEBUG, "failed to bind on router[%s]", b->ch->name);
503-
ziti_channel_rem_receiver(b->ch, conn->conn_id);
509+
ziti_channel_rem_receiver(b->ch, b->conn_id);
504510
b->ch = NULL;
505511
b->state = st_unbound;
506512
}
@@ -532,14 +538,15 @@ int start_binding(struct binding_s *b, ziti_channel_t *ch) {
532538
int32_t msg_seq = htole32(0);
533539
uint16_t cost = htole16(conn->server.cost);
534540

535-
hdr_t headers[8] = {
541+
hdr_t headers[9] = {
536542
var_header(ConnIdHeader, conn_id),
537543
var_header(SeqHeader, msg_seq),
538544
header(ListenerId, sizeof(b->conn->server.listener_id), b->conn->server.listener_id),
539545
var_header(SupportsInspectHeader, true_val),
546+
var_header(RouterProvidedConnId, true_val),
540547
// blank hdr_t's to be filled in if needed by options
541548
};
542-
int nheaders = 4;
549+
int nheaders = 5;
543550
if (conn->encrypted) {
544551
headers[nheaders++] = header(PublicKeyHeader, sizeof(b->key_pair.pk), b->key_pair.pk);
545552
}

library/channel.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ void ziti_channel_add_receiver(ziti_channel_t *ch, uint32_t id, void *receiver,
235235
r->receive = receive_f;
236236

237237
model_map_setl(&ch->receivers, r->id, r);
238-
CH_LOG(DEBUG, "added receiver[%d]", id);
238+
CH_LOG(DEBUG, "added receiver[%u]", id);
239239
}
240240

241241
void ziti_channel_rem_receiver(ziti_channel_t *ch, uint32_t id) {
@@ -244,7 +244,7 @@ void ziti_channel_rem_receiver(ziti_channel_t *ch, uint32_t id) {
244244
struct msg_receiver *r = model_map_removel(&ch->receivers, id);
245245

246246
if (r) {
247-
CH_LOG(DEBUG, "removed receiver[%d]", id);
247+
CH_LOG(DEBUG, "removed receiver[%u]", id);
248248
free(r);
249249
}
250250
}

library/connect.c

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ static int close_conn_internal(struct ziti_conn *conn) {
187187
}
188188

189189
if (conn->channel) {
190-
ziti_channel_rem_receiver(conn->channel, conn->conn_id);
190+
ziti_channel_rem_receiver(conn->channel, conn->rt_conn_id);
191191
}
192192

193193
if (conn->conn_req) {
@@ -271,7 +271,7 @@ message *create_message(struct ziti_conn *conn, uint32_t content, uint32_t flags
271271
flags |= EDGE_MULTIPART;
272272
}
273273

274-
int32_t conn_id = htole32(conn->conn_id);
274+
int32_t conn_id = htole32(conn->rt_conn_id);
275275
int32_t msg_seq = htole32(conn->edge_msg_seq++);
276276
uint32_t msg_flags = htole32(flags);
277277
struct msg_uuid uuid = {
@@ -696,7 +696,7 @@ static void on_disconnect(ziti_connection conn, ssize_t status, void *ctx) {
696696
conn_set_state(conn, conn->close ? Closed : Disconnected);
697697
ziti_channel_t *ch = conn->channel;
698698
if (ch) {
699-
ziti_channel_rem_receiver(ch, (int)conn->conn_id);
699+
ziti_channel_rem_receiver(ch, conn->rt_conn_id);
700700
conn->channel = NULL;
701701
}
702702
}
@@ -1065,7 +1065,7 @@ void connect_reply_cb(void *ctx, message *msg, int err) {
10651065
if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) {
10661066
CONN_LOG(WARN, "session for service[%s] became invalid", conn->service);
10671067
ziti_invalidate_session(conn->ziti_ctx, conn->conn_req->service_id, ziti_session_types.Dial);
1068-
ziti_channel_rem_receiver(conn->channel, conn->conn_id);
1068+
ziti_channel_rem_receiver(conn->channel, conn->rt_conn_id);
10691069
conn->channel = NULL;
10701070
restart_connect(conn);
10711071
} else {
@@ -1126,10 +1126,10 @@ static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t
11261126

11271127
CONN_LOG(TRACE, "ch[%d] => Edge Connect request token[%s]", ch->id, session->token);
11281128
conn->channel = ch;
1129-
ziti_channel_add_receiver(ch, conn->conn_id, conn,
1129+
ziti_channel_add_receiver(ch, conn->rt_conn_id, conn,
11301130
(void (*)(void *, message *, int)) queue_edge_message);
11311131

1132-
int32_t conn_id = htole32(conn->conn_id);
1132+
int32_t conn_id = htole32(conn->rt_conn_id);
11331133
int32_t msg_seq = htole32(0);
11341134

11351135
const ziti_identity *identity = ziti_get_identity(conn->ziti_ctx);
@@ -1203,7 +1203,26 @@ static int ziti_channel_start_connection(struct ziti_conn *conn, ziti_channel_t
12031203
return ZITI_OK;
12041204
}
12051205

1206+
static void accept_cb(ziti_connection conn, ssize_t i, void *data) {
1207+
ziti_conn_cb cb = data;
1208+
if (i < 0) {
1209+
CONN_LOG(ERROR, "accept failed: %zd[%s]", i, ziti_errorstr(i));
1210+
conn_set_state(conn, Disconnected);
1211+
if (cb) {
1212+
cb(conn, (int)i);
1213+
}
1214+
return;
1215+
}
1216+
conn_set_state(conn, Connected);
1217+
if (conn->encrypted) {
1218+
send_crypto_header(conn);
1219+
}
12061220

1221+
if (cb) {
1222+
CONN_LOG(TRACE, "accept succeeded");
1223+
cb(conn, ZITI_OK);
1224+
}
1225+
}
12071226

12081227
int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) {
12091228

@@ -1215,6 +1234,7 @@ int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) {
12151234
return ZITI_INVALID_STATE;
12161235
}
12171236

1237+
CONN_LOG(DEBUG, "accepting");
12181238
ziti_channel_t *ch = conn->channel;
12191239
conn->data_cb = data_cb;
12201240

@@ -1224,7 +1244,7 @@ int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) {
12241244
conn->flusher->data = conn;
12251245
uv_unref((uv_handle_t *) &conn->flusher);
12261246

1227-
ziti_channel_add_receiver(ch, conn->conn_id, conn, (void (*)(void *, message *, int)) queue_edge_message);
1247+
ziti_channel_add_receiver(ch, conn->rt_conn_id, conn, (void (*)(void *, message *, int)) queue_edge_message);
12281248

12291249
CONN_LOG(TRACE, "ch[%d] => Edge Accept parent_conn_id[%d]", ch->id, conn->parent->conn_id);
12301250

@@ -1233,7 +1253,7 @@ int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) {
12331253
int32_t conn_id = htole32(conn->parent->conn_id);
12341254
int32_t msg_seq = htole32(0);
12351255
int32_t reply_id = htole32(conn->dial_req_seq);
1236-
int32_t clt_conn_id = htole32(conn->conn_id);
1256+
int32_t clt_conn_id = htole32(conn->rt_conn_id);
12371257
hdr_t headers[] = {
12381258
{
12391259
.header_id = ConnIdHeader,
@@ -1251,19 +1271,22 @@ int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) {
12511271
.value = (uint8_t *) &reply_id
12521272
},
12531273
};
1254-
NEWP(req, struct ziti_conn_req);
1255-
req->cb = cb;
1256-
conn->conn_req = req;
12571274

1258-
// add accept deadline in case ER fails to send us Accept result
1259-
ztx_set_deadline(conn->ziti_ctx, ZITI_DEFAULT_TIMEOUT, &req->deadline, connect_timeout, conn);
1275+
struct ziti_write_req_s *ar = calloc(1, sizeof(*ar));
1276+
ar->conn = conn;
1277+
ar->cb = accept_cb;
1278+
ar->ctx = cb;
12601279

1261-
req->waiter = ziti_channel_send_for_reply(
1262-
ch, content_type, headers, 3,
1263-
(const uint8_t *) &clt_conn_id, sizeof(clt_conn_id),
1264-
connect_reply_cb, conn);
1280+
int rc = ziti_channel_send(ch, content_type, headers, 3,
1281+
(const uint8_t *) &clt_conn_id, sizeof(clt_conn_id),
1282+
ar);
1283+
if (rc == ZITI_OK) {
1284+
TAILQ_INSERT_TAIL(&conn->pending_wreqs, ar, _next);
1285+
} else {
1286+
free(ar);
1287+
}
12651288

1266-
return ZITI_OK;
1289+
return rc;
12671290
}
12681291

12691292
int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_write_cb write_cb, void *write_ctx) {
@@ -1409,7 +1432,7 @@ static void process_edge_message(struct ziti_conn *conn, message *msg) {
14091432
size_t uuid_len;
14101433
bool has_seq = message_get_int32_header(msg, SeqHeader, &seq);
14111434
bool has_conn_id = message_get_int32_header(msg, ConnIdHeader, &conn_id);
1412-
assert(has_conn_id && conn_id == conn->conn_id);
1435+
assert(has_conn_id && conn_id == conn->rt_conn_id);
14131436

14141437
message_get_int32_header(msg, FlagsHeader, (int32_t*)&flags);
14151438
uint32_t caps = flags & CONN_CAP_MASK;
@@ -1451,7 +1474,7 @@ static void process_edge_message(struct ziti_conn *conn, message *msg) {
14511474
retry_connect = true;
14521475
}
14531476
if (retry_connect) {
1454-
ziti_channel_rem_receiver(conn->channel, conn->conn_id);
1477+
ziti_channel_rem_receiver(conn->channel, conn->rt_conn_id);
14551478
conn->channel = NULL;
14561479
conn_set_state(conn, Connecting);
14571480
restart_connect(conn);

library/ziti.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,7 @@ int ziti_conn_init(ziti_context ztx, ziti_connection *conn, void *data) {
10201020
c->ziti_ctx = ztx;
10211021
c->data = data;
10221022
c->conn_id = ztx->conn_seq++;
1023+
c->rt_conn_id = c->conn_id;
10231024

10241025
*conn = c;
10251026
model_map_setl(&ctx->connections, (long) c->conn_id, c);

0 commit comments

Comments
 (0)