Go Dcp SQL streams documents from Couchbase Database Change Protocol (DCP) and writes to SQL tables in near real-time.
- Custom SQL queries per DCP event.
- Update multiple documents for a DCP event(see Example).
- Handling different DCP events such as expiration, deletion and mutation(see Example).
- Managing batch configurations such as maximum batch size, batch ticker durations.
- Scale up and down by custom membership algorithms(Couchbase, KubernetesHa, Kubernetes StatefulSet or Static, see examples).
- Easily manageable configurations.
Note: If you prefer to use the default mapper by entering the configuration instead of creating a custom mapper, please refer to this topic. Otherwise, you can refer to the example provided below:
package main
import (
_ "github.com/lib/pq" // DON'T FORGET TO ADD THE DRIVER
)
func mapper(event couchbase.Event) []sql.Model {
var raw = sql.Raw{
Query: fmt.Sprintf(
"INSERT INTO `example-schema`.`example-table` (key, value) VALUES ($1, $2);",
),
Args: []interface{}{
string(event.Key),
string(event.Value),
},
}
return []sql.Model{&raw}
}
func main() {
connector, err := dcpsql.NewConnectorBuilder("config.yml").
SetMapper(mapper). // NOT NEEDED IF YOU'RE USING DEFAULT MAPPER. JUST CALL Build() FUNCTION
Build()
if err != nil {
panic(err)
}
defer connector.Close()
connector.Start()
}
Check out on go-dcp
Variable | Type | Required | Default | Description |
---|---|---|---|---|
sql.host |
string | yes | SQL connection urls | |
sql.user |
string | yes | SQL username | |
sql.password |
string | yes | SQL password | |
sql.dbName |
string | yes | SQL database name | |
sql.sslMode |
string | no | disable | Enabling SQL SSL mode |
sql.driverName |
string | yes | Driver name | |
sql.port |
int | yes | SQL port | |
sql.batchTickerDuration |
time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch |
sql.collectionTableMapping |
[]CollectionTableMapping | no | 10s | Will be used for default mapper. Please read the next topic. |
Collection table mapping configuration is optional. This configuration should only be provided if you are using the default mapper. If you are implementing your own custom mapper function, this configuration is not needed.
Variable | Type | Required | Default | Description |
---|---|---|---|---|
sql.collectionTableMapping[].collection |
string | yes | Couchbase collection name | |
sql.collectionTableMapping[].tableName |
string | yes | Target SQL table name | |
sql.collectionTableMapping[].keyColumnName |
string | yes | Column name for document key in SQL table | |
sql.collectionTableMapping[].valueColumnName |
string | yes | Column name for document value in SQL table | |
sql.collectionTableMapping[].audit.enabled |
bool | no | Enable audit columns for tracking document changes | |
sql.collectionTableMapping[].audit.createdAtColumnName |
string | no | Column name for tracking document creation time | |
sql.collectionTableMapping[].audit.updatedAtColumnName |
string | no | Column name for tracking document update time |
Metric Name | Description | Labels | Value Type |
---|---|---|---|
sql_connector_latency_ms | Time to adding to the batch. | N/A | Gauge |
sql_connector_bulk_request_process_latency_ms | Time to process bulk request. | N/A | Gauge |
You can also use all DCP-related metrics explained here. All DCP-related metrics are automatically injected. It means you don't need to do anything.
Go Dcp SQL is always open for direct contributions. For more information please check our Contribution Guideline document.
Released under the MIT License.