Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@ BIGQUERY_TABLE=
KINESIS_STREAM_NAME=
KINESIS_STREAM_REGION=

# Optional
## You have to specify this environment variable if you want to export Elasticsearch.
## e.g. ELASTICSEARCH_HOST=https://<user name>:<passward>@xxx.yyy.elastic-cloud.com
ELASTICSEARCH_HOST=
ELASTICSEARCH_INDEX_NAME=
## If synchronizing Mongo with Elasticsearch using changestream's fulldocument,
## set ELASTICSEARCH_SYNC_ENABLED to true. Default is false.
ELASTICSEARCH_SYNC_ENABLED=

# Optional
## You have to specify this environment variable if you want to export Cloud PubSub.
PUBSUB_TOPIC_NAME=

# Require
## Specify the location you want to export.
## e.g. EXPORT_DESTINATION=bigquery
## e.g. EXPORT_DESTINATION=bigquery,pubsub,kinesisStream,file
## e.g. EXPORT_DESTINATION=bigquery,pubsub,kinesisStream,elasticsearch,file
EXPORT_DESTINATION=

# Require
Expand Down
31 changes: 31 additions & 0 deletions application/export_change_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/cam-inc/mxtransporter/config"
pconfig "github.com/cam-inc/mxtransporter/config/pubsub"
interfaceForBigquery "github.com/cam-inc/mxtransporter/interfaces/bigquery"
interfaceForElasticsearch "github.com/cam-inc/mxtransporter/interfaces/elasticsearch"
iff "github.com/cam-inc/mxtransporter/interfaces/file"
interfaceForKinesisStream "github.com/cam-inc/mxtransporter/interfaces/kinesis-stream"
mongoConnection "github.com/cam-inc/mxtransporter/interfaces/mongo"
interfaceForPubsub "github.com/cam-inc/mxtransporter/interfaces/pubsub"
"github.com/cam-inc/mxtransporter/pkg/client"
"github.com/cam-inc/mxtransporter/pkg/errors"
irt "github.com/cam-inc/mxtransporter/usecases/resume-token"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -32,6 +34,7 @@ const (
BigQuery agent = "bigquery"
CloudPubSub agent = "pubsub"
KinesisStream agent = "kinesisStream"
Elasticsearch agent = "elasticsearch"
File agent = "file"
)

Expand All @@ -40,6 +43,7 @@ type (
newBigqueryClient(ctx context.Context, projectID string) (*bigquery.Client, error)
newPubsubClient(ctx context.Context, projectID string) (*pubsub.Client, error)
newKinesisClient(ctx context.Context) (*kinesis.Client, error)
newElasticsearchClient(ctx context.Context) (*elasticsearch.TypedClient, error)
watch(ctx context.Context, ops *options.ChangeStreamOptions) (*mongo.ChangeStream, error)
newFileClient(ctx context.Context) (iff.Exporter, error)
setCsExporter(exporter ChangeStreamsExporterImpl)
Expand Down Expand Up @@ -82,6 +86,14 @@ func (*ChangeStreamsWatcherClientImpl) newKinesisClient(ctx context.Context) (*k
return ksClient, nil
}

func (*ChangeStreamsWatcherClientImpl) newElasticsearchClient(ctx context.Context) (*elasticsearch.TypedClient, error) {
esClient, err := client.NewElasticsearchClient(ctx)
if err != nil {
return nil, err
}
return esClient, nil
}

func (*ChangeStreamsWatcherClientImpl) newFileClient(_ context.Context) (iff.Exporter, error) {
return iff.New(config.FileExportConfig()), nil
}
Expand Down Expand Up @@ -147,6 +159,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
bqImpl interfaceForBigquery.BigqueryImpl
psImpl interfaceForPubsub.PubsubImpl
ksImpl interfaceForKinesisStream.KinesisStreamImpl
esImpl interfaceForElasticsearch.ElasticsearchImpl
fe iff.Exporter
)

Expand Down Expand Up @@ -174,6 +187,13 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
}
ksClientImpl := &interfaceForKinesisStream.KinesisStreamClientImpl{ksClient}
ksImpl = interfaceForKinesisStream.KinesisStreamImpl{ksClientImpl}
case Elasticsearch:
esClient, err := c.Watcher.newElasticsearchClient(ctx)
if err != nil {
return err
}
esClientImpl := &interfaceForElasticsearch.ElasticsearchClientImpl{esClient}
esImpl = interfaceForElasticsearch.ElasticsearchImpl{esClientImpl}
case File:
fCli, err := c.Watcher.newFileClient(ctx)
if err != nil {
Expand All @@ -190,6 +210,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
bq: bqImpl,
pubsub: psImpl,
kinesisStream: ksImpl,
elasticsearch: esImpl,
fileExporter: fe,
resumeToken: c.resumeTokenManager,
}
Expand All @@ -215,6 +236,7 @@ type (
exportToBigquery(ctx context.Context, cs primitive.M) error
exportToPubsub(ctx context.Context, cs primitive.M) error
exportToKinesisStream(ctx context.Context, cs primitive.M) error
exportToElasticsearch(ctx context.Context, cs primitive.M) error
exportToFile(ctx context.Context, cs primitive.M) error
saveResumeToken(ctx context.Context, rt string) error
err() error
Expand All @@ -230,6 +252,7 @@ type (
bq interfaceForBigquery.BigqueryImpl
pubsub interfaceForPubsub.PubsubImpl
kinesisStream interfaceForKinesisStream.KinesisStreamImpl
elasticsearch interfaceForElasticsearch.ElasticsearchImpl
fileExporter iff.Exporter
resumeToken irt.ResumeToken
}
Expand Down Expand Up @@ -264,6 +287,10 @@ func (c *changeStreamsExporterClientImpl) exportToKinesisStream(ctx context.Cont
return c.kinesisStream.ExportToKinesisStream(ctx, cs)
}

func (c *changeStreamsExporterClientImpl) exportToElasticsearch(ctx context.Context, cs primitive.M) error {
return c.elasticsearch.ExportToElasticsearch(ctx, cs)
}

func (c *changeStreamsExporterClientImpl) exportToFile(ctx context.Context, cs primitive.M) error {
return c.fileExporter.Export(ctx, cs)
}
Expand Down Expand Up @@ -316,6 +343,10 @@ func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) err
if err := c.exporter.exportToKinesisStream(ctx, csMap); err != nil {
return err
}
case Elasticsearch:
if err := c.exporter.exportToElasticsearch(ctx, csMap); err != nil {
return err
}
case File:
if err := c.exporter.exportToFile(ctx, csMap); err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions config/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ const (
PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME"
PUBSUB_ORDERING_BY = "PUBSUB_ORDERING_BY"

ELASTICSEARCH_HOST = "ELASTICSEARCH_HOST"
ELASTICSEARCH_INDEX_NAME = "ELASTICSEARCH_INDEX_NAME"
ELASTICSEARCH_SYNC_ENABLED = "ELASTICSEARCH_SYNC_ENABLED"

MONGODB_HOST = "MONGODB_HOST"
MONGODB_DATABASE = "MONGODB_DATABASE"
MONGODB_COLLECTION = "MONGODB_COLLECTION"
Expand Down
30 changes: 30 additions & 0 deletions config/elasticsearch/elasticsearch_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package elasticsearch

import (
"os"
"strconv"

"github.com/cam-inc/mxtransporter/config/constant"
)

type Elasticsearch struct {
ElasticsearchConnectionUrl string
IndexName string
SyncEnabled bool
}

func getEnvAsBool(name string, defaultVal bool) bool {
valStr := os.Getenv(name)
if val, err := strconv.ParseBool(valStr); err == nil {
return val
}
return defaultVal
}

func ElasticsearchConfig() Elasticsearch {
var esCfg Elasticsearch
esCfg.ElasticsearchConnectionUrl = os.Getenv(constant.ELASTICSEARCH_HOST)
esCfg.IndexName = os.Getenv(constant.ELASTICSEARCH_INDEX_NAME)
esCfg.SyncEnabled = getEnvAsBool(constant.ELASTICSEARCH_SYNC_ENABLED, false)
return esCfg
}
Loading