
[BUG] Error while writing data in Parquet format when using JsonSchema as the schema format #439

@grandimk


Describe the bug
I was using the Cloud Storage Sink to collect data from Pulsar and write it to AWS S3 in Parquet format. The messages were produced with a JsonSchema schema. The sink fails as soon as it tries to convert the collected records into org.apache.avro.generic.GenericRecord (in AvroRecordUtil.convertGenericRecord).

I tried producing messages from both Python and Java; both fail, but with different stack traces.

Note: if formatType is set to json in the configuration, everything works fine.

To Reproduce
Use this configuration template for pulsar-io-cloud-storage v2.9.3.6:

tenant: "<theTenant>"
namespace: "schema-registry"
name: "cloud-storage-sink"
inputs: 
  - "persistent://<theTenant>/<theNamespace>/<theTopic>"
  - <otherTopicUrl>
archive: "connectors/pulsar-io-cloud-storage-2.9.3.6.nar"
parallelism: 1

configs:
  provider: "aws-s3"
  accessKeyId: "<yourAccessKeyId>"
  secretAccessKey: "<yourSecretAccessKey>"
  bucket: "<yourS3Bucket>"
  region: "<yourRegion>"
  pathPrefix: "cloud_storage_sink_parquet/"
  formatType: "parquet"
  partitionerType: "time"
  timePartitionPattern: "yyyy-MM-dd"
  timePartitionDuration: "1d"
  batchSize: 100
  batchTimeMs: 600000
  withMetadata: false
  withTopicPartitionNumber: false

Then produce messages with a JsonSchema schema. Here is the code for a minimal Python producer:

import pulsar
from pulsar.schema import *

class YourMessageClass(Record):
    ...

def generate_message() -> YourMessageClass:
    ...

if __name__ == '__main__':
    client = pulsar.Client('pulsar://host.docker.internal:6650')

    # create_producer is an instance method: call it on the client object, not the class
    producer = client.create_producer(
        topic='persistent://<theTenant>/<theNamespace>/<theTopic>',
        producer_name='python_producer',
        schema=pulsar.schema.JsonSchema(YourMessageClass)
    )

    for i in range(100):
        msg = generate_message()
        producer.send(msg)

    client.close()
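The `...` placeholders above leave the message shape open. As far as I know, the Python client treats Record fields as optional unless required=True is passed, so the schema it registers wraps them in a ["null", ...] union; nested records that may be null are a plausible place for a converter to trip. The stdlib-only sketch below works on a hand-written schema dict, assumed to resemble what YourMessageClass.schema() would return for a hypothetical class with a nested Address record, and lists which nested records are nullable. EXAMPLE_SCHEMA, Address and nullable_record_paths are hypothetical.

```python
from typing import Any, List

# Assumed shape of the registered schema for a hypothetical nested class.
# Fields declared without required=True are wrapped in a ["null", ...] union.
EXAMPLE_SCHEMA = {
    "type": "record",
    "name": "YourMessageClass",
    "fields": [
        {"name": "address", "type": ["null", {
            "type": "record", "name": "Address",
            "fields": [{"name": "city", "type": ["null", "string"]}],
        }]},
        {"name": "id", "type": ["null", "int"]},
    ],
}


def nullable_record_paths(schema: dict, prefix: str = "") -> List[str]:
    """Return dotted paths of nested record fields whose type allows null."""
    paths: List[str] = []
    for field in schema.get("fields", []):
        path = f"{prefix}{field['name']}"
        ftype: Any = field["type"]
        branches = ftype if isinstance(ftype, list) else [ftype]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("type") == "record":
                if "null" in branches:
                    paths.append(path)
                # Recurse to find nullable records deeper in the tree
                paths.extend(nullable_record_paths(branch, path + "."))
    return paths


if __name__ == "__main__":
    print(nullable_record_paths(EXAMPLE_SCHEMA))  # ['address']
```

Sending a message whose nullable nested record is actually None (here, address) is one way to narrow down whether nullability is involved.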

Expected behavior
A batch of the collected messages written as a Parquet file under the specified AWS S3 prefix.

Screenshots
None

Additional context
I ran the tests on my laptop using an Apache Pulsar Docker container in which the schema registry was properly configured (the message schema definitions had been uploaded) and pulsar-io-cloud-storage-2.9.3.6.nar was loaded.
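For comparison when checking what was uploaded, here is a minimal sketch of building a schema upload file, assuming the {"type", "schema", "properties"} layout that, to my knowledge, pulsar-admin schemas upload --filename expects, with the schema definition embedded as a string. build_schema_upload_file and the example schema are hypothetical.

```python
import json
from typing import Optional


def build_schema_upload_file(avro_record_schema: dict,
                             properties: Optional[dict] = None) -> str:
    """Hypothetical: wrap an Avro-style record schema in an upload document."""
    payload = {
        "type": "JSON",  # JsonSchema producers use schema type JSON
        # The definition is embedded as a JSON string, not as a nested object
        "schema": json.dumps(avro_record_schema),
        "properties": properties or {},
    }
    return json.dumps(payload, indent=2)


# Hypothetical, minimal schema for illustration only
example = {"type": "record", "name": "YourMessageClass",
           "fields": [{"name": "id", "type": ["null", "int"]}]}

if __name__ == "__main__":
    print(build_schema_upload_file(example))
```

If the uploaded definition differs from the one the producer derives from its class (for example in which fields are nullable), that difference may be worth ruling out.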

This is the error that occurred while writing data produced by the Python producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-02/180925500946.parquet
java.lang.NullPointerException: null
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:220) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]

This is the error that occurred while writing data produced by the Java producer:

[pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob cloud_storage_sink_parquet/<THE_NAMESPACE>/<THE_TOPIC>/2022-09-08/64156074046.parquet
java.util.NoSuchElementException: No value present
	at java.util.Optional.get(Optional.java:148) ~[?:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:207) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertGenericRecord(AvroRecordUtil.java:169) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.format.ParquetFormat.recordWriterBuf(ParquetFormat.java:263) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.bindValue(BlobStoreAbstractSink.java:283) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:243) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:209) ~[yvE5zcrQR5kFESe6V0ewtg/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
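In both traces convertGenericRecord recurses (frames alternate between AvroRecordUtil.java:169 and :209), which suggests the failure happens inside a nested record. The connector's Java source is not reproduced here; the following is a hypothetical Python sketch of what a recursive JSON-to-Avro-style conversion has to handle: a legitimately null nested record, and a union from which a non-null branch must be selected. It fails with the offending field path instead of a bare NullPointerException or Optional.get(). Whether these are the actual root causes is an assumption; convert and ConversionError are illustrative names.

```python
from typing import Any, Dict, Optional


class ConversionError(Exception):
    """Raised with the field path instead of a bare NPE / Optional.get() failure."""


def _non_null_branch(union: list) -> Optional[Any]:
    # Simplification: pick the first non-"null" branch of the union, if any
    for branch in union:
        if branch != "null":
            return branch
    return None


def convert(value: Any, ftype: Any, path: str = "$") -> Any:
    """Recursively check a decoded JSON value against an Avro-style type."""
    if isinstance(ftype, list):  # union, e.g. ["null", {...record...}]
        if value is None:
            if "null" in ftype:
                return None  # legitimate null: do not descend into the record
            raise ConversionError(f"{path}: null value for non-nullable union")
        branch = _non_null_branch(ftype)
        if branch is None:
            raise ConversionError(f"{path}: union has no non-null branch")
        return convert(value, branch, path)
    if isinstance(ftype, dict) and ftype.get("type") == "record":
        if not isinstance(value, dict):
            raise ConversionError(f"{path}: expected object, got {type(value).__name__}")
        out: Dict[str, Any] = {}
        for field in ftype["fields"]:
            name = field["name"]
            # A missing key is treated as null; the field's type decides if that is allowed
            out[name] = convert(value.get(name), field["type"], f"{path}.{name}")
        return out
    if value is None:
        raise ConversionError(f"{path}: null value for non-nullable {ftype}")
    return value  # primitives pass through unchanged in this sketch
```

For example, with a record whose address field is ["null", Address] and Address.city is a plain "string", convert({"address": None}, schema) returns {"address": None}, while convert({"address": {}}, schema) raises ConversionError naming $.address.city.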
