Add Avro file support for Parsons Tables #1371

Open · wants to merge 10 commits into base: main

9 changes: 9 additions & 0 deletions docs/table.rst
@@ -24,6 +24,9 @@ From Parsons Table
* - :py:meth:`~parsons.etl.tofrom.ToFrom.to_csv`
- CSV File
- Write a table to a local csv file
* - :py:meth:`~parsons.etl.tofrom.ToFrom.to_avro`
- Avro File
- Write a table to a local avro file
* - :py:meth:`~parsons.etl.tofrom.ToFrom.to_s3_csv`
- AWS s3 Bucket
- Write a table to a csv stored in S3
@@ -57,6 +60,9 @@ From Parsons Table
* - :py:meth:`~parsons.etl.tofrom.ToFrom.append_csv`
- CSV file
- Appends table to an existing CSV
* - :py:meth:`~parsons.etl.tofrom.ToFrom.append_avro`
- Avro file
- Appends table to an existing Avro file
* - :py:meth:`~parsons.etl.tofrom.ToFrom.to_zip_csv`
- ZIP file
- Writes a table to a CSV in a zip archive
@@ -83,6 +89,9 @@ Create Parsons Table object using the following methods.
* - :py:meth:`~parsons.etl.tofrom.ToFrom.from_csv`
- File like object, local path, url, ftp.
- Loads a csv object into a Table
* - :py:meth:`~parsons.etl.tofrom.ToFrom.from_avro`
- Avro File
- Load a table from a local avro file
* - :py:meth:`~parsons.etl.tofrom.ToFrom.from_json`
- File like object, local path, url, ftp.
- Loads a json object into a Table
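
Taken together, the three new entries above give Avro the same round-trip coverage the CSV methods already have. A minimal sketch of that round trip (the path `people.avro` is a placeholder for illustration, not something from this diff):

    from parsons import Table

    tbl = Table([{"name": "Bob", "age": 33}])
    tbl.to_avro("people.avro")  # write; schema is inferred from the rows
    tbl.append_avro("people.avro")  # add more rows to the same file
    roundtrip = Table.from_avro("people.avro")  # load it back
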
141 changes: 141 additions & 0 deletions parsons/etl/tofrom.py
@@ -98,6 +98,125 @@ def to_html(

return local_path

def to_avro(
self, target, schema=None, sample=9, codec="deflate", compression_level=None, **avro_args
):
"""
Outputs table to an Avro file.

In order to use this method, you must have the `fastavro` library installed.
If you installed Parsons with limited dependencies, you can add it with
`pip install parsons[avro]`.

Writes the table into a new avro file according to the schema passed.

This method assumes that each column has values of the same type
across all rows of the source `table`.

Avro is a data serialization framework that is generally faster
and safer than text formats like JSON, XML, or CSV.

`Args:`
target: str
the file path for the avro file to create.
Note that if a file already exists at the given location, it will be
overwritten.
schema: dict
defines the field structure of the file's records.
Check the fastavro documentation and the Avro schema specification for details.
sample: int, optional
defines how many rows are inspected
to discover the field types and build a schema for the avro file
when the `schema` argument is not passed. Default is 9.
codec: str, optional
sets the compression codec used to shrink data in the file. It can be
'null', 'deflate' (default) or 'bzip2'; 'snappy', 'zstandard', 'lz4' and
'xz' are also supported if the corresponding libraries are installed.
compression_level: int, optional
sets the level of compression to use with the specified codec
(if the codec supports it).
avro_args: kwargs
Extra keyword arguments passed in `**avro_args` are forwarded directly
to fastavro. Check the fastavro documentation
(https://fastavro.readthedocs.io/en/latest/) for reference.

Example usage for writing files::

>>> # set up a table to demonstrate with
>>> table2 = [['name', 'friends', 'age'],
... ['Bob', 42, 33],
... ['Jim', 13, 69],
... ['Joe', 86, 17],
... ['Ted', 23, 51]]
...
>>> schema2 = {
... 'doc': 'Some people records.',
... 'name': 'People',
... 'namespace': 'test',
... 'type': 'record',
... 'fields': [
... {'name': 'name', 'type': 'string'},
... {'name': 'friends', 'type': 'int'},
... {'name': 'age', 'type': 'int'},
... ]
... }
...
>>> # now demonstrate writing with to_avro()
>>> from parsons import Table

>>> Table(table2).to_avro('example.file2.avro', schema=schema2)
...
>>> # read back what was saved above
>>> tbl2 = Table.from_avro('example.file2.avro')
>>> tbl2
+-------+---------+-----+
| name | friends | age |
+=======+=========+=====+
| 'Bob' | 42 | 33 |
+-------+---------+-----+
| 'Jim' | 13 | 69 |
+-------+---------+-----+
| 'Joe' | 86 | 17 |
+-------+---------+-----+
| 'Ted' | 23 | 51 |
+-------+---------+-----+
"""

return petl.toavro(
self.table,
target,
schema=schema,
sample=sample,
codec=codec,
compression_level=compression_level,
**avro_args,
)
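
Since `schema` is optional, a caller can also lean on type inference from the first `sample` rows and simply pick a codec. A small illustrative sketch (file names are placeholders):

    from parsons import Table

    tbl = Table([{"name": "Bob", "friends": 42, "age": 33}])
    # no schema passed: field types are inferred from the first `sample` rows
    tbl.to_avro("people_inferred.avro", sample=1)
    # 'deflate' supports compression_level; higher levels trade CPU for size
    tbl.to_avro("people_small.avro", codec="deflate", compression_level=9)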

def append_avro(self, target, schema=None, sample=9, **avro_args):
"""
Append table to an existing Avro file.

Writes the table into an existing avro file according to the schema passed.

This method assumes that each column has values of the same type
across all rows of the source `table`.

`Args:`
target: str
the file path of the existing avro file to append to.
schema: dict
defines the field structure of the file's records.
Check the fastavro documentation and the Avro schema specification for details.
sample: int, optional
defines how many rows are inspected
to discover the field types and build a schema for the avro file
when the `schema` argument is not passed. Default is 9.
avro_args: kwargs
Extra keyword arguments passed in `**avro_args` are forwarded directly
to fastavro. Check the fastavro documentation
(https://fastavro.readthedocs.io/en/latest/) for reference.
"""

return petl.appendavro(self.table, target, schema=schema, sample=sample, **avro_args)
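
For illustration, appending follows the same pattern as writing; a minimal sketch, assuming `log.avro` is a placeholder path and the appended rows match the file's existing schema:

    from parsons import Table

    Table([{"name": "Bob"}]).to_avro("log.avro")  # create the file first
    Table([{"name": "Jim"}]).append_avro("log.avro")  # append matching rows
    assert len(Table.from_avro("log.avro")) == 2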

def to_csv(
self,
local_path=None,
@@ -715,6 +834,28 @@ def to_civis(
**civisargs,
)

@classmethod
def from_avro(cls, local_path, limit=None, skips=0, **avro_args):
r"""
Create a ``parsons table`` from an Avro file.

`Args:`
local_path: str
The path to the Avro file.
limit: int, optional
The maximum number of rows to extract. Default is ``None`` (all rows).
skips: int, optional
The number of rows to skip from the start. Default is 0.
\**avro_args: kwargs
Additional arguments passed to `fastavro.reader`.

`Returns:`
Parsons Table
See :ref:`parsons-table` for output options.
"""

return cls(petl.fromavro(local_path, limit=limit, skips=skips, **avro_args))
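
The `limit` and `skips` pass-throughs make it cheap to peek at a large file without materializing all of it; a small sketch under the same placeholder-path assumption:

    from parsons import Table

    # read only rows 100-199 of the file
    preview = Table.from_avro("big_export.avro", skips=100, limit=100)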

@classmethod
def from_csv(cls, local_path, **csvargs):
r"""
1 change: 1 addition & 0 deletions requirements.txt
@@ -11,6 +11,7 @@ curlify==2.2.1
dbt_core>=1.5.0
defusedxml>=0.7.1, <=0.8.0
facebook-business==22.0.2
fastavro==1.10.0
google-api-core==2.24.2
google-api-python-client==2.163.0
google-auth==2.38.0
1 change: 1 addition & 0 deletions setup.py
@@ -18,6 +18,7 @@ def main():
extras_require = {
"airtable": ["pyairtable"],
"alchemer": ["surveygizmo"],
"avro": ["fastavro"],
"azure": ["azure-storage-blob"],
"box": ["boxsdk"],
"braintree": ["braintree"],
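
Since fastavro is both pinned in requirements.txt and exposed as the new `avro` extra, either a full install or `pip install parsons[avro]` should provide it. An illustrative guard (not part of this diff) for code paths that need the optional dependency:

    try:
        import fastavro  # supplied by the "avro" extra
    except ImportError as exc:
        raise ImportError(
            "Avro support requires fastavro; install with `pip install parsons[avro]`"
        ) from exc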
153 changes: 153 additions & 0 deletions test/test_etl.py
@@ -176,6 +176,159 @@ def test_to_temp_html(self):
with open(path, "r") as f:
self.assertEqual(f.read(), html)

def test_to_avro_basic(self):
# Create a temporary directory and file
with tempfile.TemporaryDirectory() as temp_dir:
avro_file = os.path.join(temp_dir, "test.avro")

# Create a test table
tbl = Table([{"first": "Bob", "last": "Smith"}])

# Test basic functionality
tbl.to_avro(avro_file)

# Verify the file exists
assert os.path.exists(avro_file)

# Read it back and verify content
result_tbl = Table.from_avro(avro_file)
assert len(result_tbl) == len(tbl)
assert sorted(result_tbl.columns) == sorted(tbl.columns)

# Check data values match
for i in range(len(tbl)):
for col in tbl.columns:
assert result_tbl[i][col] == tbl[i][col]

def test_to_avro_with_schema(self):
# Create a temporary directory and file
with tempfile.TemporaryDirectory() as temp_dir:
avro_file = os.path.join(temp_dir, "test.avro")

# Create a test table
tbl = Table([{"first": "Bob", "last": "Smith"}])

# Test with explicit schema
schema = {
"doc": "Some people records.",
"name": "People",
"namespace": "test",
"type": "record",
"fields": [
{"name": "first", "type": "string"},
{"name": "last", "type": "string"},
],
}

tbl.to_avro(avro_file, schema=schema)

# Read it back and verify content
result_tbl = Table.from_avro(avro_file)
assert len(result_tbl) == len(tbl)
assert sorted(result_tbl.columns) == sorted(tbl.columns)

def test_to_avro_different_codecs(self):
# Create a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
# Create a test table
tbl = Table([{"first": "Bob", "last": "Smith"}])

# Test with different compression codecs
for codec in ["null", "deflate", "bzip2"]:
test_file = os.path.join(temp_dir, f"test_{codec}.avro")
tbl.to_avro(test_file, codec=codec)

# Verify the file exists
assert os.path.exists(test_file)

# Read it back and verify content
result_tbl = Table.from_avro(test_file)
assert len(result_tbl) == len(tbl)
assert sorted(result_tbl.columns) == sorted(tbl.columns)

def test_to_avro_with_compression_level(self):
# Create a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
# Create a test table
tbl = Table([{"first": "Bob", "last": "Smith"}])

# Test with compression level
codec = "deflate"
for level in [1, 5, 9]:
test_file = os.path.join(temp_dir, f"test_level_{level}.avro")
tbl.to_avro(test_file, codec=codec, compression_level=level)

# Verify the file exists
assert os.path.exists(test_file)

# Read it back and verify content
result_tbl = Table.from_avro(test_file)
assert len(result_tbl) == len(tbl)

def test_to_avro_sample_size(self):
# Create a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
# Create a test table
tbl = Table([{"first": "Bob", "last": "Smith"}])

# Test with different sample sizes for schema inference
for sample in [1, 5, 10]:
test_file = os.path.join(temp_dir, f"test_sample_{sample}.avro")
tbl.to_avro(test_file, sample=sample)

# Verify the file exists
assert os.path.exists(test_file)

# Read it back and verify content
result_tbl = Table.from_avro(test_file)
assert len(result_tbl) == len(tbl)

def test_to_avro_with_avro_args(self):
# Create a temporary directory and file
with tempfile.TemporaryDirectory() as temp_dir:
avro_file = os.path.join(temp_dir, "test.avro")

# Create a test table
tbl = Table([{"first": "Bob", "last": "Smith"}])

# Test with additional arguments to fastavro
tbl.to_avro(
avro_file,
sync_interval=16000, # Custom sync marker interval
metadata={"created_by": "parsons_test"},
)

# Verify the file exists
assert os.path.exists(avro_file)

# Read it back and verify content
result_tbl = Table.from_avro(avro_file)
assert len(result_tbl) == len(tbl)

def test_to_avro_complex_types(self):
# Create a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
# Test with more complex data types
complex_data = [
{
"name": "Bob",
"tags": ["tag1", "tag2"],
"metadata": {"city": "NYC", "state": "NY"},
},
{"name": "Jim", "tags": ["tag3"], "metadata": {"city": "LA", "state": "CA"}},
]
complex_tbl = Table(complex_data)

test_file = os.path.join(temp_dir, "test_complex.avro")
complex_tbl.to_avro(test_file)

# Verify the file exists
assert os.path.exists(test_file)

# Read it back and verify content
result_tbl = Table.from_avro(test_file)
assert len(result_tbl) == len(complex_tbl)

def _assert_expected_csv(self, path, orig_tbl):
result_tbl = Table.from_csv(path)
assert_matching_tables(orig_tbl, result_tbl)