# Kafka Connect BigQuery Connector

This is an implementation of a sink connector from [Apache Kafka] to [Google BigQuery], built on top
of [Apache Kafka Connect].

## History

This connector was [originally developed by WePay](https://github.com/wepay/kafka-connect-bigquery).
In late 2020 the project moved to [Confluent](https://github.com/confluentinc/kafka-connect-bigquery),
with both companies taking on maintenance duties.
In 2024, Aiven created [its own fork](https://github.com/Aiven-Open/bigquery-connector-for-apache-kafka/)
based off the Confluent project in order to continue maintaining an open source, Apache 2-licensed
version of the connector.

## Configuration

### Sample

An example connector configuration that reads records with JSON-encoded values from Kafka and
writes their values to BigQuery:

```json
{
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "users, clicks, payments",
  "tasks.max": "3",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",

  "project": "kafka-ingest-testing",
  "defaultDataset": "kcbq-example",
  "keyfile": "/tmp/bigquery-credentials.json"
}
```
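
One way to try this configuration out is to submit it to a Kafka Connect worker's REST API. A
minimal sketch, assuming a worker listening on `localhost:8083` and the sample above saved as
`bigquery-sink.json` (both the file name and the connector name are illustrative):

```bash
# Create or update a connector named "bigquery-sink" from the sample configuration
curl -X PUT -H "Content-Type: application/json" \
  --data @bigquery-sink.json \
  http://localhost:8083/connectors/bigquery-sink/config
```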

### Complete docs

See [here](docs/sink-connector-config-options.rst) for a list of the connector's configuration properties.

## Download

Releases are available in the GitHub release tab; older releases are also available via [Confluent Hub](https://www.confluent.io/hub/wepay/kafka-connect-bigquery).
<!-- TODO:
  Mention first Aiven-published release (which will be the first to
  include executable artifacts)
-->

## Standalone Quickstart

> **NOTE**: You must have the [Confluent Platform] installed in order to run the example.

### Configuration Basics

Firstly, you need to specify configuration settings for your connector. These can be found in the
`kcbq-connector/quickstart/properties/connector.properties` file. Look for this section:

```plain
########################################### Fill me in! ###########################################
# The name of the BigQuery project to write to
project=
# The name of the BigQuery dataset to write to (leave the '.*=' at the beginning, enter your
# dataset after it)
datasets=.*=
# The location of a BigQuery service account or user JSON credentials file
# or service account credentials or user credentials in JSON format (non-escaped JSON blob)
keyfile=
# 'FILE' if keyfile is a credentials file, 'JSON' if it's a credentials JSON
keySource=FILE
```

You'll need to choose a BigQuery project to write to, a dataset from that project to write to, and
provide the location of a JSON key file that can be used to access a BigQuery service account that
can write to the project/dataset pair. Once you've decided on these properties, fill them in and
save the properties file.
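
For instance, a filled-in version might look like this (the project, dataset, and key file path
below are placeholders, not values shipped with this repository):

```plain
project=my-gcp-project
datasets=.*=my_quickstart_dataset
keyfile=/home/me/.credentials/my-gcp-project-service-account.json
keySource=FILE
```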

Once you get more familiar with the connector, you might want to revisit the `connector.properties`
file and experiment with tweaking its settings.

#### Migrating to 2.x.x

With the introduction of schema unionization in version 2.0.0, the following configuration changes
were made and should be applied when migrating:
1. `autoUpdateSchemas` has been removed
2. `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation` have been introduced
3. `allowSchemaUnionization` has been introduced

Setting `allowNewBigQueryFields` and `allowBigQueryRequiredFieldRelaxation` to `true` while
setting `allowSchemaUnionization` to `false` results in the same behavior that setting
`autoUpdateSchemas` to `true` used to.
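
In other words, a 1.x configuration that set `autoUpdateSchemas=true` would be expressed after
migrating with the properties described above:

```plain
allowNewBigQueryFields=true
allowBigQueryRequiredFieldRelaxation=true
allowSchemaUnionization=false
```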

### Building and Extracting a Confluent Hub archive

If you haven't already, move into the repository's top-level directory:

```bash
$ cd /path/to/kafka-connect-bigquery/
```

Begin by creating a Confluent Hub archive of the connector with the Confluent Schema Retriever included:

```bash
$ mvn clean package -DskipTests
```

And then extract its contents:

```bash
$ mkdir -p bin/jar/ && cp kcbq-connector/target/components/packages/wepay-kafka-connect-bigquery-*/wepay-kafka-connect-bigquery-*/lib/*.jar bin/jar/
```

### Setting Up Background Processes

Then move into the `quickstart` directory:

```bash
$ cd kcbq-connector/quickstart/
```

After that, if your Confluent Platform installation isn't in a sibling directory to the connector,
specify its location (and do so before starting each of the subsequent processes in their own
terminal):

```bash
$ export CONFLUENT_DIR=/path/to/confluent
```

Then, initialize the background processes necessary for Kafka Connect, one terminal per script
(adapted from http://docs.confluent.io/3.0.0/quickstart.html):

```bash
$ ./zookeeper.sh
```

(wait a little while for it to get on its feet)

```bash
$ ./kafka.sh
```

(wait a little while for it to get on its feet)

```bash
$ ./schema-registry.sh
```

(wait a little while for it to get on its feet)

### Initializing the Avro Console Producer

Next, initialize the Avro Console Producer (also in its own terminal):

```bash
$ ./avro-console-producer.sh
```

Give it some data to start off with (type directly into the Avro Console Producer instance):

```json
{"f1":"Testing the Kafka-BigQuery Connector!"}
```

### Running the Connector

Finally, initialize the BigQuery connector (also in its own terminal):

```bash
$ ./connector.sh
```

### Piping Data Through the Connector

Now you can enter Avro messages of the schema `{"f1": "$SOME_STRING"}` into the Avro Console
Producer instance, and the pipeline should write them to BigQuery.

If you want to get more adventurous, you can experiment with different schemas or topics by
adjusting flags given to the Avro Console Producer and tweaking the config settings found in the
`kcbq-connector/quickstart/properties` directory.
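
To spot-check that rows are arriving, you could query the destination table with the `bq` CLI (a
sketch only; the project, dataset, and table names below are placeholders for whatever your
configuration produces):

```bash
bq query --use_legacy_sql=false \
  'SELECT f1 FROM `my-gcp-project.my_quickstart_dataset.my_table` LIMIT 10'
```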

## Integration Testing the Connector

### Configuring the tests

You must supply the following environment variables in order to run the tests:

- `$KCBQ_TEST_PROJECT`: The name of the BigQuery project to use for the test
- `$KCBQ_TEST_DATASET`: The name of the BigQuery dataset to use for the test
- `$KCBQ_TEST_KEYFILE`: The key file used to authenticate with BigQuery during the test
- `$KCBQ_TEST_BUCKET`: The name of the GCS bucket to use (for testing the GCS batch loading feature)

The `$KCBQ_TEST_FOLDER` variable can be supplied to specify which subfolder of the GCS bucket should
be used when testing the GCS batch loading feature; if not supplied, the top-level folder will be
used.
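
For example (the values shown are placeholders for your own project, dataset, key file, and bucket):

```bash
export KCBQ_TEST_PROJECT=my-gcp-project
export KCBQ_TEST_DATASET=kcbq_integration_tests
export KCBQ_TEST_KEYFILE=/tmp/creds.json
export KCBQ_TEST_BUCKET=my-kcbq-test-bucket
export KCBQ_TEST_FOLDER=kcbq-test-runs   # optional
```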

### Adding new GCP Credentials & BigQuery DataSet

This section is optional, in case you want to use a different GCP project and generate new credentials for it.

- **Create a GCP Service Account:** Follow the instructions at https://cloud.google.com/iam/docs/creating-managing-service-accounts, e.g.
```
gcloud iam service-accounts create kcbq-test --description="service account key for bigquery sink integration test" --display-name="kcbq-test"
```
- **Create Service Account Keys:** Follow the instructions at https://cloud.google.com/iam/docs/creating-managing-service-account-keys, e.g.
```
gcloud iam service-accounts keys create /tmp/creds.json --iam-account=kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com
```
- **Give BigQuery & Storage Admin Permissions to the Service Account** (console steps below, or see the command-line sketch after this list):
  - Open https://console.cloud.google.com/iam-admin/iam?project=<GCP_PROJECT_NAME>
  - Click on "Add" and enter the principal created above, e.g. `kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com`
  - Add the following 2 roles from the "Select a role" drop-down menu:
    - BigQuery -> BigQuery Admin
    - Cloud Storage -> Storage Admin
- **Add a BigQuery DataSet to the Project:**
  - Open https://console.cloud.google.com/bigquery?project=<GCP_PROJECT_NAME>
  - Click the 3 vertical dots next to the project name, click "Create dataset", and follow the steps there.
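
The permission and dataset steps can also be done from the command line. A sketch, assuming the
`gcloud` and `bq` CLIs are installed and authenticated, with your own project and dataset names
substituted in:

```bash
# Grant the service account the BigQuery Admin and Storage Admin roles on the project
gcloud projects add-iam-policy-binding <GCP_PROJECT_NAME> \
  --member="serviceAccount:kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com" \
  --role="roles/bigquery.admin"
gcloud projects add-iam-policy-binding <GCP_PROJECT_NAME> \
  --member="serviceAccount:kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com" \
  --role="roles/storage.admin"

# Create a dataset for the tests to write to
bq mk --dataset <GCP_PROJECT_NAME>:<DATASET_NAME>
```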

### Running the Integration Tests

```bash
# (Re)builds the project and runs the integration tests, skipping unit tests to save a bit of time
mvn clean package integration-test -Dskip.unit.tests=true
```

### How Integration Testing Works

Integration tests run by creating embedded instances of [Zookeeper], [Kafka], [Schema Registry],
and the BigQuery Connector itself, then verifying the results using a [JUnit] test.

They use schemas and data that can be found in the
`kcbq-connector/src/test/resources/integration_test_cases/` directory, and rely on a user-provided
JSON key file (as in the `quickstart` example) to access BigQuery.

### Data Corruption Concerns

In order to ensure the validity of each test, any table that will be written to in the course of
integration testing is preemptively deleted before the connector is run. This will only be an issue
if you have any tables in your dataset whose names begin with `kcbq_test_` and match the sanitized
name of any of the `test_schema` subdirectories. If that is the case, you should probably consider
writing to a different project/dataset.

Kafka, Schema Registry, Zookeeper, and Kafka Connect are all run as temporary embedded instances, so
there is no risk that running the integration tests will corrupt existing data on your machine, and
there is no need to free up any ports that might currently be in use by other instances of these
services.

### Adding New Integration Tests

Adding an integration test is a little more involved, and consists of two major steps: specifying
the Avro data to be sent to Kafka, and specifying, via a JUnit test, how to verify that the data
made it to BigQuery as expected.

To specify input data, you must create a new directory under
`kcbq-connector/src/test/resources/integration_test_cases/` with whatever name you want
the Kafka topic of your test to be named, and whatever string you want the name of your test's
BigQuery table to be derived from. Then, create two files in that directory (a sample pair is
sketched after this list):

* `schema.json` will contain the Avro schema of the type of data the new test will send
through the connector.

* `data.json` will contain a series of JSON objects, each of which should represent an [Avro] record
that matches the specified schema. **Each JSON object must occupy its own line, and each object
cannot occupy more than one line** (this inconvenience is due to limitations in the Avro
Console Producer, and may be addressed in future commits).
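
A minimal, hypothetical test case might pair a `schema.json` like this (note the numeric `row`
field, which the verification step described in the note below relies on):

```json
{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {"name": "row", "type": "long"},
    {"name": "f1", "type": "string"}
  ]
}
```

with a `data.json` containing one record per line:

```json
{"row": 1, "f1": "first test value"}
{"row": 2, "f1": "second test value"}
```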

To specify data verification, add to the test cases present in
`kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/integration/BigQuerySinkConnectorIT.java`.

> **NOTE**: Because the order of rows is not guaranteed when reading test results from BigQuery,
you must include a numeric column named "row" in all of your test schemas, and every row of
test data must have a unique value for its row number. When data is read back from BigQuery to
verify its accuracy, it will be returned in ascending order based on that "row" column.

[Apache Avro]: https://avro.apache.org
[Apache Kafka Connect]: https://kafka.apache.org/documentation.html#connect
[Apache Kafka]: http://kafka.apache.org
[Apache Maven]: https://maven.apache.org
[Avro]: https://avro.apache.org
[BigQuery]: https://cloud.google.com/bigquery/
[Confluent Platform]: http://docs.confluent.io/current/installation.html
[Google BigQuery]: https://cloud.google.com/bigquery/
[JUnit]: http://junit.org
[Kafka Connect]: http://docs.confluent.io/current/connect/
[Kafka]: http://kafka.apache.org
[Maven]: https://maven.apache.org
[Schema Registry]: https://github.com/confluentinc/schema-registry
[Semantic Versioning]: http://semver.org
[Zookeeper]: https://zookeeper.apache.org

## Implementation details of different modes

### Upsert/Delete with Legacy InsertAll API

Click [here](https://docs.google.com/document/d/1p8_rLQqR9GIALIruB3-MjqR8EgYdaEw2rlFF1fxRJf0/edit#heading=h.lfiuaruj2s8y) to read the implementation details of upsert/delete mode with the Legacy InsertAll API.
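
As a rough illustration only (the property names below come from the connector's configuration
reference rather than from this README, so double-check them against [the complete docs](docs/sink-connector-config-options.rst)
before relying on them), an upsert/delete configuration layers a few extra settings on top of the
sample shown earlier:

```json
{
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "users",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",

  "project": "kafka-ingest-testing",
  "defaultDataset": "kcbq-example",
  "keyfile": "/tmp/bigquery-credentials.json",

  "upsertEnabled": "true",
  "deleteEnabled": "true",
  "kafkaKeyFieldName": "kafkaKey"
}
```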