66import pyspark .sql .functions as F
77from datetime import timedelta , timezone
88import math
9- import mlflow .pyfunc
10- import pyspark .sql .functions as F
119from pyspark .sql .types import IntegerType
1210
11+
def rounded_unix_timestamp(dt, num_minutes=15):
    """
    Ceilings datetime dt to interval num_minutes, then returns the unix timestamp.

    :param dt: datetime to round up; interpreted as UTC (any tzinfo on dt is
        replaced, not converted).
    :param num_minutes: size of the rounding interval in minutes (default 15).
    :return: int unix timestamp of dt ceiled to the next num_minutes boundary
        (a dt already on a boundary is returned unchanged).
    """
    # Seconds elapsed since the top of the hour, including fractional seconds.
    # NOTE(review): this line was hidden behind a diff hunk in the reviewed
    # view and is reconstructed from the surrounding logic (delta below is
    # defined relative to nsecs) -- confirm against the full file.
    nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
    # Gap (in seconds) from dt up to the next num_minutes boundary; zero when
    # dt is already exactly on a boundary, since ceil of an integer multiple
    # is the multiple itself.
    delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs
    # Stamp the shifted datetime as UTC before converting to a unix timestamp.
    return int((dt + timedelta(seconds=delta)).replace(tzinfo=timezone.utc).timestamp())
2019
20+
# Spark UDF wrapper so rounded_unix_timestamp can be applied to DataFrame
# columns; returns IntegerType (a unix timestamp in whole seconds).
rounded_unix_timestamp_udf = F.udf(rounded_unix_timestamp, IntegerType())
2222
23+
2324def rounded_taxi_data (taxi_data_df ):
2425 # Round the taxi data timestamp to 15 and 30 minute intervals so we can join with the pickup and dropoff features
2526 # respectively.
2627 taxi_data_df = (
2728 taxi_data_df .withColumn (
2829 "rounded_pickup_datetime" ,
29- F .to_timestamp (
30- rounded_unix_timestamp_udf (
31- taxi_data_df ["tpep_pickup_datetime" ], F .lit (15 )
32- )
33- ),
30+ F .to_timestamp (rounded_unix_timestamp_udf (taxi_data_df ["tpep_pickup_datetime" ], F .lit (15 ))),
3431 )
3532 .withColumn (
3633 "rounded_dropoff_datetime" ,
37- F .to_timestamp (
38- rounded_unix_timestamp_udf (
39- taxi_data_df ["tpep_dropoff_datetime" ], F .lit (30 )
40- )
41- ),
34+ F .to_timestamp (rounded_unix_timestamp_udf (taxi_data_df ["tpep_dropoff_datetime" ], F .lit (30 ))),
4235 )
4336 .drop ("tpep_pickup_datetime" )
4437 .drop ("tpep_dropoff_datetime" )
@@ -59,4 +52,3 @@ def rounded_taxi_data(taxi_data_df):
5952# MAGIC select * from qa_mlops_demo.marcin_wojtyczka.feature_store_inference_input
6053
6154# COMMAND ----------
62-