Commit 751a761

Refactor: Update notebook structure for sentiment analysis and streaming queries
1 parent 8959d29 commit 751a761

1 file changed

Lines changed: 120 additions & 7 deletions

notebooks/03_silver_to_gold.ipynb

@@ -306,8 +306,21 @@
  "cell_type": "markdown",
  "metadata": {},
  "source": [
- "## 6. SparkSQL - Sentiment by domain\n",
- "\n",
+ "## 6. Batch queries"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### 6a. SparkSQL - Sentiment by domain\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
  "Join comments + stories to answer: *\"Which sites generate the most positive/negative discussions?\"*"
  ]
  },
@@ -341,11 +354,18 @@
  },
  {
  "cell_type": "markdown",
- "source": "## 6b. Batch with Window Function - Author ranking\n\nUses RANK() to rank authors by their ratio of positive comments.",
- "metadata": {}
+ "metadata": {},
+ "source": [
+ "## 6b. Window Function - Author ranking\n",
+ "\n",
+ "Uses RANK() to rank authors by their ratio of positive comments."
+ ]
  },
  {
  "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
  "source": [
  "# Aggregation per author: total comments, positive, negative\n",
  "authors_stats = comments_sentiment \\\n",
@@ -366,10 +386,103 @@
  "    .filter(col(\"rank\") <= 20)\n",
  "\n",
  "authors_ranked.show(20, truncate=False)\n"
- ],
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 7. Streaming queries"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "comments_stream = spark.readStream.format(\"delta\").load(f\"{SILVER_PATH}/comments\")\n",
+ "comments_with_sentiment = sentiment_model_fitted.transform(comments_stream)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Aggregate comment sentiment over 5-minute windows:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
  "metadata": {},
+ "outputs": [],
+ "source": [
+ "sentiment_windowed = comments_with_sentiment \\\n",
+ "    .withColumn(\"sentiment_result\", explode(col(\"sentiment.result\"))) \\\n",
+ "    .groupBy(\n",
+ "        window(\n",
+ "            col(\"timestamp\"),\n",
+ "            \"5 minutes\",\n",
+ "        )\n",
+ "    ) \\\n",
+ "    .agg(\n",
+ "        count(\"*\").alias(\"total_comments\"),\n",
+ "        sum_(when(col(\"sentiment_result\") == \"pos\", 1).otherwise(0)).alias(\"positive_count\"),\n",
+ "        sum_(when(col(\"sentiment_result\") == \"neg\", 1).otherwise(0)).alias(\"negative_count\")\n",
+ "    ) \\\n",
+ "    .withColumn(\"positive_ratio\", round_(col(\"positive_count\") * 100.0 / col(\"total_comments\"), 2))\n",
+ "\n",
+ "sentiment_windowed.writeStream \\\n",
+ "    .format(\"delta\") \\\n",
+ "    .outputMode(\"complete\") \\\n",
+ "    .option(\"checkpointLocation\", f\"{GOLD_PATH}/_checkpoints/sentiment_windowed\") \\\n",
+ "    .start(f\"{GOLD_PATH}/sentiment_real_time\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 7a. Real-time sentiment"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Top keywords over a 10-minute sliding window, updated every 2 minutes:"
+ ]
+ },
+ {
+ "cell_type": "code",
  "execution_count": null,
- "outputs": []
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "comments_with_kw = keywords_model_fitted.transform(comments_stream)\n",
+ "keywords_exploded_stream = comments_with_kw \\\n",
+ "    .select(\n",
+ "        col(\"id\"), col(\"timestamp\"),\n",
+ "        explode(col(\"keywords\")).alias(\"keyword\")\n",
+ "    ) \\\n",
+ "    .groupBy(\n",
+ "        window(\n",
+ "            col(\"timestamp\"),\n",
+ "            \"10 minutes\",\n",
+ "            \"2 minutes\"\n",
+ "        ),\n",
+ "        col(\"keyword\")\n",
+ "    ) \\\n",
+ "    .count() \\\n",
+ "    .orderBy(desc(\"count\"))\n",
+ "\n",
+ "keywords_exploded_stream.writeStream \\\n",
+ "    .format(\"delta\") \\\n",
+ "    .outputMode(\"complete\") \\\n",
+ "    .option(\"checkpointLocation\", f\"{GOLD_PATH}/_checkpoints/keywords_windowed\") \\\n",
+ "    .start(f\"{GOLD_PATH}/keywords_real_time\")"
+ ]
  },
  {
  "cell_type": "markdown",
@@ -531,4 +644,4 @@
  },
  "nbformat": 4,
  "nbformat_minor": 4
- }
+ }
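
Note on the window sizes used in section 7: a 10-minute window that slides every 2 minutes places each comment in several overlapping windows, whereas a 5-minute tumbling window places it in exactly one. The sketch below is not part of the commit. It is a plain-Python illustration, with a hypothetical helper name `sliding_windows`, of how a timestamp maps to windows when window starts are aligned to the Unix epoch (which is how Spark's `window` function aligns them by default).

```python
from datetime import datetime, timedelta


def sliding_windows(ts, size, slide):
    """Return the [start, end) windows of length `size`, spaced `slide` apart,
    that contain `ts`. Hypothetical helper; assumes epoch-aligned window starts."""
    epoch = datetime(1970, 1, 1)
    # Latest window start that is at or before ts.
    start = ts - ((ts - epoch) % slide)
    windows = []
    # Walk backwards while the window still covers ts (end is exclusive).
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


ts = datetime(2024, 1, 1, 12, 7, 30)

# Sliding: 10-minute windows every 2 minutes -> 5 overlapping windows.
for start, end in sliding_windows(ts, timedelta(minutes=10), timedelta(minutes=2)):
    print(start.time(), "->", end.time())

# Tumbling: size == slide -> exactly one window.
print(sliding_windows(ts, timedelta(minutes=5), timedelta(minutes=5)))
```

Because every row is counted once per window it falls into, the keyword counts in overlapping windows are not additive across windows.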
