Skip to content

Commit 43dc46d

Browse files
committed
2 parents a148191 + f4260a6 commit 43dc46d

9 files changed

+85
-229
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
.env
2-
myenv/
2+
myenv/
3+
firebaseConfig.json
4+
tests/firebaseConfig.json

Readme.MD

+16-19
Original file line numberDiff line numberDiff line change
@@ -155,28 +155,25 @@ Depending on the data source, fields can be named using:
155155
The rules can be embedded into the YAML configuration for pipelines:
156156

157157
```yaml
158+
pipeline:
159+
error-handling:
160+
strategy: LOG_AND_CONTINUE
158161
inputconfig:
159-
url: wss://echo.websocket.org
160-
inputmethod: WebSocket
161-
162+
csvsourcefilename: sample.csv
163+
inputmethod: CSV
162164
outputconfig:
163-
url: wss://echo.websocket.org
164-
outputmethod: WebSocket
165-
166-
validation:
167-
rules: |
168-
FIELD("message") REQUIRED
169-
FIELD("timestamp") TYPE(INT) RANGE(1609459200, 1704067200)
170-
171-
transformation:
172-
rules: |
173-
ADD_FIELD("processed_at", CURRENT_TIME())
174-
IF FIELD("message") MATCHES("^hello") THEN RENAME("message", "greeting")
175-
MAP("status", {"0": "inactive", "1": "active"})
176-
177-
error_handling:
178-
on_error: LOG_AND_CONTINUE
165+
csvdestinationfilename: test.csv
166+
outputmethod: CSV
167+
cronjob:
168+
repetition_interval: "1h"
169+
monitoring:
170+
job_status:"pending"
171+
transformations:
172+
-ADD_FIELD("processed_at", CURRENT_TIME())
173+
validations:
174+
-FIELD("age") RANGE(30,35)
179175
```
176+
180177
---
181178

182179
# Adding a New Integration

config.yaml

+5-12
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
11
errorhandling:
22
strategy: LOG_AND_CONTINUE
33
inputconfig:
4-
csvsourcefilename: sample.csv
5-
inputmethod: CSV
4+
url: sdfxghjuk
5+
inputmethod: WebSocket
66
outputconfig:
7-
csvdestinationfilename: samplenew.csv
8-
outputmethod: CSV
9-
cronjob:
10-
cronexpression: "@every 5s" # Every 5 seconds
11-
jobname: "Data Fetch and Process"
12-
task: "process_csv"
13-
transformations: |
14-
ADD_FIELD("processed_at", CURRENT_TIME())
15-
validations: |
16-
FIELD("age") TYPE(INT) RANGE(30, 35)
7+
topic: bfhtd
8+
url: dfghjkl
9+
outputmethod: Kafka

firebaseConfig.json

-13
This file was deleted.

go.mod

+6-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ toolchain go1.22.9
66

77
require (
88
firebase.google.com/go v3.13.0+incompatible
9-
github.com/golang/mock v1.1.1
109
github.com/jlaffaye/ftp v0.2.0
1110
github.com/manifoldco/promptui v0.9.0
1211
github.com/pkg/sftp v1.13.7
@@ -54,7 +53,7 @@ require (
5453

5554
require (
5655
cloud.google.com/go v0.116.0 // indirect
57-
cloud.google.com/go/auth v0.9.9 // indirect
56+
cloud.google.com/go/auth v0.11.0 // indirect
5857
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
5958
cloud.google.com/go/compute/metadata v0.5.2 // indirect
6059
cloud.google.com/go/iam v1.2.1 // indirect
@@ -122,13 +121,13 @@ require (
122121
go.opentelemetry.io/otel/trace v1.32.0
123122
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
124123
go.uber.org/mock v0.5.0 // indirect
125-
golang.org/x/crypto v0.28.0
124+
golang.org/x/crypto v0.31.0
126125
golang.org/x/net v0.30.0 // indirect
127126
golang.org/x/oauth2 v0.24.0 // indirect
128-
golang.org/x/sync v0.8.0 // indirect
129-
golang.org/x/sys v0.27.0 // indirect
130-
golang.org/x/term v0.25.0 // indirect
131-
golang.org/x/text v0.19.0 // indirect
127+
golang.org/x/sync v0.10.0 // indirect
128+
golang.org/x/sys v0.28.0 // indirect
129+
golang.org/x/term v0.27.0 // indirect
130+
golang.org/x/text v0.21.0 // indirect
132131
golang.org/x/time v0.7.0 // indirect
133132
google.golang.org/api v0.203.0
134133
google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 // indirect

go.sum

+12-13
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
22
cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE=
33
cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U=
4-
cloud.google.com/go/auth v0.9.9 h1:BmtbpNQozo8ZwW2t7QJjnrQtdganSdmqeIBxHxNkEZQ=
5-
cloud.google.com/go/auth v0.9.9/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI=
4+
cloud.google.com/go/auth v0.11.0 h1:Ic5SZz2lsvbYcWT5dfjNWgw6tTlGi2Wc8hyQSC9BstA=
5+
cloud.google.com/go/auth v0.11.0/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI=
66
cloud.google.com/go/auth/oauth2adapt v0.2.4 h1:0GWE/FUsXhf6C+jAkWgYm7X9tK8cuEIfy19DBn6B6bY=
77
cloud.google.com/go/auth/oauth2adapt v0.2.4/go.mod h1:jC/jOpwFP6JBxhB3P5Rr0a9HLMC/Pe3eaL4NmdvqPtc=
88
cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo=
@@ -92,7 +92,6 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
9292
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
9393
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
9494
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
95-
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
9695
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
9796
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
9897
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -331,8 +330,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
331330
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
332331
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
333332
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
334-
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
335-
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
333+
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
334+
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
336335
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
337336
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 h1:mchzmB1XO2pMaKFRqk/+MV3mgGG96aqaPXaMifQU47w=
338337
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
@@ -372,8 +371,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
372371
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
373372
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
374373
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
375-
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
376-
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
374+
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
375+
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
377376
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
378377
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
379378
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -390,16 +389,16 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
390389
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
391390
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
392391
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
393-
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
394-
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
392+
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
393+
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
395394
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
396395
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
397396
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
398397
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
399398
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
400399
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
401-
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
402-
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
400+
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
401+
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
403402
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
404403
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
405404
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -408,8 +407,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
408407
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
409408
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
410409
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
411-
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
412-
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
410+
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
411+
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
413412
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
414413
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
415414
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

integrations/mongodb.go

+43-40
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package integrations
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"log"
78
"sync"
89

@@ -108,61 +109,46 @@ func (m MongoDBDestination) SendData(data interface{}, req interfaces.Request) e
108109
}
109110
logger.Infof("Connecting to MongoDB destination...")
110111

112+
// Initialize MongoDB client
111113
clientOptions := options.Client().ApplyURI(req.TargetMongoDBConnString)
112114
client, err := mongo.Connect(context.TODO(), clientOptions)
113115
if err != nil {
114-
return err
116+
return fmt.Errorf("failed to connect to MongoDB: %w", err)
115117
}
116118
defer func() {
117119
if err = client.Disconnect(context.TODO()); err != nil {
118120
logger.Errorf("Error disconnecting MongoDB client: %v", err)
119121
}
120122
}()
121123

122-
collection := client.Database(req.TargetMongoDBDatabase).Collection(req.TargetMongoDBCollection)
123-
124-
// Assert that data is a slice of bson.M
125-
dataSlice, ok := data.([]bson.M)
126-
if !ok {
127-
logger.Errorf("data must be a slice of bson.M representing documents")
128-
return errors.New("invalid data format: expected []bson.M")
124+
// Transform data to BSON
125+
bsonData, err := TransformDataToBSON(data)
126+
if err != nil {
127+
return fmt.Errorf("data transformation failed: %w", err)
129128
}
130129

131-
// Buffered channel for sending documents
132-
dataChannel := make(chan bson.M, bufferSize)
133-
errorChannel := make(chan error, bufferSize)
134-
var wg sync.WaitGroup
135-
136-
// Goroutines for worker pool
137-
for i := 0; i < bufferSize; i++ { // Worker pool
138-
wg.Add(1)
139-
go func() {
140-
defer wg.Done()
141-
for doc := range dataChannel {
142-
if _, err := collection.InsertOne(context.TODO(), doc); err != nil {
143-
logger.Errorf("Error inserting into collection %s: %v", req.TargetMongoDBCollection, err)
144-
errorChannel <- err
145-
} else {
146-
logger.Infof("Data sent to MongoDB target collection %s: %v", req.TargetMongoDBCollection, doc)
147-
}
148-
}
149-
}()
150-
}
130+
// Access database and collection
131+
collection := client.Database(req.TargetMongoDBDatabase).Collection(req.TargetMongoDBCollection)
151132

152-
// Feed data into the channel
153-
go func() {
154-
for _, doc := range dataSlice {
155-
dataChannel <- doc
133+
// Insert data into MongoDB
134+
if len(bsonData) == 1 {
135+
// Insert a single document
136+
_, err = collection.InsertOne(context.TODO(), bsonData[0])
137+
if err != nil {
138+
return fmt.Errorf("failed to insert document: %w", err)
156139
}
157-
close(dataChannel)
158-
}()
159140

160-
wg.Wait()
161-
close(errorChannel)
162-
163-
// Check for errors in the error channel
164-
if len(errorChannel) > 0 {
165-
return errors.New("one or more errors occurred while inserting data")
141+
} else {
142+
// Insert multiple documents
143+
docs := make([]interface{}, len(bsonData))
144+
for i, doc := range bsonData {
145+
docs[i] = doc
146+
}
147+
_, err = collection.InsertMany(context.TODO(), docs)
148+
if err != nil {
149+
return fmt.Errorf("failed to insert documents: %w", err)
150+
}
151+
logger.Infof("Successfully inserted %d documents into MongoDB collection %s", len(bsonData), req.TargetMongoDBCollection)
166152
}
167153

168154
return nil
@@ -173,3 +159,20 @@ func init() {
173159
registry.RegisterSource("MongoDB", MongoDBSource{})
174160
registry.RegisterDestination("MongoDB", MongoDBDestination{})
175161
}
162+
163+
func TransformDataToBSON(data interface{}) ([]bson.M, error) {
164+
switch v := data.(type) {
165+
case map[string]interface{}: // Single document
166+
return []bson.M{v}, nil
167+
case []map[string]interface{}: // Multiple documents
168+
result := make([]bson.M, len(v))
169+
for i, item := range v {
170+
result[i] = item
171+
}
172+
return result, nil
173+
case []bson.M: // Already in bson.M
174+
return v, nil
175+
default:
176+
return nil, fmt.Errorf("unsupported data format: %T", v)
177+
}
178+
}

tests/firebaseConfig.json

-13
This file was deleted.

0 commit comments

Comments
 (0)