[Bug]: Python TypeError when converting Avro logicalType timestamp-millis to Beam Schema #31656

Open
@gergely-g

What happened?

When reading Avro files whose schema contains a field of type:

{"type": "long", "logicalType": "timestamp-millis"}

converting the resulting collection to Beam schema rows (beam.Row) fails with a TypeError in RowCoderImpl of the form:

apache_beam/coders/coder_impl.py:209: in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:248: in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
    ???
apache_beam/coders/coder_impl.py:1824: in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
    ???
E   TypeError: an integer is required [while running 'ReadFromAvro/Map(<lambda at avroio.py:633>)']

Avro files with timestamp-millis logical types are created, for example, when data from a BigQuery table with a TIMESTAMP column is exported to .avro files.

Unit test to repro the issue:

import datetime
import tempfile
import unittest

import apache_beam as beam
import fastavro


class TestBeamSchemaConversions(unittest.TestCase):
    def test_convert_timestamp_millis(self):
        """Demonstrate bug: Avro-to-Beam schema conversion cannot handle timestamp-millis logical type."""
        avro_schema = {
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
            ],
        }

        # Write test records into a temp file using fastavro.
        with tempfile.NamedTemporaryFile(delete=True) as input_avro_file:
            fastavro.writer(
                input_avro_file,
                avro_schema,
                [
                    {"name": "Alice", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                    {"name": "Bob", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                ],
                validator=True,
            )
            input_avro_file.flush()

            with self.assertRaises(TypeError) as context:
                with beam.Pipeline() as p:
                    avro_records = p | beam.io.ReadFromAvro(input_avro_file.name, as_rows=True)
                    avro_records | beam.LogElements()

            self.assertIn("an integer is required", str(context.exception))
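
Possible workaround sketch (not a fix): the error is consistent with the reader passing a datetime.datetime to an integer field encoder, although that cause is an assumption and has not been confirmed against the Beam source. Under that assumption, one option is to read plain dicts and convert datetimes to integer epoch milliseconds before building rows. The helper names datetime_to_epoch_millis and normalize_record below are hypothetical and are not part of Beam or fastavro.

```python
import datetime

# Hypothetical helpers for illustration; not part of Apache Beam or fastavro.
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
_ONE_MS = datetime.timedelta(milliseconds=1)


def datetime_to_epoch_millis(value):
    """Return whole milliseconds since the Unix epoch for a datetime."""
    if value.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC.
        value = value.replace(tzinfo=datetime.timezone.utc)
    # Floor division of two timedeltas yields an int and avoids float rounding.
    return (value - _EPOCH) // _ONE_MS


def normalize_record(record):
    """Copy a decoded Avro record dict, replacing datetimes with int millis."""
    return {
        key: datetime_to_epoch_millis(val) if isinstance(val, datetime.datetime) else val
        for key, val in record.items()
    }


# Same sample value as in the repro test above.
alice = {
    "name": "Alice",
    "timestamp": datetime.datetime(
        2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc
    ),
}
normalized = normalize_record(alice)
```

In a pipeline this could be applied as beam.io.ReadFromAvro(path) with as_rows left at its default (so the reader yields dicts), followed by beam.Map(normalize_record) and then beam.Map(lambda r: beam.Row(**r)). This has not been verified against the affected Beam version, and it only handles top-level fields, not nested records or arrays.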

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
