columns:
  - column: "Parameter"
  - column: "Description"
rows:
  - "Parameter": "`<table_name>`"
    "Description": |

      The name of the table to create. Names for tables must follow the [naming
      guidelines](/sql/identifiers/#naming-restrictions).

  - "Parameter": "`<source_name>`"
    "Description": |

      The name of the [source](/sql/create-source/kafka/) created for the Kafka topic.

  - "Parameter": "**(REFERENCE <ref_object>)**"
    "Description": |

      *Optional.* If specified, the topic from which to create the table
      (this should match the topic specified in the source). You can create
      multiple tables from the same reference object.

      To find the reference objects available in your
      [source](/sql/create-source/), you can use the following query,
      substituting your source name for `<source_name>`:

      <br>

      ```mzsql
      SELECT refs.*
      FROM mz_internal.mz_source_references refs, mz_sources s
      WHERE s.name = '<source_name>' -- substitute with your source name
      AND refs.source_id = s.id;
      ```

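      For example, a minimal sketch of the `REFERENCE` clause, assuming a
      source named `kafka_src` and a reference object `"orders"` (both
      hypothetical names; substitute your own):

      ```mzsql
      -- Hypothetical names; create a table from the "orders" reference object.
      CREATE TABLE orders_raw
        FROM SOURCE kafka_src (REFERENCE "orders")
        FORMAT JSON;

      -- A second table can be created from the same reference object.
      CREATE TABLE orders_raw_copy
        FROM SOURCE kafka_src (REFERENCE "orders")
        FORMAT JSON;
      ```
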
  - "Parameter": |
      **FORMAT \<format\> |
      KEY FORMAT \<format\> VALUE FORMAT \<format\>**
    "Description": |

      *Optional.* If specified, decode the data using the given format. The following `<format>`s are supported:

      | Format | Description |
      |--------|-------------|
      | `AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION <csr_connection> [KEY STRATEGY <strategy> VALUE STRATEGY <strategy>]` | Decode the data as Avro, specifying the [Confluent Schema Registry connection](/sql/create-connection/#confluent-schema-registry) to use. You can also specify the `KEY STRATEGY` and `VALUE STRATEGY` to use: <table> <thead> <tr> <th>Strategy</th> <th>Description</th> </tr> </thead> <tbody> <tr> <td><code>LATEST</code></td> <td>(Default) Use the latest writer schema from the schema registry as the reader schema.</td> </tr> <tr> <td><code>ID</code></td> <td>Use a specific schema from the registry.</td> </tr> <tr> <td><code>INLINE</code></td> <td>Use the inline schema.</td> </tr> </tbody> </table>|
      | `PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION <csr_connection>` | Decode the data as Protocol Buffers, specifying the [Confluent Schema Registry connection](/sql/create-connection/#confluent-schema-registry) to use. |
      | `PROTOBUF MESSAGE <msg_name> USING SCHEMA <encoded_schema>` | Decode the data as Protocol Buffers, specifying the `<msg_name>` and the inline `<encoded_schema>` descriptor to use. |
      | `JSON` | Decode the data as JSON. |
      | `TEXT` | Decode the data as TEXT. |
      | `BYTES` | Decode the data as BYTES. |
      | `CSV WITH HEADER ( <col_name>[, ...]) [DELIMITED BY <char>]` | Parse the data as CSV with a header row. Materialize uses this header to infer both the number of columns and their names. The header is **not** ingested as data. The optional `DELIMITED BY <char>` clause specifies the delimiter character. <br><br>The data is decoded as [`text`](/sql/types/text). You can convert the data to other types using explicit [casts](/sql/functions/cast/) when creating views (see the example after this table). |
      | `CSV WITH <num> COLUMNS DELIMITED BY <char>` | Parse the data as CSV with a specified number of columns and a specified delimiter. The columns are named `column1`, `column2`...`columnN`. <br><br>The data is decoded as [`text`](/sql/types/text). You can convert the data to other types using explicit [casts](/sql/functions/cast/) when creating views (see the example after this table). |
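
      As a sketch of the casting note above (all names hypothetical), a table
      created with a CSV format exposes `text` columns that you can cast in a view:

      ```mzsql
      -- Hypothetical names; `csv_tbl` was created with FORMAT CSV WITH HEADER (id, amount),
      -- so both columns are decoded as text and cast explicitly here.
      CREATE VIEW typed_csv AS
        SELECT id::int AS id, amount::numeric AS amount
        FROM csv_tbl;
      ```
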

      {{< include-md file="shared-content/kafka-format-envelope-compat-table.md" >}}

      For more information, see [Creating a source](/sql/create-source/kafka/#creating-a-source).

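      For example, a minimal sketch of the `FORMAT` clause, assuming a source
      named `kafka_src`, a reference object `"orders"`, and a Confluent Schema
      Registry connection named `csr_conn` (all hypothetical names):

      ```mzsql
      -- Hypothetical names; decode the data as Avro using the schema registry connection.
      CREATE TABLE orders_avro
        FROM SOURCE kafka_src (REFERENCE "orders")
        FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;
      ```
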
  - "Parameter": |
      **INCLUDE \<include_option\>**
    "Description": |

      *Optional.* If specified, include the additional information as column(s) in the table. The following `<include_option>`s are supported:

      | Option | Description |
      |--------|-------------|
      | **KEY [AS \<name\>]** | Include a column containing the Kafka message key. If the key is encoded using a format that includes schemas, the column will take its name from the schema. For unnamed formats (e.g. `TEXT`), the column will be named `key`. The column can be renamed with the optional **AS** *name* clause. |
      | **PARTITION [AS \<name\>]** | Include a `partition` column containing the Kafka message partition. The column can be renamed with the optional **AS** *name* clause. |
      | **OFFSET [AS \<name\>]** | Include an `offset` column containing the Kafka message offset. The column can be renamed with the optional **AS** *name* clause. |
      | **TIMESTAMP [AS \<name\>]** | Include a `timestamp` column containing the Kafka message timestamp. The column can be renamed with the optional **AS** *name* clause. <br><br>Note that the timestamp of a Kafka message depends on how the topic and its producers are configured. See the [Confluent documentation](https://docs.confluent.io/3.0.0/streams/concepts.html?#time) for details. |
      | **HEADERS [AS \<name\>]** | Include a `headers` column containing the Kafka message headers as a list of records of type `(key text, value bytea)`. The column can be renamed with the optional **AS** *name* clause. |
      | **HEADER \<key\> AS \<name\> [BYTES]** | Include a *name* column containing the Kafka message header *key* parsed as a UTF-8 string. To expose the header value as `bytea`, use the `BYTES` option. |

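      For example, a minimal sketch of the `INCLUDE` clause (hypothetical
      source, reference, and column names):

      ```mzsql
      -- Hypothetical names; add the message offset, timestamp, and headers as columns.
      CREATE TABLE orders_with_metadata
        FROM SOURCE kafka_src (REFERENCE "orders")
        FORMAT JSON
        INCLUDE OFFSET, TIMESTAMP AS ts, HEADERS;
      ```
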
  - "Parameter": |
      **ENVELOPE \<envelope\>**
    "Description": |

      *Optional.* If specified, use the given envelope. The following `<envelope>`s are supported:

      | Envelope | Description |
      |----------|-------------|
      | **ENVELOPE NONE** | *Default*. Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted. |
      | **ENVELOPE DEBEZIUM** | Use the Debezium envelope, which uses a diff envelope to handle CRUD operations. This envelope can lead to **high memory utilization** in the cluster maintaining the source. Materialize can automatically offload processing to disk as needed. See [spilling to disk](/sql/create-source/kafka/#spilling-to-disk) for details. For more information, see [Using Debezium](/sql/create-source/kafka/#using-debezium). |
      | **ENVELOPE UPSERT** [**(VALUE DECODING ERRORS = INLINE)**] | Use the upsert envelope, which uses message keys to handle CRUD operations. To handle value decoding errors, use the `(VALUE DECODING ERRORS = INLINE)` option. For more information, see [Handling upserts](/sql/create-source/kafka/#handling-upserts) and [Value decoding errors](/sql/create-source/kafka/#value-decoding-errors). |

      {{< include-md file="shared-content/kafka-format-envelope-compat-table.md" >}}
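
      For example, a minimal sketch of the `ENVELOPE` clause (hypothetical
      source, reference, and connection names):

      ```mzsql
      -- Hypothetical names; use the upsert envelope and handle value decoding
      -- errors inline.
      CREATE TABLE orders_upsert
        FROM SOURCE kafka_src (REFERENCE "orders")
        FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
        ENVELOPE UPSERT (VALUE DECODING ERRORS = INLINE);
      ```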