Postgres to Avro generator.
- Generate Avro schema from column definitions.
- Generate data in a format consumable for Avro serialization.
Method: `pg2avro.get_avro_schema`
```python
get_avro_schema(
    "mytable",
    "public",
    [
        # Dictionary mode
        {
            "name": "column_name_1",
            "type": "int2",
            "nullable": False,
        },
        # SqlAlchemy mode
        SqlAlchemyColumn(ARRAY(TEXT), name="column_name_2"),
        ...
    ]
)
```
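For orientation, the call above returns an Avro record schema roughly like the sketch below, assuming the standard mappings (`int2` to Avro `int`, `ARRAY(TEXT)` to an array of strings); the exact structure may differ in detail:

```python
# Illustrative output only - the real return value may differ in detail.
{
    "type": "record",
    "name": "mytable",
    "namespace": "public",
    "fields": [
        # int2 with nullable=False -> plain Avro int
        {"name": "column_name_1", "type": "int"},
        # ARRAY(TEXT) with nullable unspecified -> nullable array of strings
        {"name": "column_name_2", "type": ["null", {"type": "array", "items": "string"}]},
    ],
}
```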
Schema generator needs the following information:
- table name
- namespace (`schema` in SQL, `dataset` in BigQuery etc.)
- columns - iterable of columns, each element with:
  - name
  - type - `_` prefix is used to indicate array types (see the sketch after this list)
  - nullable (optional, `True` assumed if not provided)
- column mapping - optional `ColumnMapping` object with column mappings (see below for more info).
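For example, a `text[]` column coming from raw Postgres metadata carries the underscore prefix; a minimal sketch (the column name `tags` is made up for illustration):

```python
columns = [
    # "_text" means an array of text values - the "_" prefix marks array types.
    {"name": "tags", "type": "_text"},  # nullable omitted, so True is assumed
]
```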
Column data can be passed in multiple formats.
- Dictionary with required keys and data
- SqlAlchemy Column object
- Any object with compatible attributes and required data
- Dictionary or object with required data, but without compatible attributes/keys, supplied with a `ColumnMapping`.
Note: this mode supports generating the schema from raw Postgres data - `udt_name` can be used to generate the schema.
```python
columns = [
    CustomColumn(name="column_name", udt_name="int2", is_nullable=False),
]

get_avro_schema(
    table_name,
    namespace,
    columns,
    ColumnMapping(name="name", type="udt_name", nullable="is_nullable"),
)
```
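Putting this together, raw rows from `information_schema.columns` can be fed straight in; a minimal sketch, assuming psycopg2 and a reachable database (neither is required by pg2avro itself):

```python
import psycopg2

from pg2avro import ColumnMapping, get_avro_schema

conn = psycopg2.connect("dbname=mydb")
cur = conn.cursor()
cur.execute(
    """
    SELECT column_name, udt_name, is_nullable = 'YES'
    FROM information_schema.columns
    WHERE table_schema = 'public' AND table_name = 'mytable'
    """
)
# Build plain dicts whose keys match the ColumnMapping below.
raw_columns = [
    {"name": name, "udt_name": udt, "is_nullable": nullable}
    for name, udt, nullable in cur.fetchall()
]

schema = get_avro_schema(
    "mytable",
    "public",
    raw_columns,
    ColumnMapping(name="name", type="udt_name", nullable="is_nullable"),
)
```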
Method: `pg2avro.get_avro_row_dict`
This method requires row data and a schema to generate the rows with. Rows can be passed in multiple formats:
- Dictionary with keys corresponding to schema field names
- Object with attributes corresponding to schema field names (works the same as a dictionary with corresponding fields)
- Tuple with data in the same order as fields specified in schema
```python
columns = [
    {"name": "name", "type": "varchar", "nullable": False},
    {"name": "number", "type": "float4", "nullable": False},
]

schema = get_avro_schema(table_name, namespace, columns)

rows = [
    {"name": "John", "number": 1.0},
    RowObject(name="Jack", number=2.0),
    ("Jim", 3.0),
]

data = [get_avro_row_dict(row, schema) for row in rows]
```
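From here, the schema and rows can be handed to any Avro library. A minimal sketch using fastavro (an assumption - pg2avro does not depend on it, and the schema dict is assumed to be compatible with `fastavro.parse_schema`):

```python
from fastavro import parse_schema, writer

parsed_schema = parse_schema(schema)
with open("mytable.avro", "wb") as out:
    # Write the row dicts produced by get_avro_row_dict above.
    writer(out, parsed_schema, data)
```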
Some cases might require overriding the standard mapping. An example of such a scenario is moving Postgres data into Google BigQuery, where numeric types are handled differently and cannot accept arbitrary scale, so we may want to override numeric columns to float.
To do so, simply pass your mapping overrides as a column-name-keyed dict to the `get_avro_schema` function like so:
```python
columns = [
    {"name": "some_special_field", "type": "int"},
    {"name": "numeric_with_high_scale", "type": "numeric(20, 15)"},
]

overrides = {
    "some_special_field": {"pg_type": "string", "python_type": str},
    "numeric_with_high_scale": {"pg_type": "float8", "python_type": float},
}

schema = get_avro_schema(table_name, namespace, columns, mapping_overrides=overrides)
```
- `pg_type` - the type you want the column to look like for pg2avro, instead of what was retrieved from pg/sqlalchemy etc.
- `python_type` - built-in Python type to use for typecasting. Use `str`, `float`, `int`, `tuple`, `list`, `set` and `dict` here.
And your `some_special_field` will be mapped to a string instead of an int accordingly.
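For illustration, the overridden columns should then surface in the generated schema with the new types; a hedged sketch of how to inspect that, assuming the returned schema is a plain dict with a "fields" list:

```python
# Inspect the generated field types (shapes are illustrative, not verbatim output).
field_types = {field["name"]: field["type"] for field in schema["fields"]}
# "some_special_field" should now map to a string type rather than an int,
# and "numeric_with_high_scale" to a float/double type rather than a decimal.
print(field_types)
```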