Skip to content

Commit 9fff977

Browse files
committed
test again
Signed-off-by: Jaylin <jaylin@emqx.io>
1 parent 0935ecc commit 9fff977

File tree

7 files changed

+149
-111
lines changed

7 files changed

+149
-111
lines changed

src/core/pipe.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ static void
3333
pipe_destroy(void *arg)
3434
{
3535
nni_pipe *p = arg;
36-
if (p == NULL || p->cache) {
36+
if (p == NULL || nni_atomic_get_bool(&p->cache)) {
3737
return;
3838
}
3939

@@ -274,7 +274,7 @@ pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tran_data)
274274
p->p_ref = 1;
275275
// NanoMQ
276276
p->packet_id = 0;
277-
p->cache = false;
277+
nni_atomic_set_bool(&p->cache, false);
278278
p->subinfol = nni_zalloc(sizeof(nni_list));
279279
NNI_LIST_INIT(p->subinfol, struct subinfo, node);
280280

@@ -456,7 +456,7 @@ nni_pipe_get_conn_param(nni_pipe *p)
456456
bool
457457
nni_pipe_get_status(nni_pipe *p)
458458
{
459-
return p->cache;
459+
return nni_atomic_get_bool(&p->cache);
460460
}
461461

462462
uint16_t
@@ -492,7 +492,7 @@ nni_pipe_set_pid(nni_pipe *new_pipe, uint32_t id)
492492
rv = nni_id_set(&pipes, id, new_pipe);
493493
// Kick out duplicated Client ID
494494
nni_mtx_unlock(&pipes_lk);
495-
if (!p->cache || rv != 0) {
495+
if (!nni_atomic_get_bool(&p->cache) || rv != 0) {
496496
log_error("Client ID collision or set ID failed!");
497497
// Must close old pipe first to make it like a normal disconnect
498498
// so that new pipe can inherit.

src/core/sockimpl.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,13 @@ struct nni_pipe {
135135
#endif
136136

137137
// NanoMQ
138-
void *conn_param;
139-
bool cache;
140-
uint16_t packet_id;
141-
nni_list *subinfol; // additional info for sub
142-
// nano_qos_db stores qos msgs in 'sqlite' or 'nni_id_hash_map'
143-
void *nano_qos_db; // protected by pipe lock.
144-
void *old;
138+
void *conn_param;
139+
void *nano_qos_db; // protected by pipe lock of transport.
140+
// nano_qos_db stores qos msgs in 'sqlite' or 'nni_id_hash_map'
141+
void *old;
142+
uint16_t packet_id;
143+
nni_list *subinfol; // additional info for sub
144+
nni_atomic_bool cache;
145145
};
146146

147147
extern int nni_sock_add_dialer(nni_sock *, nni_dialer *);

src/sp/protocol/mqtt/nmq_mqtt.c

Lines changed: 56 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ nano_pipe_timer_cb(void *arg)
191191
}
192192
nni_mtx_lock(&p->lk);
193193
// TODO pipe lock or sock lock?
194-
if (npipe->cache) {
194+
if (nni_atomic_get_bool(&npipe->cache)) {
195195
nng_time will_intval = p->conn_param->will_delay_interval;
196196
nng_time session_int = p->conn_param->session_expiry_interval;
197197
p->ka_refresh++;
@@ -211,7 +211,7 @@ nano_pipe_timer_cb(void *arg)
211211
old = nni_id_get(&s->cached_sessions, p->pipe->p_id);
212212
if (old != NULL) {
213213
old->event = true;
214-
old->pipe->cache = false;
214+
nni_atomic_set_bool(&old->pipe->cache, false);
215215
#ifdef NNG_SUPP_SQLITE
216216
nni_qos_db_remove_by_pipe(is_sqlite,
217217
old->nano_qos_db, old->pipe->p_id);
@@ -404,8 +404,9 @@ nano_ctx_send(void *arg, nni_aio *aio)
404404
if ((p = nni_id_get(&s->pipes, pipe)) == NULL) {
405405
// pre-configured session
406406
void *qos_db = NULL;
407-
if (s->conf->ext_qos_db)
408-
qos_db = nng_id_get(s->conf->ext_qos_db, pipe);
407+
if (s->conf->ext_qos_db) {
408+
qos_db = nng_id_get(s->conf->ext_qos_db, pipe);
409+
}
409410
if (qos_db != NULL) {
410411
if (nni_msg_get_type(msg) == CMD_PUBLISH &&
411412
nni_msg_get_pub_qos(msg) > 0) {
@@ -442,6 +443,7 @@ nano_ctx_send(void *arg, nni_aio *aio)
442443
if ((rv = nni_aio_schedule(aio, nano_ctx_cancel_send, ctx)) != 0) {
443444
nni_msg_free(msg);
444445
nni_mtx_unlock(&p->lk);
446+
nni_aio_set_msg(aio, NULL);
445447
return;
446448
}
447449
log_debug("pipe %d occupied! resending in cb!", pipe);
@@ -552,7 +554,7 @@ static void
552554
nano_pipe_stop(void *arg)
553555
{
554556
nano_pipe *p = arg;
555-
if (p->pipe->cache)
557+
if (nni_atomic_get_bool(&p->pipe->cache))
556558
return; // your time is yet to come
557559

558560
log_trace(" ########## nano_pipe_stop ########## ");
@@ -570,7 +572,7 @@ nano_pipe_fini(void *arg)
570572
nng_msg *msg;
571573

572574
log_trace(" ########## nano_pipe_fini ########## ");
573-
if (p->pipe->cache) {
575+
if (nni_atomic_get_bool(&p->pipe->cache)) {
574576
return; // your time is yet to come
575577
}
576578
nni_mtx_lock(&s->lk);
@@ -583,16 +585,7 @@ nano_pipe_fini(void *arg)
583585
nni_msg_free(msg);
584586
}
585587

586-
//Safely free the msgs in qos_db, only when nano_qos_db is not taken by new pipe
587-
void *nano_qos_db = p->pipe->nano_qos_db;
588-
if (p->event == true) {
589-
if (!p->broker->conf->sqlite.enable && nano_qos_db != NULL) {
590-
nni_qos_db_remove_all_msg(false,
591-
nano_qos_db, nmq_close_unack_msg_cb);
592-
nni_qos_db_fini_id_hash(nano_qos_db);
593-
p->pipe->nano_qos_db = NULL;
594-
}
595-
} else {
588+
if (p->event != true) {
596589
// we keep all structs in broker layer, except this conn_param
597590
conn_param_free(p->conn_param);
598591
}
@@ -707,26 +700,30 @@ nano_pipe_start(void *arg)
707700
if (old != NULL) {
708701
// there should be no msg in this map
709702
if (!is_sqlite && p->pipe->nano_qos_db!= NULL) {
703+
nni_qos_db_remove_all_msg(false,
704+
p->pipe->nano_qos_db, nmq_close_unack_msg_cb);
710705
nni_qos_db_fini_id_hash(p->pipe->nano_qos_db);
711706
p->pipe->nano_qos_db = NULL;
712707
}
713708
log_info("resuming session %d with %d", npipe->p_id, old->pipe->p_id);
714-
npipe->old = old->pipe;
715-
nni_pipe_peer(npipe);
709+
// npipe->old = old->pipe;
710+
old->pipe->old = npipe;
711+
// nni_pipe_peer(npipe);
712+
nni_pipe_peer(old->pipe);
716713
p->id = nni_pipe_id(npipe);
717714
// set event to false so that no notification will be sent
718715
p->event = false;
719716
// set event of old pipe to false and discard it.
720717
old->event = false;
721-
old->pipe->cache = false;
718+
// old->pipe->cache = false;
722719
nni_id_remove(&s->cached_sessions, p->pipe->p_id);
723720
}
724721
} else {
725722
// clean previous session
726723
old = nni_id_get(&s->cached_sessions, p->pipe->p_id);
727724
if (old != NULL) {
728-
old->event = true;
729-
old->pipe->cache = false;
725+
old->event = true;
726+
nni_atomic_swap_bool(&old->pipe->cache, false);
730727
#ifdef NNG_SUPP_SQLITE
731728
nni_qos_db_remove_by_pipe(
732729
is_sqlite, old->nano_qos_db, old->pipe->p_id);
@@ -735,8 +732,6 @@ nano_pipe_start(void *arg)
735732
nni_qos_db_remove_unused_msg(
736733
is_sqlite, old->nano_qos_db);
737734
#endif
738-
// nni_qos_db_remove_all_msg(is_sqlite, old->nano_qos_db,
739-
// nmq_close_unack_msg_cb);
740735
nni_id_remove(&s->cached_sessions, p->pipe->p_id);
741736
log_info("cleaning session %d from cache", p->pipe->p_id);
742737
}
@@ -761,17 +756,16 @@ nano_pipe_start(void *arg)
761756
}
762757
nmq_connack_encode(msg, p->conn_param, rv);
763758
conn_param_free(p->conn_param);
764-
if (old)
765-
nni_msg_set_proto_data(msg, NULL, old->pipe);
766759
if (rv != 0) {
767760
// send connack with reason code 0x05
768761
log_warn("Invalid auth info.");
769762
}
770763

771-
// Dont need to manage id_map while enable SQLite.
764+
// Recover preset sessions
772765
void *qos_db = NULL;
773766
if (s->conf->ext_qos_db)
774767
qos_db = nng_id_get(s->conf->ext_qos_db, p->pipe->p_id);
768+
// Dont need to manage id_map while enable SQLite.
775769
if (qos_db != NULL && !s->conf->sqlite.enable) {
776770
// check sqlite compatibility
777771
if (p->nano_qos_db != NULL)
@@ -851,7 +845,7 @@ nano_pipe_close(void *arg)
851845
char *clientid = NULL;
852846

853847
log_trace(" ############## nano_pipe_close [%p] ############## ", p);
854-
if (npipe->cache == true) {
848+
if (nni_atomic_get_bool(&npipe->cache)) {
855849
// not first time we trying to close stored session pipe
856850
nni_atomic_swap_bool(&npipe->p_closed, false);
857851
return -1;
@@ -866,7 +860,7 @@ nano_pipe_close(void *arg)
866860
clientid = (char *) conn_param_get_clientid(p->conn_param);
867861
}
868862
if (clientid) {
869-
nni_pipe *new_pipe;
863+
nni_pipe *new_pipe = NULL;
870864
if (nni_pipe_find(&new_pipe, npipe->p_id) == 0) {
871865
log_debug("keep session id [%s] ", p->conn_param->clientid.body);
872866
nni_pipe_rele(new_pipe);
@@ -875,15 +869,15 @@ nano_pipe_close(void *arg)
875869
} else {
876870
// client with session stored is kicking itself
877871
log_info("A keeping Session is kicked out");
872+
// also cache kicked session
873+
// merging 2 pipes together in pipe start
878874
}
879875
log_info("session stored %d", npipe->p_id);
880876
nni_id_set(&s->cached_sessions, npipe->p_id, p);
881-
// set event to false avoid of sending the
882-
// disconnecting msg
877+
// set event to false avoid of sending the disconnecting msg
883878
p->event = false;
884-
npipe->cache = true;
885-
// set clean start to 1, prevent caching
886-
// session twice
879+
nni_atomic_set_bool(&npipe->cache, true);
880+
// set clean start to 1, prevent caching session twice
887881
p->conn_param->clean_start = 1;
888882
nni_atomic_swap_bool(&npipe->p_closed, false);
889883
if (nni_list_active(&s->recvpipes, p)) {
@@ -900,24 +894,24 @@ nano_pipe_close(void *arg)
900894
}
901895

902896
// have to close & stop aio timer first, otherwise we hit null qos_db
903-
nni_aio_finish_error(&p->aio_timer, NNG_ECANCELED);
904-
nni_aio_close(&p->aio_timer);
905-
nni_aio_close(&p->aio_send);
906-
nni_aio_close(&p->aio_recv);
907-
// take params from npipe to new pipe
908-
// new_pipe->packet_id = npipe->packet_id;
909-
// there should be no msg in this map
910-
if (!s->conf->sqlite.enable && new_pipe->nano_qos_db != NULL)
911-
nni_qos_db_fini_id_hash(new_pipe->nano_qos_db);
912-
// new_pipe->nano_qos_db = npipe->nano_qos_db;
913-
// npipe->nano_qos_db = NULL;
914-
915-
// nni_list *l = new_pipe->subinfol;
916-
// new_pipe->subinfol = npipe->subinfol;
917-
// npipe->subinfol = l;
918-
new_pipe->old = npipe;
919-
nni_pipe_peer(new_pipe);
920-
log_info("client kick itself while keeping session!");
897+
// nni_aio_finish_error(&p->aio_timer, NNG_ECANCELED);
898+
// nni_aio_close(&p->aio_timer);
899+
// nni_aio_close(&p->aio_send);
900+
// nni_aio_close(&p->aio_recv);
901+
// // take params from npipe to new pipe
902+
// // new_pipe->packet_id = npipe->packet_id;
903+
// // there should be no msg in this map
904+
// if (!s->conf->sqlite.enable && new_pipe->nano_qos_db != NULL)
905+
// nni_qos_db_fini_id_hash(new_pipe->nano_qos_db);
906+
// // new_pipe->nano_qos_db = npipe->nano_qos_db;
907+
// // npipe->nano_qos_db = NULL;
908+
909+
// // nni_list *l = new_pipe->subinfol;
910+
// // new_pipe->subinfol = npipe->subinfol;
911+
// // npipe->subinfol = l;
912+
// new_pipe->old = npipe;
913+
// nni_pipe_peer(new_pipe);
914+
// log_info("client kick itself while keeping session!");
921915
} else {
922916
nni_aio_close(&p->aio_send);
923917
nni_aio_close(&p->aio_recv);
@@ -1178,15 +1172,17 @@ nano_pipe_recv_cb(void *arg)
11781172
nni_mtx_lock(&p->lk);
11791173
NNI_GET16(ptr, ackid);
11801174
p->rid = ackid + 1;
1181-
if ((qos_msg = nni_qos_db_get(is_sqlite, npipe->nano_qos_db,
1182-
npipe->p_id, ackid)) != NULL) {
1183-
nni_qos_db_remove_msg(
1184-
is_sqlite, npipe->nano_qos_db, qos_msg);
1185-
nni_qos_db_remove(
1186-
is_sqlite, npipe->nano_qos_db, npipe->p_id, ackid);
1187-
} else {
1188-
log_warn("ACK failed! qos msg %ld not found!", ackid);
1189-
}
1175+
// if (npipe->nano_qos_db == NULL)
1176+
// log_error("mem leak!!!!!!!!!!!");
1177+
// if ((qos_msg = nni_qos_db_get(is_sqlite, npipe->nano_qos_db,
1178+
// npipe->p_id, ackid)) != NULL) {
1179+
// nni_qos_db_remove_msg(
1180+
// is_sqlite, npipe->nano_qos_db, qos_msg);
1181+
// nni_qos_db_remove(
1182+
// is_sqlite, npipe->nano_qos_db, npipe->p_id, ackid);
1183+
// } else {
1184+
// log_warn("ACK failed! qos msg %ld not found!", ackid);
1185+
// }
11901186
nni_mtx_unlock(&p->lk);
11911187
case CMD_CONNECT:
11921188
case CMD_PUBREC:

0 commit comments

Comments
 (0)