Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ struct ziti_conn {
char *service;
char *source_identity;
uint32_t conn_id;
uint32_t rt_conn_id;
void *data;

int (*disposer)(struct ziti_conn *self);
Expand Down
15 changes: 11 additions & 4 deletions library/bind.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,10 @@ static void process_dial(struct binding_s *b, message *msg) {
size_t peer_key_len, marker_len;
const uint8_t *peer_key;
const uint8_t *marker;
uint32_t rt_conn_id;
bool peer_key_sent = message_get_bytes_header(msg, PublicKeyHeader, &peer_key, &peer_key_len);
bool marker_sent = message_get_bytes_header(msg, ConnectionMarkerHeader, &marker, &marker_len);
bool rt_conn_id_sent = message_get_int32_header(msg, RouterProvidedConnId, (int32_t*)&rt_conn_id);

if (!peer_key_sent && conn->encrypted) {
ZITI_LOG(ERROR, "failed to establish crypto for encrypted service: did not receive peer key");
Expand All @@ -414,6 +416,10 @@ static void process_dial(struct binding_s *b, message *msg) {

ziti_connection client;
ziti_conn_init(conn->ziti_ctx, &client, NULL);
if (rt_conn_id_sent) {
ZITI_LOG(DEBUG, "conn[%u] using router provided conn_id[%u]", client->conn_id, rt_conn_id);
client->rt_conn_id = rt_conn_id;
}
init_transport_conn(client);
if (marker_sent) {
snprintf(client->marker, sizeof(client->marker), "%.*s", (int) marker_len, marker);
Expand Down Expand Up @@ -495,12 +501,12 @@ static void bind_reply_cb(void *ctx, message *msg, int code) {
if (code == ZITI_OK && msg->header.content == ContentTypeStateConnected) {
CONN_LOG(TRACE, "received msg ct[%s] code[%d]", content_type_id(msg->header.content), code);
CONN_LOG(DEBUG, "bound successfully on router[%s]", b->ch->name);
ziti_channel_add_receiver(b->ch, conn->conn_id, b,
ziti_channel_add_receiver(b->ch, b->conn_id, b,
(void (*)(void *, message *, int)) on_message);
b->state = st_bound;
} else {
CONN_LOG(DEBUG, "failed to bind on router[%s]", b->ch->name);
ziti_channel_rem_receiver(b->ch, conn->conn_id);
ziti_channel_rem_receiver(b->ch, b->conn_id);
b->ch = NULL;
b->state = st_unbound;
}
Expand Down Expand Up @@ -532,14 +538,15 @@ int start_binding(struct binding_s *b, ziti_channel_t *ch) {
int32_t msg_seq = htole32(0);
uint16_t cost = htole16(conn->server.cost);

hdr_t headers[8] = {
hdr_t headers[9] = {
var_header(ConnIdHeader, conn_id),
var_header(SeqHeader, msg_seq),
header(ListenerId, sizeof(b->conn->server.listener_id), b->conn->server.listener_id),
var_header(SupportsInspectHeader, true_val),
var_header(RouterProvidedConnId, true_val),
// blank hdr_t's to be filled in if needed by options
};
int nheaders = 4;
int nheaders = 5;
if (conn->encrypted) {
headers[nheaders++] = header(PublicKeyHeader, sizeof(b->key_pair.pk), b->key_pair.pk);
}
Expand Down
4 changes: 2 additions & 2 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#define POOLED_MESSAGE_SIZE (32 * 1024)
#define INBOUND_POOL_SIZE (32)

#define CH_LOG(lvl, fmt, ...) ZITI_LOG(lvl, "ch[%d] " fmt, ch->id, ##__VA_ARGS__)

Check warning on line 42 in library/channel.c

View workflow job for this annotation

GitHub Actions / Linux ARM

format '%ld' expects argument of type 'long int', but argument 10 has type 'size_t' ***aka 'unsigned int'*** [-Wformat=]

enum ChannelState {
Initial,
Expand Down Expand Up @@ -235,7 +235,7 @@
r->receive = receive_f;

model_map_setl(&ch->receivers, r->id, r);
CH_LOG(DEBUG, "added receiver[%d]", id);
CH_LOG(DEBUG, "added receiver[%u]", id);
}

void ziti_channel_rem_receiver(ziti_channel_t *ch, uint32_t id) {
Expand All @@ -244,7 +244,7 @@
struct msg_receiver *r = model_map_removel(&ch->receivers, id);

if (r) {
CH_LOG(DEBUG, "removed receiver[%d]", id);
CH_LOG(DEBUG, "removed receiver[%u]", id);
free(r);
}
}
Expand Down Expand Up @@ -743,7 +743,7 @@
},
};
ch->latency = uv_now(ch->loop);
ziti_channel_send_for_reply(ch, ContentTypeHelloType, headers, 2, ch->token, strlen(ch->token), hello_reply_cb, ch);

Check warning on line 746 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS x86_64

passing 'char[37]' to parameter of type 'const uint8_t *' (aka 'const unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]

Check warning on line 746 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

passing 'char[37]' to parameter of type 'const uint8_t *' (aka 'const unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]
}


Expand Down Expand Up @@ -919,7 +919,7 @@

CH_LOG(TRACE, "on_data [len=%zd]", len);
ch->last_read = uv_now(ch->loop);
buffer_append(ch->incoming, buf->base, (uint32_t) len);

Check warning on line 922 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS x86_64

passing 'char *const' to parameter of type 'uint8_t *' (aka 'unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]

Check warning on line 922 in library/channel.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

passing 'char *const' to parameter of type 'uint8_t *' (aka 'unsigned char *') converts between pointers to integer types where one is of the unique plain 'char' type and the other is not [-Wpointer-sign]
process_inbound(ch);
}

Expand Down
63 changes: 43 additions & 20 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
}

if (conn->channel) {
ziti_channel_rem_receiver(conn->channel, conn->conn_id);
ziti_channel_rem_receiver(conn->channel, conn->rt_conn_id);
}

if (conn->conn_req) {
Expand Down Expand Up @@ -252,7 +252,7 @@
if (r->cb != NULL) {
r->cb(conn, status ? status : (ssize_t) r->len, r->ctx);
}
r = model_list_it_element(it);

Check warning on line 255 in library/connect.c

View workflow job for this annotation

GitHub Actions / Linux ARM64

assignment discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]

Check warning on line 255 in library/connect.c

View workflow job for this annotation

GitHub Actions / Linux x86_64

assignment discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]
it = model_list_it_next(it);
} while(r);
model_list_clear(&req->chain, free);
Expand All @@ -271,7 +271,7 @@
flags |= EDGE_MULTIPART;
}

int32_t conn_id = htole32(conn->conn_id);
int32_t conn_id = htole32(conn->rt_conn_id);
int32_t msg_seq = htole32(conn->edge_msg_seq++);
uint32_t msg_flags = htole32(flags);
struct msg_uuid uuid = {
Expand Down Expand Up @@ -667,7 +667,7 @@
count++;
tot += r->len;

r = model_list_it_element(it);

Check warning on line 670 in library/connect.c

View workflow job for this annotation

GitHub Actions / Linux ARM64

assignment discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]

Check warning on line 670 in library/connect.c

View workflow job for this annotation

GitHub Actions / Linux x86_64

assignment discards 'const' qualifier from pointer target type [-Wdiscarded-qualifiers]
it = model_list_it_next(it);
} while(r != NULL);
CONN_LOG(DEBUG, "consolidated %d payloads total_len[%zd]", count, tot);
Expand Down Expand Up @@ -696,7 +696,7 @@
conn_set_state(conn, conn->close ? Closed : Disconnected);
ziti_channel_t *ch = conn->channel;
if (ch) {
ziti_channel_rem_receiver(ch, (int)conn->conn_id);
ziti_channel_rem_receiver(ch, conn->rt_conn_id);
conn->channel = NULL;
}
}
Expand Down Expand Up @@ -1065,7 +1065,7 @@
if (strncmp(INVALID_SESSION, (const char *) msg->body, msg->header.body_len) == 0) {
CONN_LOG(WARN, "session for service[%s] became invalid", conn->service);
ziti_invalidate_session(conn->ziti_ctx, conn->conn_req->service_id, ziti_session_types.Dial);
ziti_channel_rem_receiver(conn->channel, conn->conn_id);
ziti_channel_rem_receiver(conn->channel, conn->rt_conn_id);
conn->channel = NULL;
restart_connect(conn);
} else {
Expand Down Expand Up @@ -1126,10 +1126,10 @@

CONN_LOG(TRACE, "ch[%d] => Edge Connect request token[%s]", ch->id, session->token);
conn->channel = ch;
ziti_channel_add_receiver(ch, conn->conn_id, conn,
ziti_channel_add_receiver(ch, conn->rt_conn_id, conn,
(void (*)(void *, message *, int)) queue_edge_message);

int32_t conn_id = htole32(conn->conn_id);
int32_t conn_id = htole32(conn->rt_conn_id);
int32_t msg_seq = htole32(0);

const ziti_identity *identity = ziti_get_identity(conn->ziti_ctx);
Expand Down Expand Up @@ -1203,7 +1203,26 @@
return ZITI_OK;
}

static void accept_cb(ziti_connection conn, ssize_t i, void *data) {
ziti_conn_cb cb = data;
if (i < 0) {
CONN_LOG(ERROR, "accept failed: %zd[%s]", i, ziti_errorstr(i));
conn_set_state(conn, Disconnected);
if (cb) {
cb(conn, (int)i);
}
return;
}
conn_set_state(conn, Connected);
if (conn->encrypted) {
send_crypto_header(conn);
}

if (cb) {
CONN_LOG(TRACE, "accept succeeded");
cb(conn, ZITI_OK);
}
}

int ziti_accept(ziti_connection conn, ziti_conn_cb cb, ziti_data_cb data_cb) {

Expand All @@ -1215,6 +1234,7 @@
return ZITI_INVALID_STATE;
}

CONN_LOG(DEBUG, "accepting");
ziti_channel_t *ch = conn->channel;
conn->data_cb = data_cb;

Expand All @@ -1224,7 +1244,7 @@
conn->flusher->data = conn;
uv_unref((uv_handle_t *) &conn->flusher);

ziti_channel_add_receiver(ch, conn->conn_id, conn, (void (*)(void *, message *, int)) queue_edge_message);
ziti_channel_add_receiver(ch, conn->rt_conn_id, conn, (void (*)(void *, message *, int)) queue_edge_message);

CONN_LOG(TRACE, "ch[%d] => Edge Accept parent_conn_id[%d]", ch->id, conn->parent->conn_id);

Expand All @@ -1233,7 +1253,7 @@
int32_t conn_id = htole32(conn->parent->conn_id);
int32_t msg_seq = htole32(0);
int32_t reply_id = htole32(conn->dial_req_seq);
int32_t clt_conn_id = htole32(conn->conn_id);
int32_t clt_conn_id = htole32(conn->rt_conn_id);
hdr_t headers[] = {
{
.header_id = ConnIdHeader,
Expand All @@ -1251,19 +1271,22 @@
.value = (uint8_t *) &reply_id
},
};
NEWP(req, struct ziti_conn_req);
req->cb = cb;
conn->conn_req = req;

// add accept deadline in case ER fails to send us Accept result
ztx_set_deadline(conn->ziti_ctx, ZITI_DEFAULT_TIMEOUT, &req->deadline, connect_timeout, conn);
struct ziti_write_req_s *ar = calloc(1, sizeof(*ar));
ar->conn = conn;
ar->cb = accept_cb;
ar->ctx = cb;

req->waiter = ziti_channel_send_for_reply(
ch, content_type, headers, 3,
(const uint8_t *) &clt_conn_id, sizeof(clt_conn_id),
connect_reply_cb, conn);
int rc = ziti_channel_send(ch, content_type, headers, 3,
(const uint8_t *) &clt_conn_id, sizeof(clt_conn_id),
ar);
if (rc == ZITI_OK) {
TAILQ_INSERT_TAIL(&conn->pending_wreqs, ar, _next);
} else {
free(ar);
}

return ZITI_OK;
return rc;
}

int ziti_write(ziti_connection conn, uint8_t *data, size_t length, ziti_write_cb write_cb, void *write_ctx) {
Expand Down Expand Up @@ -1409,7 +1432,7 @@
size_t uuid_len;
bool has_seq = message_get_int32_header(msg, SeqHeader, &seq);
bool has_conn_id = message_get_int32_header(msg, ConnIdHeader, &conn_id);
assert(has_conn_id && conn_id == conn->conn_id);
assert(has_conn_id && conn_id == conn->rt_conn_id);

message_get_int32_header(msg, FlagsHeader, (int32_t*)&flags);
uint32_t caps = flags & CONN_CAP_MASK;
Expand Down Expand Up @@ -1451,7 +1474,7 @@
retry_connect = true;
}
if (retry_connect) {
ziti_channel_rem_receiver(conn->channel, conn->conn_id);
ziti_channel_rem_receiver(conn->channel, conn->rt_conn_id);
conn->channel = NULL;
conn_set_state(conn, Connecting);
restart_connect(conn);
Expand Down
1 change: 1 addition & 0 deletions library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@
c->ziti_ctx = ztx;
c->data = data;
c->conn_id = ztx->conn_seq++;
c->rt_conn_id = c->conn_id;

*conn = c;
model_map_setl(&ctx->connections, (long) c->conn_id, c);
Expand Down Expand Up @@ -1806,7 +1807,7 @@
id_it = model_list_it_remove(id_it);
}

MODEL_MAP_FOREACH(conn_id, conn, &ztx->connections) {

Check warning on line 1810 in library/ziti.c

View workflow job for this annotation

GitHub Actions / MacOS x86_64

comparison between pointer and integer ('uint32_t' (aka 'unsigned int') and 'void *') [-Wpointer-integer-compare]

Check warning on line 1810 in library/ziti.c

View workflow job for this annotation

GitHub Actions / MacOS arm64

comparison between pointer and integer ('uint32_t' (aka 'unsigned int') and 'void *') [-Wpointer-integer-compare]
if (conn->type == Server) {
update_bindings(conn);
}
Expand Down
Loading