Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ go.work.sum
pycache
fix_cve.md
ekuiper240b5.csv
tmp
12 changes: 10 additions & 2 deletions internal/io/mqtt/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func (conn *Connection) onConnect(ctx api.StreamContext) {
} else {
ctx.GetLogger().Warnf("sc handler has not set yet")
}
ctx.GetLogger().Infof("The connection to mqtt broker is established")
if ctx.GetRuleId() != "" {
ctx.GetLogger().Infof("action=mqtt_connection_established connId=%s rule=%s op=%s server=%s", conn.id, ctx.GetRuleId(), ctx.GetOpId(), conn.server)
} else {
ctx.GetLogger().Infof("action=mqtt_connection_established connId=%s server=%s", conn.id, conn.server)
}
conn.subscriptions.Range(func(k, v any) bool {
topic := k.(string)
info := v.(*client.SubscriptionInfo)
Expand All @@ -150,7 +154,11 @@ func (conn *Connection) onConnectLost(ctx api.StreamContext, err error) {
if handler != nil {
handler(api.ConnectionDisconnected, err.Error())
}
ctx.GetLogger().Infof("%v", err)
if ctx.GetRuleId() != "" {
ctx.GetLogger().Warnf("action=mqtt_connection_disconnected connId=%s rule=%s op=%s server=%s err=%v", conn.id, ctx.GetRuleId(), ctx.GetOpId(), conn.server, err)
} else {
ctx.GetLogger().Warnf("action=mqtt_connection_disconnected connId=%s server=%s err=%v", conn.id, conn.server, err)
}
}

func (conn *Connection) onReconnecting(ctx api.StreamContext) {
Expand Down
3 changes: 3 additions & 0 deletions internal/io/mqtt/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (ms *Sink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) erro
if err != nil {
return err
}
if ms.adconf.SelId != "" {
ctx.GetLogger().Infof("action=use_shared_mqtt_connection role=sink connId=%s connectionKey=%s rule=%s topic=%s", ms.cw.ID, ms.adconf.SelId, ctx.GetRuleId(), ms.adconf.Tpc)
}
conn, err := ms.cw.Wait(ctx)
if conn == nil {
return fmt.Errorf("mqtt client not ready: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions internal/io/mqtt/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (ms *SourceConnector) Connect(ctx api.StreamContext, sch api.StatusChangeHa
if err != nil {
return err
}
if ms.cfg.SelId != "" {
ctx.GetLogger().Infof("action=use_shared_mqtt_connection role=source connId=%s connectionKey=%s rule=%s stream=%s topic=%s", cw.ID, ms.cfg.SelId, ctx.GetRuleId(), ctx.GetOpId(), ms.tpc)
}
ms.conId = cw.ID
// wait for connection
conn, err := cw.Wait(ctx)
Expand Down
5 changes: 5 additions & 0 deletions internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ func rulesHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "", logger)
return
}
conf.Log.Infof("create rule:%v", id)
w.WriteHeader(http.StatusCreated)
fmt.Fprintf(w, "Rule %s was created successfully.", id)
case http.MethodGet:
Expand Down Expand Up @@ -787,6 +788,7 @@ func ruleHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "Update rule error", logger)
return
}
conf.Log.Infof("update rule:%v", name)
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintf(w, "Rule %s was updated successfully.", name)
}
Expand Down Expand Up @@ -846,6 +848,7 @@ func startRuleHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "start rule error", logger)
return
}
conf.Log.Infof("start rule:%v", name)
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintf(w, "Rule %s was started", name)
}
Expand All @@ -861,6 +864,7 @@ func stopRuleHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "stop rule error", logger)
return
}
conf.Log.Infof("stop rule:%v", name)
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintf(w, "Rule %s was stopped.", name)
}
Expand All @@ -876,6 +880,7 @@ func restartRuleHandler(w http.ResponseWriter, r *http.Request) {
handleError(w, err, "restart rule error", logger)
return
}
conf.Log.Infof("restart rule:%v", name)
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "Rule %s was restarted", name)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/connection/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func FetchConnection(ctx api.StreamContext, refId, typ string, props map[string]
defer globalConnectionManager.Unlock()
if _, ok := globalConnectionManager.connectionPool[conId]; ok {
conf.Log.Infof("FetchConnection return existed conn %s", conId)
if conId != refId {
conf.Log.Infof("action=reuse_connection connId=%s type=%s connectionKey=%s rule=%s op=%s refId=%s", conId, typ, conId, ctx.GetRuleId(), ctx.GetOpId(), refId)
}
} else {
if conId != refId {
return nil, fmt.Errorf("connection %s not existed", conId)
Expand Down Expand Up @@ -341,6 +344,9 @@ func attachConnection(conId string, refId string, sc api.StatusChangeHandler) (*
return nil, fmt.Errorf("connection %s not existed", conId)
}
meta.AddRef(refId, sc)
if conId != refId {
conf.Log.Infof("action=attach_connection_ref connId=%s type=%s connectionKey=%s refId=%s refCount=%d", conId, meta.Typ, conId, refId, meta.GetRefCount())
}
return meta.cw, nil
}

Expand All @@ -354,7 +360,13 @@ func detachConnection(ctx api.StreamContext, conId string) error {
meta.DeRef(refId)
globalConnectionManager.connectionPool[conId] = meta
conf.Log.Infof("detachConnection remove conn:%v,ref:%v", conId, refId)
if conId != refId {
conf.Log.Infof("action=detach_connection_ref connId=%s type=%s connectionKey=%s rule=%s op=%s refId=%s refCount=%d", conId, meta.Typ, conId, ctx.GetRuleId(), ctx.GetOpId(), refId, meta.GetRefCount())
}
if !meta.Named && meta.GetRefCount() == 0 {
if conId != refId {
conf.Log.Infof("action=close_connection connId=%s type=%s connectionKey=%s rule=%s op=%s reason=zero_ref", conId, meta.Typ, conId, ctx.GetRuleId(), ctx.GetOpId())
}
close(meta.cw.detachCh)
conn, err := meta.cw.Wait(ctx)
if conn != nil && err == nil {
Expand Down
Loading