 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, from_json, avg
-from pyspark.sql.types import StructType, StructField, StringType, DecimalType, IntegerType, DateType, TimestampType
+from pyspark.sql.functions import col, from_json, avg, row_number, date_format, to_date
+from pyspark.sql.types import StructType, StructField, StringType, DecimalType, IntegerType
+from pyspark.sql.window import Window
+from pyspark.sql import functions as F
 import sys


@@ -28,10 +30,8 @@ def writeToCassandraGrouped(df, epochId):

 def pivotAndWriteToCassandra(batch_df, epochId):
     trade_types = ['buy', 'sell']
-
     pivoted_df = batch_df.groupBy("stock").pivot(
         "trade_type", trade_types).avg("price")
-
     for trade_type in trade_types:
         column_name = f"avg_price_{trade_type}"
         pivoted_df = pivoted_df.withColumnRenamed(trade_type, column_name)
@@ -46,6 +46,55 @@ def pivotAndWriteToCassandra(batch_df, epochId):
         print("Error writing pivoted data to Cassandra:", e)


+def rollupAndWriteToCassandra(batch_df, epochId):
+    trade_date_format = 'yyyy-MM-dd'
+    string_columns = [col(c).cast(StringType()) for c in batch_df.columns]
+    batch_df_string = batch_df.select(*string_columns)
+    batch_df_string = batch_df_string.filter(
+        batch_df_string["trade_date"].isNotNull())
+    batch_df_string = batch_df_string.withColumn("trade_date",
+        date_format(to_date("trade_date", "yyyy-MM-dd"), trade_date_format))
+    rolled_up_df = batch_df_string.groupBy(
+        "trade_date", "trade_type").agg(avg("price").alias("avg_price"))
+
+    try:
+        print("DataFrame content before writing to Cassandra:")
+        rolled_up_df.show()
+        rolled_up_df.write \
+            .format("org.apache.spark.sql.cassandra") \
+            .options(table="rollup_stocks", keyspace="stockdata") \
+            .mode("append") \
+            .save()
+    except Exception as e:
+        print("Error writing rolled up data to Cassandra:", e)
+
+
+def rankAndWriteToCassandra(batch_df, epochId):
+    windowSpec = Window.partitionBy("trade_type").orderBy("price")
+    ranked_df = batch_df.withColumn("rank", row_number().over(windowSpec))
+    try:
+        ranked_df.write \
+            .format("org.apache.spark.sql.cassandra") \
+            .options(table="ranked_stocks", keyspace="stockdata") \
+            .mode("append") \
+            .save()
+    except Exception as e:
+        print("Error writing ranked data to Cassandra:", e)
+
+
+def analyticsAndWriteToCassandra(batch_df, epochId):
+    analytics_df = batch_df.withColumn(
+        "avg_price_overall", avg("price").over(Window.partitionBy()))
+    try:
+        analytics_df.write \
+            .format("org.apache.spark.sql.cassandra") \
+            .options(table="analytics_stocks", keyspace="stockdata") \
+            .mode("append") \
+            .save()
+    except Exception as e:
+        print("Error writing analytics data to Cassandra:", e)
+
+
 def main():
     spark = SparkSession.builder \
         .appName("Spark-Kafka-Cassandra-Stocks") \
@@ -85,15 +134,6 @@ def main():
         .select(from_json(col("value"), schema).alias("data")) \
         .select("data.*")

-    query_cassandra = df_parsed.writeStream \
-        .outputMode("append") \
-        .foreachBatch(writeToCassandra) \
-        .start()
-
-    df_parsed = df.selectExpr("CAST(value AS STRING)") \
-        .select(from_json(col("value"), schema).alias("data")) \
-        .select("data.*")
-
     df_grouped = df_parsed.groupBy("trade_type").agg(
         avg("price").alias("avg_price"))
@@ -102,19 +142,32 @@ def main():
         .foreachBatch(writeToCassandraGrouped) \
         .start()

-    df_parsed = df.selectExpr("CAST(value AS STRING)") \
-        .select(from_json(col("value"), schema).alias("data")) \
-        .select("data.*")
+    query_cassandra = df_parsed.writeStream \
+        .outputMode("append") \
+        .foreachBatch(writeToCassandra) \
+        .start()
+
+    query_rollup = df_parsed.writeStream \
+        .outputMode("append") \
+        .foreachBatch(rollupAndWriteToCassandra) \
+        .start()
+
+    query_rank = df_parsed.writeStream \
+        .outputMode("append") \
+        .foreachBatch(rankAndWriteToCassandra) \
+        .start()

-    query_pivot = df_parsed.writeStream \
+    query_analytics = df_parsed.writeStream \
         .outputMode("append") \
-        .foreachBatch(pivotAndWriteToCassandra) \
+        .foreachBatch(analyticsAndWriteToCassandra) \
         .start()

     query_console.awaitTermination()
     query_cassandra.awaitTermination()
+    query_rollup.awaitTermination()
+    query_rank.awaitTermination()
+    query_analytics.awaitTermination()
     query_cassandra_grouped.awaitTermination()
-    query_pivot.awaitTermination()


 if __name__ == "__main__":
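
Note for reviewers: the three new foreachBatch sinks are ordinary batch transforms, so their core logic can be exercised on a static DataFrame without Kafka or Cassandra running. The following is a minimal smoke-test sketch; the sample rows and the local[*] master are illustrative assumptions, not part of this commit.

# Minimal smoke test for the new per-batch transforms on a static
# DataFrame (no Kafka, no Cassandra needed). Sample rows are made up.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("batch-transform-smoke-test") \
    .master("local[*]") \
    .getOrCreate()

rows = [
    ("AAPL", "buy", 190.0, "2024-01-02"),
    ("AAPL", "sell", 191.5, "2024-01-02"),
    ("MSFT", "buy", 370.0, "2024-01-02"),
    ("MSFT", "buy", 372.5, "2024-01-03"),
]
batch_df = spark.createDataFrame(
    rows, ["stock", "trade_type", "price", "trade_date"])

# Same ranking as rankAndWriteToCassandra: row_number() over a window
# partitioned by trade_type and ordered by ascending price.
windowSpec = Window.partitionBy("trade_type").orderBy("price")
batch_df.withColumn("rank", row_number().over(windowSpec)).show()

# Same roll-up as rollupAndWriteToCassandra (minus the string casts):
# average price per (trade_date, trade_type) pair.
batch_df.groupBy("trade_date", "trade_type") \
    .agg(avg("price").alias("avg_price")) \
    .show()

spark.stop()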
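The new writes also assume that rollup_stocks, ranked_stocks, and analytics_stocks already exist in the stockdata keyspace; their schemas are not part of this diff. As one plausible example, the roll-up table could be created with the DataStax cassandra-driver as sketched below. The column types, primary key, and contact point are guesses inferred from the DataFrame produced in rollupAndWriteToCassandra; ranked_stocks and analytics_stocks would need matching definitions for every column their DataFrames carry.

# Hypothetical setup for one of the target tables. The column types,
# primary key, and contact point are guesses; the real schemas are not
# part of this commit.
from cassandra.cluster import Cluster

cluster = Cluster(["localhost"])  # assumed contact point
session = cluster.connect()

session.execute("""
    CREATE TABLE IF NOT EXISTS stockdata.rollup_stocks (
        trade_date text,
        trade_type text,
        avg_price double,
        PRIMARY KEY (trade_date, trade_type)
    )
""")

cluster.shutdown()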