Skip to content

Commit 8959d29

Browse files
Angry-Jayh00dieB0y
andauthored
Feat/simple gold (#17)
* feat: Add Silver to Gold transformation notebook with Spark NLP integration Co-authored-by: Angry-Jay <jinozebian@gmail.com> * feat: Enhance Silver to Gold transformation with keyword extraction and improved NER pipeline Co-authored-by: Angry-Jay <jinozebian@gmail.com> * feat: Enhance documentation in Silver to Gold notebook with detailed objectives and explanations for each pipeline Co-authored-by: Angry-Jay <jinozebian@gmail.com> * feat: Simplify and enhance documentation in Silver to Gold notebook for clarity and conciseness * refactor: Remove comments_enriched from Silver layer (YAGNI) - Remove redundant join in Silver (comments + stories) - Join is now done in Gold SparkSQL when needed - Simplify Silver to only 2 tables: stories, comments * feat: Update Docker configuration to change Spark service condition and enhance notebook dependencies * feat: Update Garage access keys and enhance notebook metadata for consistency * feat: Add reminder for creating access key, secret key, and buckets in Garage UI before launching a notebook * refactor: Consolidate Spark session configuration and credentials in Silver to Gold notebook * feat: Refactor Spark session configuration and enhance sentiment analysis with Universal Sentence Encoder * fix: Update Garage access keys for consistency and remove unused savefig calls * fix: Replace size function with length for keyword filtering in notebook * feat: Consolidate Spark NLP imports and add author ranking analysis with window function * chore: Add TODO comments for replacing Garage credentials in notebooks * Update README.md * Update notebooks/03_silver_to_gold.ipynb * Update notebooks/03_silver_to_gold.ipynb * Update notebooks/03_silver_to_gold.ipynb * Update notebooks/03_silver_to_gold.ipynb * Update notebooks/03_silver_to_gold.ipynb --------- Co-authored-by: Manne Emile KITSOUKOU <emilemannekitsoukou@gmail.com> Co-authored-by: Angry-Jay <jinozebian@gmail.com>
1 parent 5212845 commit 8959d29

6 files changed

Lines changed: 698 additions & 55 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ docker volume create garage-data
99
docker-compose up -d
1010
```
1111

12+
Don't forget to create your access key, secret key and buckets before launching a notebook, in the Garage UI interface !
1213

1314
---
1415

docker-compose.yml

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ services:
6262
kafka:
6363
condition: service_healthy
6464
spark:
65-
condition: service_healthy
65+
condition: service_started
6666
ports:
6767
- "8888:8888"
6868
- "4040:4040"
@@ -86,20 +86,14 @@ services:
8686
ports:
8787
- '8080:8080'
8888
- '7077:7077'
89-
healthcheck:
90-
test: ["CMD-SHELL", "nc -z localhost 7077 || exit 1"]
91-
interval: 10s
92-
timeout: 5s
93-
retries: 12
94-
start_period: 30s
9589
networks:
9690
- hackernews-network
9791

9892
spark-worker:
9993
image: grosinosky/spark:3.5.3
10094
depends_on:
10195
spark:
102-
condition: service_healthy
96+
condition: service_started
10397
environment:
10498
- SPARK_MODE=worker
10599
- SPARK_MASTER_URL=spark://spark:7077
@@ -164,4 +158,4 @@ volumes:
164158

165159
networks:
166160
hackernews-network:
167-
driver: bridge
161+
driver: bridge

notebooks/01_kafka_to_bronze.ipynb

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@
2626
"from pyspark.sql import SQLContext\n",
2727
"\n",
2828
"# Configuration\n",
29+
"# TODO : A remplacer par vos propres identifiants Garage\n",
2930
"KAFKA_SERVERS = \"kafka:9092\"\n",
3031
"GARAGE_ENDPOINT = \"http://garage:3900\"\n",
31-
"GARAGE_ACCESS_KEY = \"GKa25124b4fd82613c063217f3\"\n",
32-
"GARAGE_SECRET_KEY = \"008126399688f9b1efc3a3093079b066e4c6471fa256b52788da0c927194147e\"\n",
32+
"GARAGE_ACCESS_KEY = \"GK907b22f51dc0d0c5164474f2\"\n",
33+
"GARAGE_SECRET_KEY = \"6cf587853042d92d2cf6bb85b7c46a6a2400a47822e9baae32f9be0b7c5c9663\"\n",
3334
"\n",
3435
"# Spark config avec cluster (inspiré TP8)\n",
3536
"conf = SparkConf() \\\n",
@@ -292,13 +293,21 @@
292293
],
293294
"metadata": {
294295
"kernelspec": {
295-
"display_name": "Python 3",
296+
"display_name": "Python 3 (ipykernel)",
296297
"language": "python",
297298
"name": "python3"
298299
},
299300
"language_info": {
301+
"codemirror_mode": {
302+
"name": "ipython",
303+
"version": 3
304+
},
305+
"file_extension": ".py",
306+
"mimetype": "text/x-python",
300307
"name": "python",
301-
"version": "3.12.0"
308+
"nbconvert_exporter": "python",
309+
"pygments_lexer": "ipython3",
310+
"version": "3.12.11"
302311
}
303312
},
304313
"nbformat": 4,

notebooks/02_bronze_to_silver.ipynb

Lines changed: 146 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,143 +3,240 @@
33
{
44
"cell_type": "markdown",
55
"metadata": {},
6-
"source": "# Bronze → Silver Layer"
6+
"source": [
7+
"# Bronze → Silver Layer"
8+
]
79
},
810
{
911
"cell_type": "markdown",
1012
"metadata": {},
11-
"source": "## 1. Configuration Spark"
13+
"source": [
14+
"## 1. Configuration Spark"
15+
]
1216
},
1317
{
1418
"cell_type": "code",
1519
"execution_count": null,
1620
"metadata": {},
1721
"outputs": [],
18-
"source": "from pyspark.sql import SparkSession\n\nGARAGE_ENDPOINT = \"http://garage:3900\"\nGARAGE_ACCESS_KEY = \"GKa25124b4fd82613c063217f3\"\nGARAGE_SECRET_KEY = \"008126399688f9b1efc3a3093079b066e4c6471fa256b52788da0c927194147e\"\n\nBRONZE_PATH = \"s3a://bronze/hackernews\"\nSILVER_PATH = \"s3a://silver/hackernews\"\n\nspark = SparkSession.builder \\\n .appName(\"BronzeToSilver\") \\\n .master(\"spark://spark:7077\") \\\n .config(\"spark.jars.packages\", \n \"org.apache.hadoop:hadoop-aws:3.3.4,\"\n \"com.amazonaws:aws-java-sdk-bundle:1.12.262,\"\n \"io.delta:delta-spark_2.12:3.3.0\") \\\n .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\") \\\n .config(\"spark.hadoop.fs.s3a.multiobjectdelete.enable\", \"false\") \\\n .config(\"spark.sql.shuffle.partitions\", \"10\") \\\n .getOrCreate()\n\nhadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()\nhadoop_conf.set(\"fs.s3a.endpoint\", GARAGE_ENDPOINT)\nhadoop_conf.set(\"fs.s3a.access.key\", GARAGE_ACCESS_KEY)\nhadoop_conf.set(\"fs.s3a.secret.key\", GARAGE_SECRET_KEY)\nhadoop_conf.set(\"fs.s3a.endpoint.region\", \"garage\")\nhadoop_conf.set(\"fs.s3a.path.style.access\", \"true\")\nhadoop_conf.set(\"fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\nhadoop_conf.set(\"fs.s3a.connection.ssl.enabled\", \"false\")"
22+
"source": [
23+
"from pyspark.sql import SparkSession\n",
24+
"\n",
25+
"# TODO : A remplacer par vos propres identifiants Garage\n",
26+
"GARAGE_ENDPOINT = \"http://garage:3900\"\n",
27+
"GARAGE_ACCESS_KEY = \"GK907b22f51dc0d0c5164474f2\"\n",
28+
"GARAGE_SECRET_KEY = \"6cf587853042d92d2cf6bb85b7c46a6a2400a47822e9baae32f9be0b7c5c9663\"\n",
29+
"\n",
30+
"BRONZE_PATH = \"s3a://bronze/hackernews\"\n",
31+
"SILVER_PATH = \"s3a://silver/hackernews\"\n",
32+
"\n",
33+
"spark = SparkSession.builder \\\n",
34+
" .appName(\"BronzeToSilver\") \\\n",
35+
" .master(\"spark://spark:7077\") \\\n",
36+
" .config(\"spark.jars.packages\", \n",
37+
" \"org.apache.hadoop:hadoop-aws:3.3.4,\"\n",
38+
" \"com.amazonaws:aws-java-sdk-bundle:1.12.262,\"\n",
39+
" \"io.delta:delta-spark_2.12:3.3.0\") \\\n",
40+
" .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n",
41+
" .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\") \\\n",
42+
" .config(\"spark.hadoop.fs.s3a.multiobjectdelete.enable\", \"false\") \\\n",
43+
" .config(\"spark.sql.shuffle.partitions\", \"10\") \\\n",
44+
" .getOrCreate()\n",
45+
"\n",
46+
"hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()\n",
47+
"hadoop_conf.set(\"fs.s3a.endpoint\", GARAGE_ENDPOINT)\n",
48+
"hadoop_conf.set(\"fs.s3a.access.key\", GARAGE_ACCESS_KEY)\n",
49+
"hadoop_conf.set(\"fs.s3a.secret.key\", GARAGE_SECRET_KEY)\n",
50+
"hadoop_conf.set(\"fs.s3a.endpoint.region\", \"garage\")\n",
51+
"hadoop_conf.set(\"fs.s3a.path.style.access\", \"true\")\n",
52+
"hadoop_conf.set(\"fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n",
53+
"hadoop_conf.set(\"fs.s3a.connection.ssl.enabled\", \"false\")"
54+
]
1955
},
2056
{
2157
"cell_type": "markdown",
2258
"metadata": {},
23-
"source": "## 2. Création bucket Silver"
59+
"source": [
60+
"## 2. Création bucket Silver"
61+
]
2462
},
2563
{
2664
"cell_type": "code",
2765
"execution_count": null,
2866
"metadata": {},
2967
"outputs": [],
30-
"source": "# Bucket \"silver\" à créer manuellement via Garage CLI/WebUI si nécessaire"
68+
"source": [
69+
"# Bucket \"silver\" à créer manuellement via Garage CLI/WebUI si nécessaire"
70+
]
3171
},
3272
{
3373
"cell_type": "markdown",
3474
"metadata": {},
35-
"source": "## 3. Lecture Bronze"
36-
},
37-
{
38-
"cell_type": "code",
39-
"execution_count": null,
40-
"metadata": {},
41-
"outputs": [],
42-
"source": "stories_bronze = spark.read.format(\"delta\").load(f\"{BRONZE_PATH}/stories\")\ncomments_bronze = spark.read.format(\"delta\").load(f\"{BRONZE_PATH}/comments\")\n\nprint(f\"Stories: {stories_bronze.count()}, Comments: {comments_bronze.count()}\")"
75+
"source": [
76+
"## 3. Lecture Bronze"
77+
]
4378
},
4479
{
4580
"cell_type": "code",
4681
"execution_count": null,
4782
"metadata": {},
4883
"outputs": [],
49-
"source": "stories_bronze.printSchema()"
50-
},
51-
{
52-
"cell_type": "markdown",
53-
"metadata": {},
54-
"source": "## 4. Fonctions de nettoyage"
84+
"source": [
85+
"stories_bronze = spark.read.format(\"delta\").load(f\"{BRONZE_PATH}/stories\")\n",
86+
"comments_bronze = spark.read.format(\"delta\").load(f\"{BRONZE_PATH}/comments\")\n",
87+
"\n",
88+
"print(f\"Stories: {stories_bronze.count()}, Comments: {comments_bronze.count()}\")"
89+
]
5590
},
5691
{
5792
"cell_type": "code",
5893
"execution_count": null,
5994
"metadata": {},
6095
"outputs": [],
61-
"source": "from pyspark.sql.functions import col, when, regexp_replace, regexp_extract, length, trim, coalesce, lit\n\ndef clean_html(column):\n c = col(column)\n c = regexp_replace(c, r\"<[^>]+>\", \" \")\n c = regexp_replace(c, r\"\\s+\", \" \")\n\n html_entities = {\n r\"&#x27;\": \"'\",\n r\"&#x2F;\": \"/\",\n r\"&quot;\": '\"',\n r\"&amp;\": \"&\",\n r\"&lt;\": \"<\",\n r\"&gt;\": \">\"\n }\n for k, v in html_entities.items():\n c = regexp_replace(c, k, v)\n\n return when(col(column).isNull(), lit(\"\")).otherwise(trim(c))\n\ndef extract_domain(column):\n return regexp_extract(col(column), r\"https?://(?:www\\.)?([^/]+)\", 1)"
96+
"source": [
97+
"stories_bronze.printSchema()"
98+
]
6299
},
63100
{
64101
"cell_type": "markdown",
65102
"metadata": {},
66-
"source": "## 5. Nettoyage Stories"
103+
"source": [
104+
"## 4. Fonctions de nettoyage"
105+
]
67106
},
68107
{
69108
"cell_type": "code",
70109
"execution_count": null,
71110
"metadata": {},
72111
"outputs": [],
73-
"source": "stories_silver = stories_bronze \\\n .filter(col(\"id\").isNotNull()) \\\n .dropDuplicates([\"id\"]) \\\n .withColumn(\"text_clean\", clean_html(\"text\")) \\\n .withColumn(\"domain\", extract_domain(\"url\")) \\\n .select(\"id\", \"by\", \"title\", \"url\", \"domain\", \"score\", \"descendants\", \n \"text_clean\", \"timestamp\", \"_ingested_at\")\n\nstories_silver.show(3, truncate=40)"
112+
"source": [
113+
"from pyspark.sql.functions import col, when, regexp_replace, regexp_extract, length, trim, coalesce, lit\n",
114+
"\n",
115+
"def clean_html(column):\n",
116+
" c = col(column)\n",
117+
" c = regexp_replace(c, r\"<[^>]+>\", \" \")\n",
118+
" c = regexp_replace(c, r\"\\s+\", \" \")\n",
119+
"\n",
120+
" html_entities = {\n",
121+
" r\"&#x27;\": \"'\",\n",
122+
" r\"&#x2F;\": \"/\",\n",
123+
" r\"&quot;\": '\"',\n",
124+
" r\"&amp;\": \"&\",\n",
125+
" r\"&lt;\": \"<\",\n",
126+
" r\"&gt;\": \">\"\n",
127+
" }\n",
128+
" for k, v in html_entities.items():\n",
129+
" c = regexp_replace(c, k, v)\n",
130+
"\n",
131+
" return when(col(column).isNull(), lit(\"\")).otherwise(trim(c))\n",
132+
"\n",
133+
"def extract_domain(column):\n",
134+
" return regexp_extract(col(column), r\"https?://(?:www\\.)?([^/]+)\", 1)"
135+
]
74136
},
75137
{
76138
"cell_type": "markdown",
77139
"metadata": {},
78-
"source": "## 6. Nettoyage Comments"
140+
"source": [
141+
"## 5. Nettoyage Stories"
142+
]
79143
},
80144
{
81145
"cell_type": "code",
82146
"execution_count": null,
83147
"metadata": {},
84148
"outputs": [],
85-
"source": "comments_silver = comments_bronze \\\n .filter(col(\"id\").isNotNull()) \\\n .filter(coalesce(col(\"deleted\"), lit(False)) == False) \\\n .filter(coalesce(col(\"dead\"), lit(False)) == False) \\\n .dropDuplicates([\"id\"]) \\\n .withColumn(\"text_clean\", clean_html(\"text\")) \\\n .filter(length(col(\"text_clean\")) > 0) \\\n .select(\"id\", \"by\", \"parent\", \"text_clean\", \"timestamp\", \"_ingested_at\")\n\ncomments_silver.show(3, truncate=40)"
149+
"source": [
150+
"stories_silver = stories_bronze \\\n",
151+
" .filter(col(\"id\").isNotNull()) \\\n",
152+
" .dropDuplicates([\"id\"]) \\\n",
153+
" .withColumn(\"text_clean\", clean_html(\"text\")) \\\n",
154+
" .withColumn(\"domain\", extract_domain(\"url\")) \\\n",
155+
" .select(\"id\", \"by\", \"title\", \"url\", \"domain\", \"score\", \"descendants\", \n",
156+
" \"text_clean\", \"timestamp\", \"_ingested_at\")\n",
157+
"\n",
158+
"stories_silver.show(3, truncate=40)"
159+
]
86160
},
87161
{
88162
"cell_type": "markdown",
89163
"metadata": {},
90-
"source": "## 7. Jointure Comments + Stories"
164+
"source": [
165+
"## 6. Nettoyage Comments"
166+
]
91167
},
92168
{
93169
"cell_type": "code",
94170
"execution_count": null,
95171
"metadata": {},
96172
"outputs": [],
97-
"source": "stories_for_join = stories_silver.select(\n col(\"id\").alias(\"story_id\"),\n col(\"title\").alias(\"story_title\"),\n col(\"score\").alias(\"story_score\"),\n col(\"domain\").alias(\"story_domain\")\n)\n\ncomments_enriched = comments_silver.join(\n stories_for_join,\n comments_silver[\"parent\"] == stories_for_join[\"story_id\"],\n \"left\"\n)\n\ncomments_enriched.show(3, truncate=30)"
173+
"source": [
174+
"comments_silver = comments_bronze \\\n",
175+
" .filter(col(\"id\").isNotNull()) \\\n",
176+
" .filter(coalesce(col(\"deleted\"), lit(False)) == False) \\\n",
177+
" .filter(coalesce(col(\"dead\"), lit(False)) == False) \\\n",
178+
" .dropDuplicates([\"id\"]) \\\n",
179+
" .withColumn(\"text_clean\", clean_html(\"text\")) \\\n",
180+
" .filter(length(col(\"text_clean\")) > 0) \\\n",
181+
" .select(\"id\", \"by\", \"parent\", \"text_clean\", \"timestamp\", \"_ingested_at\")\n",
182+
"\n",
183+
"comments_silver.show(3, truncate=40)"
184+
]
98185
},
99186
{
100187
"cell_type": "markdown",
101188
"metadata": {},
102-
"source": "## 8. Écriture Silver"
103-
},
104-
{
105-
"cell_type": "code",
106-
"execution_count": null,
107-
"metadata": {},
108-
"outputs": [],
109-
"source": "stories_silver.write.format(\"delta\").mode(\"overwrite\").save(f\"{SILVER_PATH}/stories\")"
189+
"source": [
190+
"## 7. Écriture Silver"
191+
]
110192
},
111193
{
112194
"cell_type": "code",
113195
"execution_count": null,
114196
"metadata": {},
115197
"outputs": [],
116-
"source": "comments_silver.write.format(\"delta\").mode(\"overwrite\").save(f\"{SILVER_PATH}/comments\")"
198+
"source": [
199+
"stories_silver.write.format(\"delta\").mode(\"overwrite\").save(f\"{SILVER_PATH}/stories\")"
200+
]
117201
},
118202
{
119203
"cell_type": "code",
120204
"execution_count": null,
121205
"metadata": {},
122206
"outputs": [],
123-
"source": "comments_enriched.write.format(\"delta\").mode(\"overwrite\").save(f\"{SILVER_PATH}/comments_enriched\")"
207+
"source": [
208+
"comments_silver.write.format(\"delta\").mode(\"overwrite\").save(f\"{SILVER_PATH}/comments\")"
209+
]
124210
},
125211
{
126212
"cell_type": "markdown",
127213
"metadata": {},
128-
"source": "## 9. Vérification"
214+
"source": [
215+
"## 8. Vérification"
216+
]
129217
},
130218
{
131219
"cell_type": "code",
132220
"execution_count": null,
133221
"metadata": {},
134222
"outputs": [],
135-
"source": "spark.read.format(\"delta\").load(f\"{SILVER_PATH}/stories\").show(3, truncate=30)\nspark.read.format(\"delta\").load(f\"{SILVER_PATH}/comments\").show(3, truncate=30)\nspark.read.format(\"delta\").load(f\"{SILVER_PATH}/comments_enriched\").show(3, truncate=30)"
223+
"source": [
224+
"spark.read.format(\"delta\").load(f\"{SILVER_PATH}/stories\").show(3, truncate=30)\n",
225+
"spark.read.format(\"delta\").load(f\"{SILVER_PATH}/comments\").show(3, truncate=30)"
226+
]
136227
},
137228
{
138229
"cell_type": "code",
139230
"execution_count": null,
140231
"metadata": {},
141232
"outputs": [],
142-
"source": "spark.read.format(\"delta\").load(f\"{SILVER_PATH}/stories\") \\\n .filter(col(\"domain\") != \"\") \\\n .groupBy(\"domain\").count() \\\n .orderBy(col(\"count\").desc()) \\\n .show(5)"
233+
"source": [
234+
"spark.read.format(\"delta\").load(f\"{SILVER_PATH}/stories\") \\\n",
235+
" .filter(col(\"domain\") != \"\") \\\n",
236+
" .groupBy(\"domain\").count() \\\n",
237+
" .orderBy(col(\"count\").desc()) \\\n",
238+
" .show(5)"
239+
]
143240
},
144241
{
145242
"cell_type": "code",
@@ -158,10 +255,18 @@
158255
"name": "python3"
159256
},
160257
"language_info": {
258+
"codemirror_mode": {
259+
"name": "ipython",
260+
"version": 3
261+
},
262+
"file_extension": ".py",
263+
"mimetype": "text/x-python",
161264
"name": "python",
162-
"version": "3.11.0"
265+
"nbconvert_exporter": "python",
266+
"pygments_lexer": "ipython3",
267+
"version": "3.12.11"
163268
}
164269
},
165270
"nbformat": 4,
166271
"nbformat_minor": 4
167-
}
272+
}

0 commit comments

Comments
 (0)