Skip to content

Commit f708526

Browse files
authored
Merge pull request #18 from SkySingh04/slashexx/firebase-mass
feat: Firebase now supports mass transfer of data
2 parents ad5b59e + 2607f66 commit f708526

File tree

4 files changed

+41
-46
lines changed

4 files changed

+41
-46
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
.env
22
myenv/
33
firebaseConfig.json
4-
tests/firebaseConfig.json
4+
tests/firebaseConfig.json
5+
pbweb.json
6+
mongo.txt

config.yaml

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
errorhandling:
22
strategy: LOG_AND_CONTINUE
33
inputconfig:
4-
url: sdfxghjuk
5-
inputmethod: WebSocket
4+
topic: dsfdgfh
5+
url: sdfgbh
6+
inputmethod: Kafka
67
outputconfig:
7-
topic: bfhtd
8-
url: dfghjkl
9-
outputmethod: Kafka
8+
queuename: sfdgrfthgyj
9+
url: sefrgdthfy
10+
outputmethod: RabbitMQ

integrations/firebase.go

+31-40
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ package integrations
33
import (
44
"context"
55
"encoding/json"
6-
"errors"
6+
_ "errors"
77
"fmt"
8-
"strings"
8+
_ "strings"
99
"sync"
1010
"time"
1111

1212
firebase "firebase.google.com/go"
13+
_ "google.golang.org/api/iterator"
1314
"google.golang.org/api/option"
1415

1516
"github.com/SkySingh04/fractal/interfaces"
@@ -30,8 +31,7 @@ type FirebaseDestination struct {
3031
}
3132

3233
func (f FirebaseSource) FetchData(req interfaces.Request) (interface{}, error) {
33-
logger.Infof("Connecting to Firebase Source: Collection=%s, Document=%s, using Service Account=%s",
34-
req.Collection, req.Document, req.CredentialFileAddr)
34+
logger.Infof("Connecting to Firebase Source: Collection=%s, using Service Account=%s", req.Collection, req.CredentialFileAddr)
3535

3636
opt := option.WithCredentialsFile(req.CredentialFileAddr)
3737
app, err := firebase.NewApp(context.Background(), nil, opt)
@@ -45,41 +45,30 @@ func (f FirebaseSource) FetchData(req interfaces.Request) (interface{}, error) {
4545
}
4646
defer client.Close()
4747

48-
dataChan := make(chan map[string]interface{}, 1)
49-
errChan := make(chan error, 1)
50-
var wg sync.WaitGroup
48+
docs, err := client.Collection(req.Collection).Documents(context.Background()).GetAll()
49+
if err != nil {
50+
return nil, fmt.Errorf("failed to fetch documents: %w", err)
51+
}
5152

52-
wg.Add(1)
53-
go func() {
54-
defer wg.Done()
55-
dsnap, err := client.Collection(req.Collection).Doc(req.Document).Get(context.Background())
56-
if err != nil {
57-
errChan <- fmt.Errorf("failed to fetch document from Firestore: %w", err)
58-
return
59-
}
53+
logger.Infof("Fetched documents from Firebase: %d documents", len(docs))
54+
for i, doc := range docs {
55+
logger.Infof("Document %d ID: %s, Data: %v", i, doc.Ref.ID, doc.Data())
56+
}
6057

61-
if !dsnap.Exists() {
62-
errChan <- fmt.Errorf("document not found: Collection=%s, Document=%s", req.Collection, req.Document)
63-
return
64-
}
58+
var allData []map[string]interface{}
59+
for _, doc := range docs {
60+
data := doc.Data()
61+
logger.Infof("Fetched data from Firebase: %v", data)
6562

66-
dataChan <- dsnap.Data()
67-
}()
63+
data["_id"] = doc.Ref.ID
6864

69-
wg.Wait()
70-
close(dataChan)
71-
close(errChan)
65+
validatedData := data
66+
transformedData := validatedData
7267

73-
select {
74-
case data := <-dataChan:
75-
validatedData, err := validateFirebaseData(data)
76-
if err != nil {
77-
return nil, err
78-
}
79-
return transformFirebaseData(validatedData), nil
80-
case err := <-errChan:
81-
return nil, err
68+
allData = append(allData, transformedData)
8269
}
70+
71+
return allData, nil
8372
}
8473

8574
func (f FirebaseDestination) SendData(data interface{}, req interfaces.Request) error {
@@ -127,6 +116,8 @@ func (f FirebaseDestination) SendData(data interface{}, req interfaces.Request)
127116
}
128117

129118
func convertToMap(data interface{}, result *map[string]interface{}) error {
119+
logger.Infof("Firebase data to map: %v", data)
120+
130121
temp, err := json.Marshal(data)
131122
if err != nil {
132123
return fmt.Errorf("failed to marshal data to JSON: %w", err)
@@ -140,18 +131,18 @@ func convertToMap(data interface{}, result *map[string]interface{}) error {
140131

141132
func validateFirebaseData(data map[string]interface{}) (map[string]interface{}, error) {
142133
logger.Infof("Validating Firebase data: %v", data)
143-
message, ok := data["data"].(string)
144-
if !ok || strings.TrimSpace(message) == "" {
145-
return nil, errors.New("invalid or missing 'data' field")
146-
}
134+
// // message, ok := data;
135+
// if !ok || strings.TrimSpace(message) == "" {
136+
// return nil, errors.New("invalid or missing 'data' field")
137+
// }
147138
return data, nil
148139
}
149140

150141
func transformFirebaseData(data map[string]interface{}) map[string]interface{} {
151142
logger.Infof("Transforming Firebase data: %v", data)
152-
if message, ok := data["data"].(string); ok {
153-
data["data"] = strings.ToUpper(message)
154-
}
143+
// if message, ok := data["data"].(string); ok {
144+
// data["data"] = strings.ToUpper(message)
145+
// }
155146
data["processed"] = time.Now().Format(time.RFC3339)
156147
return data
157148
}

integrations/mongodb.go

+1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func init() {
161161
}
162162

163163
func TransformDataToBSON(data interface{}) ([]bson.M, error) {
164+
logger.Infof("Data received for BSON conversion insertion: %+v", data)
164165
switch v := data.(type) {
165166
case map[string]interface{}: // Single document
166167
return []bson.M{v}, nil

0 commit comments

Comments
 (0)