Skip to content

Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165" #3330

@gaurangomar

Description

@gaurangomar

Environment:

  • eKuiper version : 1.13.3
  • Hardware configuration:
    Architecture: x86_64
    CPU op-mode(s): 32-bit, 64-bit
    Address sizes: 39 bits physical, 48 bits virtual
    Byte Order: Little Endian
    CPU(s): 8
    On-line CPU(s) list: 0-7
    Vendor ID: GenuineIntel
    Model name: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
    CPU family: 6
    Model: 94
    Thread(s) per core: 2
    Core(s) per socket: 4
    Socket(s): 1
  • OS:
    NAME="Oracle Linux Server"
    VERSION="9.0"
  • Others:

What happened and what you expected to happen:
We have multiple rules and we are experiencing an intermittent issue that rules are getting into stopped state.
We are using MQTT as a source and same MQTT broker as SINK.
Rules works fine but intermittently we see that some rules go into stopped state and stop processing data publishing to the source topic.
We are not able to see any error logs, below are the logs which we observed.
We are always sending structured data into the source topic, random/ raw entries will never exist in the source topic.

time="2024-10-23T19:01:44Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:01:44Z" level=info msg="Rule RULE_CSB1XGR011QG_CSB1WY9R11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:03:17Z" level=info msg="Stream STREAM_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/stream.go:213"
time="2024-10-23T19:03:17Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 RestartStrategy:0xc003b28750 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:48"
time="2024-10-23T19:03:17Z" level=info msg="Set config 'mqtt_source.mqtt_source_server' to 'tcp://edge-message-bus:1883' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:03:17Z" level=info msg="Set config 'mqtt_source.mqtt_source_qos' to '0' by environment variable" file="conf/load.go:130"
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Opening stream" file="topo/topo.go:208" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="decode op started" file="node/decode_op.go:76" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:115" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="Connect MQTT broker tcp://edge-message-bus:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-10-23T19:03:17Z" level=info msg="Opening source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG" file="node/source_connector_node.go:64" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Connecting to mqtt server" file="mqtt/mqtt_source_connector.go:86" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="new mqtt client created" file="mqtt/connection.go:165" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="Open connector reader" file="mqtt/mqtt_source_connector.go:167" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker is established" file="mqtt/connection.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker is established successfully for tcp://edge-message-bus:1883." file="mqtt/mqtt.go:132"
time="2024-10-23T19:03:17Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:80"
time="2024-10-23T19:03:17Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:154" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:17Z" level=info msg="The connection to mqtt broker tcp://edge-message-bus:1883 client id 768b49fd-9171-11ef-b0f5-c6a7ee380e91 established" file="mqtt/mqtt_wrapper.go:87"
time="2024-10-23T19:03:17Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is replaced." file="processor/rule.go:112"
time="2024-10-23T19:03:17Z" level=info msg="Rule RULE_CSB1X7KG11QG_CSB1WPKC11QG is update." file="processor/rule.go:90"
time="2024-10-23T19:03:17Z" level=info msg="start removing RULE_CSB1XGR011QG_CSB1WY9R11QG metrics" file="topo/topo.go:311"
time="2024-10-23T19:03:17Z" level=info msg="finish removing RULE_CSB1XGR011QG_CSB1WY9R11QG metrics" file="topo/topo.go:321"
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=error msg="not found subscription id RULE_CSB1XGR011QG_CSB1WY9R11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:17Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="source connector STREAM_CSB1XGR011QG_CSB1WY9R11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WY8W11QG/CSB1XGR011QG/CSB1WY9R11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1XGR011QG_CSB1WY9R11QG
time="2024-10-23T19:03:17Z" level=info msg="Stop rulestate RULE_CSB1XGR011QG_CSB1WY9R11QG" file="rule/ruleState.go:186"
time="2024-10-23T19:03:19Z" level=info msg="start removing RULE_CSB1X7KG11QG_CSB1WPKC11QG metrics" file="topo/topo.go:311"
time="2024-10-23T19:03:19Z" level=info msg="finish removing RULE_CSB1X7KG11QG_CSB1WPKC11QG metrics" file="topo/topo.go:321"
time="2024-10-23T19:03:19Z" level=info msg="merge done" file="node/concurrent.go:62" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="distribute done" file="node/concurrent.go:79" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="unary operator 3_project instance 0 cancelling...." file="node/operations.go:143" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="unary operator 3_project instance 0 done, cancelling future items" file="node/operations.go:87" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="source connector STREAM_CSB1X7KG11QG_CSB1WPKC11QG is finished" file="node/source_connector_node.go:92" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="sink node mqtt_0 instance 0 done" file="node/sink_node.go:268" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="Closing mqtt sink" file="mqtt/mqtt_sink.go:160" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=error msg="not found subscription id RULE_CSB1X7KG11QG_CSB1WPKC11QG_mqtt_0_0" file="mqtt/mqtt_wrapper.go:253" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:301" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
time="2024-10-23T19:03:19Z" level=info msg="Closing the connection to mqtt broker for tcp://edge-message-bus:1883" file="mqtt/mqtt.go:165"
time="2024-10-23T19:03:19Z" level=info msg="Stop rulestate RULE_CSB1X7KG11QG_CSB1WPKC11QG" file="rule/ruleState.go:186"
time="2024-10-23T19:03:19Z" level=info msg="Closing mqtt source connector to topic edge/device/SENSOR/edge-sample/CSB1WPGM11QG/CSB1X7KG11QG/CSB1WPKC11QG." file="mqtt/mqtt_source_connector.go:172" rule=RULE_CSB1X7KG11QG_CSB1WPKC11QG
�[?2004h

How to reproduce it (as minimally and precisely as possible):
We do not have any specific scenario to reproduce this issue, this is an intermittent issue.
We are using the etc/mqtt_source.yaml file for providing the source connection info, and for the SINK we use something like below.

"actions": [
    {
      "mqtt": {
        "server": "tcp://edge-message-bus:1883",
        "topic": "kuiper-sink-channel",
        "qos": 0,
        "username": "kuiper-user",
        "password": "password",
        "retained": false
      }
    }
  ]

Anything else we need to know?:
Let me know in any other details are required.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions