Skip to content

Commit d7feb2f

Browse files
authored
Merge pull request #2356 from fengzeroz/main
mqtt custom format
2 parents afb5e55 + 80803c7 commit d7feb2f

27 files changed

+624
-107
lines changed

include/neuron/msg.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ typedef struct {
494494
char group[NEU_GROUP_NAME_LEN];
495495
uint16_t port;
496496
char * params;
497+
char * static_tags;
497498
} neu_req_subscribe_t;
498499

499500
typedef struct {
@@ -507,6 +508,7 @@ typedef struct {
507508
char * group;
508509
uint16_t port;
509510
char * params;
511+
char * static_tags;
510512
} neu_req_subscribe_group_info_t;
511513

512514
typedef struct {
@@ -522,6 +524,7 @@ neu_req_subscribe_groups_fini(neu_req_subscribe_groups_t *req)
522524
free(req->groups[i].driver);
523525
free(req->groups[i].group);
524526
free(req->groups[i].params);
527+
free(req->groups[i].static_tags);
525528
}
526529
free(req->groups);
527530
free(req->app);
@@ -542,11 +545,13 @@ typedef struct neu_resp_subscribe_info {
542545
char driver[NEU_NODE_NAME_LEN];
543546
char group[NEU_GROUP_NAME_LEN];
544547
char *params;
548+
char *static_tags;
545549
} neu_resp_subscribe_info_t;
546550

547551
static inline void neu_resp_subscribe_info_fini(neu_resp_subscribe_info_t *info)
548552
{
549553
free(info->params);
554+
free(info->static_tags);
550555
}
551556

552557
typedef struct {

include/neuron/persist/persist.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ typedef struct {
4848
char *driver_name;
4949
char *group_name;
5050
char *params;
51+
char *static_tags;
5152
} neu_persist_subscription_info_t;
5253

5354
typedef struct {
@@ -80,6 +81,9 @@ neu_persist_subscription_info_fini(neu_persist_subscription_info_t *info)
8081
free(info->driver_name);
8182
free(info->group_name);
8283
free(info->params);
84+
if (info->static_tags) {
85+
free(info->static_tags);
86+
}
8387
}
8488

8589
static inline void neu_persist_user_info_fini(neu_persist_user_info_t *info)
@@ -215,8 +219,8 @@ int neu_persister_delete_tag(const char *driver_name, const char *group_name,
215219
*/
216220
int neu_persister_store_subscription(const char *app_name,
217221
const char *driver_name,
218-
const char *group_name,
219-
const char *params);
222+
const char *group_name, const char *params,
223+
const char *static_tags);
220224

221225
/**
222226
* Update subscriptions.
@@ -229,7 +233,8 @@ int neu_persister_store_subscription(const char *app_name,
229233
int neu_persister_update_subscription(const char *app_name,
230234
const char *driver_name,
231235
const char *group_name,
232-
const char *params);
236+
const char *params,
237+
const char *static_tags);
233238

234239
/**
235240
* Load adapter subscriptions.

persistence/0112_2.11.0_tag.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,13 @@ CREATE TABLE
6363
driver_name TEXT NOT NULL,
6464
group_name TEXT NOT NULL,
6565
params TEXT DEFAULT NULL,
66+
static_tags TEXT DEFAULT NULL,
6667
CHECK (app_name != driver_name),
6768
UNIQUE (app_name, driver_name, group_name),
6869
FOREIGN KEY (app_name) REFERENCES nodes (name) ON UPDATE CASCADE ON DELETE CASCADE,
6970
FOREIGN KEY (driver_name, group_name) REFERENCES groups (driver_name, name) ON UPDATE CASCADE ON DELETE CASCADE
7071
);
71-
INSERT INTO subscriptions SELECT * FROM temp_subscriptions;
72+
INSERT INTO subscriptions SELECT app_name, driver_name, group_name, params, NULL FROM temp_subscriptions;
7273

7374
DROP TABLE temp_subscriptions;
7475

plugins/mqtt/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ add_library(${PROJECT_NAME} SHARED
1111
mqtt_handle.c
1212
mqtt_plugin.c
1313
mqtt_plugin_intf.c
14+
schema.c
1415
)
1516

1617
target_include_directories(${PROJECT_NAME} PRIVATE
@@ -30,6 +31,7 @@ add_library(${AWS_PLUGIN} SHARED
3031
mqtt_handle.c
3132
mqtt_plugin_intf.c
3233
aws_iot_plugin.c
34+
schema.c
3335
)
3436

3537
target_include_directories(${AWS_PLUGIN} PRIVATE
@@ -49,6 +51,7 @@ add_library(${AZURE_PLUGIN} SHARED
4951
mqtt_handle.c
5052
mqtt_plugin_intf.c
5153
azure_iot_plugin.c
54+
schema.c
5255
)
5356

5457
target_include_directories(${AZURE_PLUGIN} PRIVATE

plugins/mqtt/azure_iot_plugin.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@ static int azure_handle_trans_data(neu_plugin_t * plugin,
299299
return NEU_ERR_MQTT_FAILURE;
300300
}
301301

302-
char *json_str =
303-
generate_upload_json(plugin, trans_data, plugin->config.format, NULL);
302+
char *json_str = generate_upload_json(
303+
plugin, trans_data, plugin->config.format, NULL, 0, NULL, 0, NULL);
304304
if (NULL == json_str) {
305305
plog_error(plugin, "generate upload json fail");
306306
return NEU_ERR_EINTERNAL;

plugins/mqtt/mqtt.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,27 @@
7474
{
7575
"key": "ECP-format",
7676
"value": 2
77+
},
78+
{
79+
"key": "Custom",
80+
"value": 3
7781
}
7882
]
7983
}
8084
},
85+
"schema": {
86+
"name": "Schema",
87+
"name_zh": "数据模式",
88+
"attribute": "optional",
89+
"type": "string",
90+
"condition": {
91+
"field": "format",
92+
"value": 3
93+
},
94+
"valid": {
95+
"length": 81960
96+
}
97+
},
8198
"upload_err": {
8299
"name": "Upload Tag Error Code",
83100
"name_zh": "上报点位错误码",

plugins/mqtt/mqtt_config.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,30 @@ int mqtt_config_parse(neu_plugin_t *plugin, const char *setting,
355355
// format, required
356356
if (MQTT_UPLOAD_FORMAT_VALUES != format.v.val_int &&
357357
MQTT_UPLOAD_FORMAT_TAGS != format.v.val_int &&
358-
MQTT_UPLOAD_FORMAT_ECP != format.v.val_int) {
358+
MQTT_UPLOAD_FORMAT_ECP != format.v.val_int &&
359+
MQTT_UPLOAD_FORMAT_CUSTOM != format.v.val_int) {
359360
plog_error(plugin, "setting invalid format: %" PRIi64,
360361
format.v.val_int);
361362
goto error;
362363
}
363364

365+
if (format.v.val_int == MQTT_UPLOAD_FORMAT_CUSTOM) {
366+
neu_json_elem_t schema = { .name = "schema", .t = NEU_JSON_STR };
367+
ret = neu_parse_param(setting, &err_param, 1, &schema);
368+
if (0 != ret) {
369+
plog_error(plugin, "parsing schema fail, key: `%s`", err_param);
370+
goto error;
371+
}
372+
373+
ret = mqtt_schema_validate(schema.v.val_str, &config->schema_vts,
374+
&config->n_schema_vt);
375+
if (0 != ret) {
376+
plog_error(plugin, "schema validation fail");
377+
goto error;
378+
}
379+
free(schema.v.val_str);
380+
}
381+
364382
ret = neu_parse_param(setting, &err_param, 2, &driver_action_req_topic,
365383
&driver_action_resp_topic);
366384
if (0 != ret) {
@@ -563,5 +581,8 @@ void mqtt_config_fini(mqtt_config_t *config)
563581
free(config->keypass);
564582
free(config->heartbeat_topic);
565583

584+
if (config->schema_vts) {
585+
free(config->schema_vts);
586+
}
566587
memset(config, 0, sizeof(*config));
567588
}

plugins/mqtt/mqtt_config.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,13 @@ extern "C" {
3030
#include "connection/mqtt_client.h"
3131
#include "plugin.h"
3232

33+
#include "schema.h"
34+
3335
typedef enum {
3436
MQTT_UPLOAD_FORMAT_VALUES = 0,
3537
MQTT_UPLOAD_FORMAT_TAGS = 1,
3638
MQTT_UPLOAD_FORMAT_ECP = 2,
39+
MQTT_UPLOAD_FORMAT_CUSTOM = 3,
3740
} mqtt_upload_format_e;
3841

3942
static inline const char *mqtt_upload_format_str(mqtt_upload_format_e f)
@@ -45,6 +48,8 @@ static inline const char *mqtt_upload_format_str(mqtt_upload_format_e f)
4548
return "format-tags";
4649
case MQTT_UPLOAD_FORMAT_ECP:
4750
return "ECP-format";
51+
case MQTT_UPLOAD_FORMAT_CUSTOM:
52+
return "custom";
4853
default:
4954
return NULL;
5055
}
@@ -78,6 +83,9 @@ typedef struct {
7883
char * keypass; // client key password
7984
// remove in 2.6, keep it here
8085
// for backward compatibility
86+
87+
size_t n_schema_vt;
88+
mqtt_schema_vt_t *schema_vts;
8189
} mqtt_config_t;
8290

8391
int decode_b64_param(neu_plugin_t *plugin, neu_json_elem_t *el);

plugins/mqtt/mqtt_handle.c

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,16 @@ static void to_traceparent(uint8_t *trace_id, char *span_id, char *out)
4040
sprintf(out + size, "-%s-01", span_id);
4141
}
4242

43-
static int tag_values_to_json(UT_array *tags, neu_json_read_resp_t *json)
43+
static int tag_values_to_json(UT_array *tags, mqtt_static_vt_t *s_tags,
44+
size_t n_s_tags, neu_json_read_resp_t *json)
4445
{
4546
int index = 0;
4647

4748
if (0 == utarray_len(tags)) {
4849
return 0;
4950
}
5051

51-
json->n_tag = utarray_len(tags);
52+
json->n_tag = utarray_len(tags) + n_s_tags;
5253
json->tags = (neu_json_read_resp_tag_t *) calloc(
5354
json->n_tag, sizeof(neu_json_read_resp_tag_t));
5455
if (NULL == json->tags) {
@@ -61,6 +62,16 @@ static int tag_values_to_json(UT_array *tags, neu_json_read_resp_t *json)
6162
index += 1;
6263
}
6364

65+
if (s_tags != NULL) {
66+
for (size_t i = 0; i < n_s_tags; i++) {
67+
neu_json_read_resp_tag_t *tag = &json->tags[index];
68+
tag->name = s_tags[i].name;
69+
tag->t = s_tags[i].jtype;
70+
tag->value = s_tags[i].jvalue;
71+
index += 1;
72+
}
73+
}
74+
6475
return 0;
6576
}
6677

@@ -86,7 +97,9 @@ void filter_error_tags(neu_reqresp_trans_data_t *data)
8697
}
8798

8899
char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
89-
mqtt_upload_format_e format, bool *skip)
100+
mqtt_upload_format_e format, mqtt_schema_vt_t *vts,
101+
size_t n_vts, mqtt_static_vt_t *s_tags,
102+
size_t n_s_tags, bool *skip)
90103
{
91104
char * json_str = NULL;
92105
neu_json_read_periodic_t header = { .group = (char *) data->group,
@@ -103,9 +116,16 @@ char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
103116
}
104117
}
105118

106-
if (0 != tag_values_to_json(data->tags, &json)) {
107-
plog_error(plugin, "tag_values_to_json fail");
108-
return NULL;
119+
if (format == MQTT_UPLOAD_FORMAT_CUSTOM) {
120+
if (0 != tag_values_to_json(data->tags, NULL, 0, &json)) {
121+
plog_error(plugin, "tag_values_to_json fail");
122+
return NULL;
123+
}
124+
} else {
125+
if (0 != tag_values_to_json(data->tags, s_tags, n_s_tags, &json)) {
126+
plog_error(plugin, "tag_values_to_json fail");
127+
return NULL;
128+
}
109129
}
110130

111131
int ret;
@@ -131,6 +151,11 @@ char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
131151
data->group);
132152
}
133153
break;
154+
case MQTT_UPLOAD_FORMAT_CUSTOM: {
155+
ret = mqtt_schema_encode(data->driver, data->group, &json, vts, n_vts,
156+
s_tags, n_s_tags, &json_str);
157+
break;
158+
}
134159
default:
135160
plog_warn(plugin, "invalid upload format: %d", format);
136161
break;
@@ -157,7 +182,7 @@ static char *generate_read_resp_json(neu_plugin_t * plugin,
157182
char * json_str = NULL;
158183
neu_json_read_resp_t json = { 0 };
159184

160-
if (0 != tag_values_to_json(data->tags, &json)) {
185+
if (0 != tag_values_to_json(data->tags, NULL, 0, &json)) {
161186
plog_error(plugin, "tag_values_to_json fail");
162187
return NULL;
163188
}
@@ -855,9 +880,22 @@ int handle_trans_data(neu_plugin_t * plugin,
855880
break;
856881
}
857882

858-
bool skip_none = false;
859-
char *json_str = generate_upload_json(
860-
plugin, trans_data, plugin->config.format, &skip_none);
883+
bool skip_none = false;
884+
size_t n_satic_tag = 0;
885+
mqtt_static_vt_t *static_tags = NULL;
886+
if (route->static_tags != NULL && strlen(route->static_tags) > 0) {
887+
mqtt_static_validate(route->static_tags, &static_tags,
888+
&n_satic_tag);
889+
}
890+
891+
char *json_str = generate_upload_json(
892+
plugin, trans_data, plugin->config.format,
893+
plugin->config.schema_vts, plugin->config.n_schema_vt, static_tags,
894+
n_satic_tag, &skip_none);
895+
if (n_satic_tag > 0) {
896+
mqtt_static_free(static_tags, n_satic_tag);
897+
}
898+
861899
if (skip_none) {
862900
break;
863901
}
@@ -919,8 +957,9 @@ int handle_subscribe_group(neu_plugin_t *plugin, neu_req_subscribe_t *sub_info)
919957
goto end;
920958
}
921959

922-
rv = route_tbl_add_new(&plugin->route_tbl, sub_info->driver,
923-
sub_info->group, topic.v.val_str);
960+
rv =
961+
route_tbl_add_new(&plugin->route_tbl, sub_info->driver, sub_info->group,
962+
topic.v.val_str, sub_info->static_tags);
924963
// topic.v.val_str ownership moved
925964
if (0 != rv) {
926965
plog_error(plugin, "route driver:%s group:%s fail, `%s`",
@@ -953,7 +992,7 @@ int handle_update_subscribe(neu_plugin_t *plugin, neu_req_subscribe_t *sub_info)
953992
}
954993

955994
rv = route_tbl_update(&plugin->route_tbl, sub_info->driver, sub_info->group,
956-
topic.v.val_str);
995+
topic.v.val_str, sub_info->static_tags);
957996
// topic.v.val_str ownership moved
958997
if (0 != rv) {
959998
plog_error(plugin, "route driver:%s group:%s fail, `%s`",

plugins/mqtt/mqtt_handle.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ int handle_driver_action_response(neu_plugin_t * plugin,
5858
neu_resp_driver_action_t *data);
5959

6060
char *generate_upload_json(neu_plugin_t *plugin, neu_reqresp_trans_data_t *data,
61-
mqtt_upload_format_e format, bool *skip);
61+
mqtt_upload_format_e format, mqtt_schema_vt_t *vts,
62+
size_t n_vts, mqtt_static_vt_t *s_tags,
63+
size_t n_s_tags, bool *skip);
6264
int handle_trans_data(neu_plugin_t * plugin,
6365
neu_reqresp_trans_data_t *trans_data);
6466

0 commit comments

Comments
 (0)