Skip to content

Commit 61aa237

Browse files
committed
Process measurements in parallel for InfluxDB sinks
1 parent 9c8173e commit 61aa237

File tree

2 files changed

+70
-66
lines changed

2 files changed

+70
-66
lines changed

data_sinks/influxdb.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -45,39 +45,41 @@ func InfluxDB(conf config.InfluxDBPublisher) chan<- parser.Measurement {
4545
log.WithFields(log.Fields{"mac": measurement.Mac}).Trace("Skipping InfluxDB publish due to interval limit")
4646
continue
4747
}
48-
p := influxdb.NewPointWithMeasurement(measurementName).
49-
AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)).
50-
AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", ""))
51-
if measurement.Name != nil {
52-
p.AddTag("name", *measurement.Name)
53-
}
54-
for tag, value := range conf.AdditionalTags {
55-
p.AddTag(tag, value)
56-
}
57-
addFloat(p, "temperature", measurement.Temperature)
58-
addFloat(p, "humidity", measurement.Humidity)
59-
addFloat(p, "pressure", measurement.Pressure)
60-
addFloat(p, "accelerationX", measurement.AccelerationX)
61-
addFloat(p, "accelerationY", measurement.AccelerationY)
62-
addFloat(p, "accelerationZ", measurement.AccelerationZ)
63-
addFloat(p, "batteryVoltage", measurement.BatteryVoltage)
64-
addInt(p, "txPower", measurement.TxPower)
65-
addInt(p, "rssi", measurement.Rssi)
66-
addInt(p, "movementCounter", measurement.MovementCounter)
67-
addInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber)
68-
addFloat(p, "accelerationTotal", measurement.AccelerationTotal)
69-
addFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity)
70-
addFloat(p, "dewPoint", measurement.DewPoint)
71-
addFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure)
72-
addFloat(p, "airDensity", measurement.AirDensity)
73-
addFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX)
74-
addFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY)
75-
addFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ)
76-
p.SetTime(time.Now())
77-
err := writeAPI.WritePoint(context.Background(), p)
78-
if err != nil {
79-
log.WithError(err).Error("Failed to send data to InfluxDB")
80-
}
48+
go func(measurement parser.Measurement) {
49+
p := influxdb.NewPointWithMeasurement(measurementName).
50+
AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)).
51+
AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", ""))
52+
if measurement.Name != nil {
53+
p.AddTag("name", *measurement.Name)
54+
}
55+
for tag, value := range conf.AdditionalTags {
56+
p.AddTag(tag, value)
57+
}
58+
addFloat(p, "temperature", measurement.Temperature)
59+
addFloat(p, "humidity", measurement.Humidity)
60+
addFloat(p, "pressure", measurement.Pressure)
61+
addFloat(p, "accelerationX", measurement.AccelerationX)
62+
addFloat(p, "accelerationY", measurement.AccelerationY)
63+
addFloat(p, "accelerationZ", measurement.AccelerationZ)
64+
addFloat(p, "batteryVoltage", measurement.BatteryVoltage)
65+
addInt(p, "txPower", measurement.TxPower)
66+
addInt(p, "rssi", measurement.Rssi)
67+
addInt(p, "movementCounter", measurement.MovementCounter)
68+
addInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber)
69+
addFloat(p, "accelerationTotal", measurement.AccelerationTotal)
70+
addFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity)
71+
addFloat(p, "dewPoint", measurement.DewPoint)
72+
addFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure)
73+
addFloat(p, "airDensity", measurement.AirDensity)
74+
addFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX)
75+
addFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY)
76+
addFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ)
77+
p.SetTime(time.Now())
78+
err := writeAPI.WritePoint(context.Background(), p)
79+
if err != nil {
80+
log.WithError(err).Error("Failed to send data to InfluxDB")
81+
}
82+
}(measurement)
8183
}
8284
client.Close()
8385
}()

data_sinks/influxdb3.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -46,39 +46,41 @@ func InfluxDB3(conf config.InfluxDB3Publisher) chan<- parser.Measurement {
4646
log.WithFields(log.Fields{"mac": measurement.Mac}).Trace("Skipping InfluxDB3 publish due to interval limit")
4747
continue
4848
}
49-
p := influxdb3.NewPointWithMeasurement(measurementName).
50-
AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)).
51-
AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", ""))
52-
if measurement.Name != nil {
53-
p.AddTag("name", *measurement.Name)
54-
}
55-
for tag, value := range conf.AdditionalTags {
56-
p.AddTag(tag, value)
57-
}
58-
influx3AddFloat(p, "temperature", measurement.Temperature)
59-
influx3AddFloat(p, "humidity", measurement.Humidity)
60-
influx3AddFloat(p, "pressure", measurement.Pressure)
61-
influx3AddFloat(p, "accelerationX", measurement.AccelerationX)
62-
influx3AddFloat(p, "accelerationY", measurement.AccelerationY)
63-
influx3AddFloat(p, "accelerationZ", measurement.AccelerationZ)
64-
influx3AddFloat(p, "batteryVoltage", measurement.BatteryVoltage)
65-
influx3AddInt(p, "txPower", measurement.TxPower)
66-
influx3AddInt(p, "rssi", measurement.Rssi)
67-
influx3AddInt(p, "movementCounter", measurement.MovementCounter)
68-
influx3AddInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber)
69-
influx3AddFloat(p, "accelerationTotal", measurement.AccelerationTotal)
70-
influx3AddFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity)
71-
influx3AddFloat(p, "dewPoint", measurement.DewPoint)
72-
influx3AddFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure)
73-
influx3AddFloat(p, "airDensity", measurement.AirDensity)
74-
influx3AddFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX)
75-
influx3AddFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY)
76-
influx3AddFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ)
77-
p.SetTimestamp(time.Now())
78-
err := client.WritePoints(context.Background(), p)
79-
if err != nil {
80-
log.WithError(err).Error("Failed to send data to InfluxDB3")
81-
}
49+
go func(measurement parser.Measurement) {
50+
p := influxdb3.NewPointWithMeasurement(measurementName).
51+
AddTag("dataFormat", fmt.Sprintf("%d", measurement.DataFormat)).
52+
AddTag("mac", strings.ReplaceAll(measurement.Mac, ":", ""))
53+
if measurement.Name != nil {
54+
p.AddTag("name", *measurement.Name)
55+
}
56+
for tag, value := range conf.AdditionalTags {
57+
p.AddTag(tag, value)
58+
}
59+
influx3AddFloat(p, "temperature", measurement.Temperature)
60+
influx3AddFloat(p, "humidity", measurement.Humidity)
61+
influx3AddFloat(p, "pressure", measurement.Pressure)
62+
influx3AddFloat(p, "accelerationX", measurement.AccelerationX)
63+
influx3AddFloat(p, "accelerationY", measurement.AccelerationY)
64+
influx3AddFloat(p, "accelerationZ", measurement.AccelerationZ)
65+
influx3AddFloat(p, "batteryVoltage", measurement.BatteryVoltage)
66+
influx3AddInt(p, "txPower", measurement.TxPower)
67+
influx3AddInt(p, "rssi", measurement.Rssi)
68+
influx3AddInt(p, "movementCounter", measurement.MovementCounter)
69+
influx3AddInt(p, "measurementSequenceNumber", measurement.MeasurementSequenceNumber)
70+
influx3AddFloat(p, "accelerationTotal", measurement.AccelerationTotal)
71+
influx3AddFloat(p, "absoluteHumidity", measurement.AbsoluteHumidity)
72+
influx3AddFloat(p, "dewPoint", measurement.DewPoint)
73+
influx3AddFloat(p, "equilibriumVaporPressure", measurement.EquilibriumVaporPressure)
74+
influx3AddFloat(p, "airDensity", measurement.AirDensity)
75+
influx3AddFloat(p, "accelerationAngleFromX", measurement.AccelerationAngleFromX)
76+
influx3AddFloat(p, "accelerationAngleFromY", measurement.AccelerationAngleFromY)
77+
influx3AddFloat(p, "accelerationAngleFromZ", measurement.AccelerationAngleFromZ)
78+
p.SetTimestamp(time.Now())
79+
err := client.WritePoints(context.Background(), p)
80+
if err != nil {
81+
log.WithError(err).Error("Failed to send data to InfluxDB3")
82+
}
83+
}(measurement)
8284
}
8385
client.Close()
8486
}()

0 commit comments

Comments
 (0)