Skip to content

Commit ce2b2f7

Browse files
authored
Support include message key to metadata (#1239)
1 parent 55d2220 commit ce2b2f7

File tree

11 files changed

+59
-17
lines changed

11 files changed

+59
-17
lines changed

docs/aws-s3-sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
153153
| `useHumanReadableSchemaVersion` | Boolean | False | false | false | Use a human-readable format string for the schema version in the message metadata. If it is set to `true`, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
154154
| `includeTopicToMetadata` | Boolean | False | false | false | Include the topic name in the metadata. |
155155
| `includePublishTimeToMetadata` | Boolean | False | false | false | Include the message publish time in the metadata as a timestamp. |
156+
| `includeMessageKeyToMetadata` | Boolean | False | false | false | Include the message key in the metadata as a string. |
156157
| `avroCodec` | String | False | false | snappy | Compression codec used when formatType=`avro`. Available compression types are: none (no compression), deflate, bzip2, xz, zstandard, snappy. |
157158
| `parquetCodec` | String | False | false | gzip | Compression codec used when formatType=`parquet`. Available compression types are: none (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
158159
| `jsonAllowNaN` | Boolean | False | false | false | Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=`json`. Since JSON specification does not allow such values this is a non-standard feature and disabled by default. |

docs/azure-blob-storage-sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ Before using the Azure Blob Storage sink connector, you need to configure it. Th
134134
| `useHumanReadableSchemaVersion` | Boolean | False | false | false | Use a human-readable format string for the schema version in the message metadata. If it is set to `true`, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
135135
| `includeTopicToMetadata` | Boolean | False | false | false | Include the topic name in the metadata. |
136136
| `includePublishTimeToMetadata` | Boolean | False | false | false | Include the message publish time in the metadata as a timestamp. |
137+
| `includeMessageKeyToMetadata` | Boolean | False | false | false | Include the message key in the metadata as a string. |
137138
| `avroCodec` | String | False | false | snappy | Compression codec used when formatType=`avro`. Available compression types are: none (no compression), deflate, bzip2, xz, zstandard, snappy. |
138139
| `parquetCodec` | String | False | false | gzip | Compression codec used when formatType=`parquet`. Available compression types are: none (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
139140
| `jsonAllowNaN` | Boolean | False | false | false | Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=`json`. Since JSON specification does not allow such values this is a non-standard feature and disabled by default. |

docs/google-cloud-storage-sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ Before using the Google Cloud Storage sink connector, you need to configure it.
139139
| `useHumanReadableSchemaVersion` | Boolean | False | false | false | Use a human-readable format string for the schema version in the message metadata. If it is set to `true`, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format. |
140140
| `includeTopicToMetadata` | Boolean | False | false | false | Include the topic name in the metadata. |
141141
| `includePublishTimeToMetadata` | Boolean | False | false | false | Include the message publish time in the metadata as a timestamp. |
142+
| `includeMessageKeyToMetadata` | Boolean | False | false | false | Include the message key in the metadata as a string. |
142143
| `avroCodec` | String | False | false | snappy | Compression codec used when formatType=`avro`. Available compression types are: none (no compression), deflate, bzip2, xz, zstandard, snappy. |
143144
| `parquetCodec` | String | False | false | gzip | Compression codec used when formatType=`parquet`. Available compression types are: none (no compression), snappy, gzip, lzo, brotli, lz4, zstd. |
144145
| `jsonAllowNaN` | Boolean | False | false | false | Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=`json`. Since JSON specification does not allow such values this is a non-standard feature and disabled by default. |

src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public class BlobStoreAbstractConfig implements Serializable {
117117
private boolean useHumanReadableSchemaVersion;
118118
private boolean includeTopicToMetadata;
119119
private boolean includePublishTimeToMetadata;
120+
private boolean includeMessageKeyToMetadata;
120121

121122
public void validate() {
122123
checkNotNull(provider, "provider not set.");

src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class AvroFormat implements Format<GenericRecord> , InitConfiguration<Blo
5555
private boolean useHumanReadableSchemaVersion;
5656
private boolean includeTopicToMetadata;
5757
private boolean includePublishTimeToMetadata;
58+
private boolean includeMessageKeyToMetadata;
5859
private CodecFactory codecFactory;
5960

6061
@Override
@@ -69,6 +70,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
6970
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
7071
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
7172
this.includePublishTimeToMetadata = configuration.isIncludePublishTimeToMetadata();
73+
this.includeMessageKeyToMetadata = configuration.isIncludeMessageKeyToMetadata();
7274
String codecName = configuration.getAvroCodec();
7375
if (codecName == null) {
7476
this.codecFactory = CodecFactory.nullCodec();
@@ -90,7 +92,8 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
9092
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
9193
if (useMetadata){
9294
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, useHumanReadableMessageId,
93-
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata);
95+
useHumanReadableSchemaVersion, includeTopicToMetadata,
96+
includePublishTimeToMetadata, includeMessageKeyToMetadata);
9497
}
9598

9699
LOGGER.debug("Using avro schema: {}", rootAvroSchema);
@@ -129,7 +132,7 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
129132
org.apache.avro.generic.GenericRecord metadataRecord =
130133
MetadataUtil.extractedMetadataRecord(next,
131134
useHumanReadableMessageId, useHumanReadableSchemaVersion,
132-
includeTopicToMetadata, includePublishTimeToMetadata);
135+
includeTopicToMetadata, includePublishTimeToMetadata, includeMessageKeyToMetadata);
133136
writeRecord.put(MetadataUtil.MESSAGE_METADATA_KEY, metadataRecord);
134137
}
135138
fileWriter.append(writeRecord);

src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class JsonFormat implements Format<GenericRecord>, InitConfiguration<Blob
7171
private boolean useHumanReadableSchemaVersion;
7272
private boolean includeTopicToMetadata;
7373
private boolean includePublishTimeToMetadata;
74+
private boolean includeMessageKeyToMetadata;
7475

7576
@Override
7677
public String getExtension() {
@@ -84,6 +85,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
8485
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
8586
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
8687
this.includePublishTimeToMetadata = configuration.isIncludePublishTimeToMetadata();
88+
this.includeMessageKeyToMetadata = configuration.isIncludeMessageKeyToMetadata();
8789

8890
if (configuration.isJsonAllowNaN()) {
8991
JSON_MAPPER.get().enable(ALLOW_NON_NUMERIC_NUMBERS.mappedFeature());
@@ -123,7 +125,7 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> record) throws
123125
if (useMetadata) {
124126
writeValue.put(MetadataUtil.MESSAGE_METADATA_KEY,
125127
MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion,
126-
includeTopicToMetadata, includePublishTimeToMetadata));
128+
includeTopicToMetadata, includePublishTimeToMetadata, includeMessageKeyToMetadata));
127129
}
128130
String recordAsString = JSON_MAPPER.get().writeValueAsString(writeValue);
129131
stringBuilder.append(recordAsString).append("\n");

src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class ParquetFormat implements Format<GenericRecord>, InitConfiguration<B
6565
private boolean useHumanReadableSchemaVersion;
6666
private boolean includeTopicToMetadata;
6767
private boolean includePublishTimeToMetadata;
68+
private boolean includeMessageKeyToMetadata;
6869

6970
private CompressionCodecName compressionCodecName = CompressionCodecName.GZIP;
7071

@@ -80,6 +81,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
8081
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
8182
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
8283
this.includePublishTimeToMetadata = configuration.isIncludePublishTimeToMetadata();
84+
this.includeMessageKeyToMetadata = configuration.isIncludeMessageKeyToMetadata();
8385
this.compressionCodecName = CompressionCodecName.fromConf(configuration.getParquetCodec());
8486
}
8587

@@ -189,7 +191,8 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
189191
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
190192
if (useMetadata) {
191193
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, useHumanReadableMessageId,
192-
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata);
194+
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata,
195+
includeMessageKeyToMetadata);
193196
}
194197
log.info("Using avro schema: {}", rootAvroSchema);
195198
}
@@ -252,7 +255,7 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
252255
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
253256
Metadata.PulsarIOCSCProtobufMessageMetadata metadata = getMetadataFromMessage(next,
254257
useHumanReadableMessageId, useHumanReadableSchemaVersion,
255-
includeTopicToMetadata, includePublishTimeToMetadata);
258+
includeTopicToMetadata, includePublishTimeToMetadata, includeMessageKeyToMetadata);
256259
for (Descriptors.FieldDescriptor field : descriptor.getFields()) {
257260
if (field.getName().equals(MESSAGE_METADATA_KEY)) {
258261
messageBuilder.setField(field, metadata);
@@ -276,7 +279,8 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
276279
useHumanReadableMessageId,
277280
useHumanReadableSchemaVersion,
278281
includeTopicToMetadata,
279-
includePublishTimeToMetadata);
282+
includePublishTimeToMetadata,
283+
includeMessageKeyToMetadata);
280284
writeRecord.put(MESSAGE_METADATA_KEY, metadataRecord);
281285
}
282286
if (parquetWriter != null) {

0 commit comments

Comments
 (0)