1 1 import pyarrow as pa
2 2 import pyarrow.parquet as pq
  3+ import pyarrow.compute as pc
3 4 import requests
4 5 import sys
5 6 import os
@@ -105,6 +106,9 @@ def download_and_convert_pb_to_parquet(url, existing_parquet_path=None):
105106
106107 # Define PyArrow schema
107108 schema = pa .schema ([
109+ ('position_latitude' , pa .float64 ()),
110+ ('position_longitude' , pa .float64 ()),
111+ ('geometry' , pa .string ()),
108112 ('id' , pa .string ()),
109113 ('is_deleted' , pa .bool_ ()),
110114 ('trip_id' , pa .string ()),
@@ -124,8 +128,6 @@ def download_and_convert_pb_to_parquet(url, existing_parquet_path=None):
124128 ('vehicle_id' , pa .string ()),
125129 ('vehicle_label' , pa .string ()),
126130 ('license_plate' , pa .string ()),
127- ('position_latitude' , pa .float64 ()),
128- ('position_longitude' , pa .float64 ()),
129131 ('bearing' , pa .float64 ()),
130132 ('odometer' , pa .float64 ()),
131133 ('speed' , pa .float64 ()),
@@ -167,6 +169,7 @@ def download_and_convert_pb_to_parquet(url, existing_parquet_path=None):
167169 'license_plate' : [record .get ('license_plate' ) for record in records ],
168170 'position_latitude' : [record .get ('position_latitude' ) for record in records ],
169171 'position_longitude' : [record .get ('position_longitude' ) for record in records ],
172+ 'geometry': [f"POINT ({record.get('position_longitude')} {record.get('position_latitude')})" if record.get('position_latitude') is not None and record.get('position_longitude') is not None else None for record in records],
170173 'bearing' : [record .get ('bearing' ) for record in records ],
171174 'odometer' : [record .get ('odometer' ) for record in records ],
172175 'speed' : [record .get ('speed' ) for record in records ],
@@ -191,10 +194,10 @@ def download_and_convert_pb_to_parquet(url, existing_parquet_path=None):
191194 # Concatenate the new table with the existing one
192195 combined_table = pa .concat_tables ([existing_table , table ])
193196 # Write back to the Parquet file
194- pq.write_table(combined_table, existing_parquet_path)
197+ pq.write_table(combined_table, existing_parquet_path, compression='GZIP')
195198 else :
196199 # Write to a new Parquet file
197- pq.write_table(table, 'gtfs_realtime_data.parquet')
200+ pq.write_table(table, 'gtfs_realtime_data.geoparquet', compression='GZIP')
198201
199202# Example usage
200203if __name__ == "__main__" :
0 commit comments