Merged

Changes from all commits (20 commits)
7878b0a  Add sparkccfile.py to support file-wise processing in spark jobs (use… (jt55401, Jul 31, 2024)
05388f0  Merge CCFileProcessorSparkJob into sparkcc.py (jt55401, Aug 3, 2024)
6a97769  Add CCFileProcessorSparkJob example and link in readme for it. (jt55401, Aug 3, 2024)
44d935a  fix s3 functions so they work in spark environment (jt55401, Sep 10, 2024)
c4883e1  fix bug when the file was not downloaded from s3 (jt55401, Sep 11, 2024)
9116b79  fix: fix scheme to recognize s3a and s3n (jt55401, Nov 27, 2024)
28aabda  fix: don't log 404 errors when checking for existence (jt55401, Nov 27, 2024)
002583e  docs: update description of MD5Sum job (sebastian-nagel, May 27, 2025)
ae1b05f  refactor: Python code style (sebastian-nagel, May 27, 2025)
307977f  fix(CCFileProcessor): catch exceptions while opening local files, (sebastian-nagel, Aug 19, 2025)
d0ac339  refactor(CCFileProcessor): rename command-line option (sebastian-nagel, Aug 19, 2025)
898c76b  refactor(CCFileProcessor): remove unused methods and code (sebastian-nagel, Aug 19, 2025)
bf1e2ba  Bundle functionality shared between methods of CCSparkJob (sebastian-nagel, Sep 17, 2025)
a75f32a  CCFileProcessor: Raise an exception if input or output (sebastian-nagel, Sep 18, 2025)
090bda2  Experimental WARC-to-WET extractor (sebastian-nagel, Nov 1, 2025)
b5ca990  docs: improve descriptions of CCSparkJob and derived core classes (sebastian-nagel, Sep 19, 2025)
0c31e7c  Add more FastWARC examples and complete README (sebastian-nagel, Sep 19, 2025)
2df8032  fix(README): update Hadoop version and instructions about S3/S3A support (sebastian-nagel, Nov 1, 2025)
15113e8  Refactor WET extractor to use identified charset (sebastian-nagel, Oct 29, 2025)
5d6c1a5  Add shell script to launch cc-pyspark jobs on Hadoop (sebastian-nagel, Nov 1, 2025)
3 changes: 2 additions & 1 deletion .gitignore
@@ -16,4 +16,5 @@ spark-warehouse/

# get-data.sh puts data into
crawl-data/
input/
input/
/.pytest_cache/
34 changes: 28 additions & 6 deletions README.md
@@ -6,14 +6,18 @@ This project provides examples how to process the Common Crawl dataset with [Apa

+ [count HTML tags](./html_tag_count.py) in Common Crawl's raw response data (WARC files)

+ [count web server names](./server_count.py) in Common Crawl's metadata (WAT files or WARC files)
+ [count web server names](./server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files)

+ list host names and corresponding [IP addresses](./server_ip_address.py) (WAT files or WARC files)

+ [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)

+ [md5sum](./md5sum.py) Run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, or any other type of file.

+ [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)

+ [WET extractor](./wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files).

+ work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)):

- run a SQL query and [export the result as a table](./cc_index_export.py)
@@ -65,7 +69,7 @@ This will install v3.5.7 of [the PySpark python package](https://spark.apache.or

Install Spark if needed (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then, ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prepend `$SPARK_HOME/bin` when running them, e.g., `$SPARK_HOME/bin/spark-submit`.

> Note: The PySpark package is required if you want to run the tests in `test/`.
> Note: The PySpark package and "py4j" are required if you want to run the tests in `test/`. Both packages are also included in Spark installations, at `$SPARK_HOME/python` and `$SPARK_HOME/python/lib/py4j-*-src.zip`, respectively.
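
For example, a minimal sketch of how the bundled packages can be made importable before running the tests (this snippet is not part of cc-pyspark; it assumes the environment variable `SPARK_HOME` points to the Spark installation):

```python
# Minimal sketch: make the PySpark and py4j packages bundled with a Spark
# installation importable without a pip-installed "pyspark" package.
import glob
import os
import sys

spark_home = os.environ["SPARK_HOME"]
sys.path.insert(0, os.path.join(spark_home, "python"))
# py4j ships as a zip archive, which Python can import from directly
sys.path.insert(0, glob.glob(
    os.path.join(spark_home, "python", "lib", "py4j-*-src.zip"))[0])

import pyspark  # now resolves to $SPARK_HOME/python/pyspark
```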

## Compatibility and Requirements

@@ -155,7 +159,10 @@ As the Common Crawl dataset lives in the Amazon Public Datasets program, you can

3. don't forget to deploy all dependencies in the cluster, see [advanced dependency management](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management)

4. also the the file `sparkcc.py` needs to be deployed or added as argument `--py-files sparkcc.py` to `spark-submit`. Note: some of the examples require further Python files as dependencies.
4. also the file `sparkcc.py` needs to be deployed or added as argument `--py-files sparkcc.py` to `spark-submit`. Note: several of the examples require further Python files as dependencies.


The script [run_ccpyspark_job_hadoop.sh](./run_ccpyspark_job_hadoop.sh) shows an example of how to run a Spark job on a Hadoop cluster (Spark on YARN). Please do not forget to adapt this script to your needs.


### Command-line options
@@ -206,7 +213,7 @@ Querying the columnar index using cc-pyspark requires authenticated S3 access. T

#### Installation of S3 Support Libraries

While WARC/WAT/WET files are read using boto3, accessing the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see option `--query` of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that S3 support libraries are available. These libs are usually provided when the Spark job is run on a Hadoop cluster running on AWS (eg. EMR). However, they may not be provided for any Spark distribution and are usually absent when running Spark locally (not in a Hadoop cluster). In these situations, the easiest way is to add the libs as required packages by adding `--packages org.apache.hadoop:hadoop-aws:3.2.1` to the arguments of `spark-submit`. This will make [Spark manage the dependencies](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) - the hadoop-aws package and transitive dependencies are downloaded as Maven dependencies. Note that the required version of hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.2.1 bundled with Hadoop 3.2 ([spark-3.2.1-bin-hadoop3.2.tgz](https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz)).
While WARC/WAT/WET files are read using boto3, accessing the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see option `--query` of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that the S3 support libraries are available. These libs are usually provided when the Spark job is run on a Hadoop cluster running on AWS (e.g., EMR). However, they are not necessarily part of every Spark distribution and are usually absent when running Spark locally (not on a Hadoop cluster). In these situations, the easiest way is to add the libs as required packages by adding `--packages org.apache.hadoop:hadoop-aws:3.3.4` to the arguments of `spark-submit`. This makes [Spark manage the dependencies](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management): the hadoop-aws package and its transitive dependencies are downloaded as Maven dependencies. Note that the required version of the hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.5.6 is bundled with Hadoop 3.3.4 ([spark-3.5.6-bin-hadoop3.tgz](https://archive.apache.org/dist/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz)). Please check your Spark package and the underlying Hadoop installation for the correct version.

Please also note that:
- the schema of the URL referencing the columnar index depends on the actual S3 file system implementation: it's `s3://` on EMR but `s3a://` when using [s3a](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Introducing_the_Hadoop_S3A_client.).
@@ -217,7 +224,8 @@ Please also note that:
Below is an example call to count words in 10 WARC records hosted under the `.is` top-level domain, using the `--packages` option:
```
spark-submit \
--packages org.apache.hadoop:hadoop-aws:3.3.2 \
--packages org.apache.hadoop:hadoop-aws:3.3.4 \
--conf spark.sql.parquet.mergeSchema=true \
./cc_index_word_count.py \
--input_base_url s3://commoncrawl/ \
--query "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE crawl = 'CC-MAIN-2020-24' AND subset = 'warc' AND url_host_tld = 'is' LIMIT 10" \
@@ -241,7 +249,7 @@ Alternatively, it's possible to configure the table schema explicitly:
- and use it by adding the command-line argument `--table_schema cc-index-schema-flat.json`.


### Using FastWARC to parse WARC files
### Using FastWARC to read WARC files

> [FastWARC](https://resiliparse.chatnoir.eu/en/latest/man/fastwarc.html) is a high-performance WARC parsing library for Python written in C++/Cython. The API is inspired in large parts by WARCIO, but does not aim at being a drop-in replacement.

@@ -255,6 +263,20 @@ Some differences between the warcio and FastWARC APIs are hidden from the user i

However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes).
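
For WARC files from other sources, where the HTTP payload still carries a `Content-Encoding`, a job therefore has to undo the encoding itself. Below is a minimal, library-agnostic sketch (the helper name is made up; obtaining the payload bytes and the header value from a record depends on whether warcio or FastWARC is used):

```python
# Minimal sketch: undo a gzip or deflate Content-Encoding on an HTTP payload.
# The payload bytes and the Content-Encoding header value must be taken from
# the WARC record; how to do that differs between warcio and FastWARC.
import gzip
import zlib

def decode_content_encoding(payload: bytes, content_encoding: str) -> bytes:
    encoding = (content_encoding or '').strip().lower()
    if encoding == 'gzip':
        return gzip.decompress(payload)
    if encoding == 'deflate':
        # most servers send zlib-wrapped deflate; raw deflate would need
        # zlib.decompressobj(-zlib.MAX_WBITS)
        return zlib.decompress(payload)
    # 'identity', unknown or unsupported encodings: return the payload as is
    return payload
```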

FastWARC allows filtering of unwanted WARC record types at parse time, e.g., request records can be skipped immediately without even being passed forward to the caller. To get the maximum performance from FastWARC, it's recommended to utilize these filters by setting the static class variable `fastwarc_record_filter`.
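
For illustration, a minimal sketch of a job that lets FastWARC skip everything except response records (the class and its toy counting logic are made up; the shipped examples listed below follow the same pattern):

```python
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob


class ResponseOnlyJob(CCFastWarcSparkJob):
    """Toy example: count WARC response records, letting FastWARC
    skip all other record types already at parse time"""

    name = "ResponseOnlyJob"

    # only response records reach process_record(); request, metadata,
    # warcinfo, ... records are dropped while parsing the WARC file
    fastwarc_record_filter = WarcRecordType.response

    def process_record(self, record):
        yield 'response_records', 1


if __name__ == '__main__':
    job = ResponseOnlyJob()
    job.run()
```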

The following examples are ported to use FastWARC:
+ [count HTML tags](./html_tag_count_fastwarc.py)
+ [count web server names](./server_count_fastwarc.py)
+ list host names and corresponding [IP addresses](./server_ip_address_fastwarc.py)
+ [word count](./word_count_fastwarc.py)

In addition, the following tools are implemented using FastWARC:
+ [extract host-level links](./hostlinks_extract_fastwarc.py)
+ [WET extractor](./wet_extractor.py)

Please refer to the above [description of examples](#common-crawl-pyspark-examples) for additional details.


## Running the Tests

18 changes: 18 additions & 0 deletions html_tag_count_fastwarc.py
@@ -0,0 +1,18 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from html_tag_count import TagCountJob


class TagCountFastWarcJob(TagCountJob, CCFastWarcSparkJob):
    """ Count HTML tag names in Common Crawl WARC files
    using FastWARC to read WARC files"""

    name = "TagCount"

    fastwarc_record_filter = WarcRecordType.response


if __name__ == '__main__':
    job = TagCountFastWarcJob()
    job.run()
24 changes: 24 additions & 0 deletions md5sum.py
@@ -0,0 +1,24 @@
import subprocess

from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType


class MD5Sum(CCFileProcessorSparkJob):
    """MD5 sum of each file, calling the command-line utility 'md5sum'"""

    name = "MD5Sum"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("md5", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        proc = subprocess.run(['md5sum', tempfd.name], capture_output=True, check=True, encoding='utf8')
        digest = proc.stdout.rstrip().split()[0]
        yield uri, digest


if __name__ == '__main__':
    job = MD5Sum()
    job.run()
84 changes: 84 additions & 0 deletions run_ccpyspark_job_hadoop.sh
@@ -0,0 +1,84 @@
#!/bin/sh

# example shell script to run a cc-pyspark job on a Hadoop cluster (Spark on YARN)

SCRIPT="$1"
WAREHOUSE="$2"

if [ -z "$SCRIPT" ] || [ -z "$WAREHOUSE" ]; then
echo "Usage: $0 <script> <warehouse> <args>..."
echo " Run a cc-pyspark job in Spark/Hadoop cluster"
echo
echo "Arguments:"
echo " <script> cc-pyspark job implementation"
echo " <warehouse> Spark SQL warehouse directory"
echo " <args>... remaining args are passed to the job"
echo
echo "Example:"
echo " $0 server_count.py hdfs:///user/max/counts \\"
echo " wat_sample.paths servers"
echo
echo "Note: don't forget to adapt the number of executors,"
echo " input/output partitions, the memory requirements"
echo " and other parameters at your need!"
echo " Some params can be set per environment variable."
exit 1
fi

# strip SCRIPT and WAREHOUSE from argument list
shift 2

SPARK_ON_YARN="--master yarn"
SPARK_HADOOP_OPTS=""
SPARK_EXTRA_OPTS=""

# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR
. $HOME/workspace/spark/spark_env.sh

NUM_EXECUTORS=${NUM_EXECUTORS:-1}
EXECUTOR_MEM=${EXECUTOR_MEM:-4g}
EXECUTOR_CORES=${EXECUTOR_CORES:-2}

# access data via S3
INPUT_BASE_URL="s3://commoncrawl/"

# temporary directory
# - must exist on task/compute nodes for buffering data
# - should provide several GBs of free space to hold temporarily
# the downloaded data (WARC, WAT, WET files)
TMPDIR=/data/0/tmp

export PYSPARK_PYTHON="python" # or "python3"

# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py)
PYFILES=$(ls sparkcc.py sparkcc_fastwarc.py *.py | sort -u | tr '\n' ',')



set -xe

$SPARK_HOME/bin/spark-submit \
$SPARK_ON_YARN \
$SPARK_HADOOP_OPTS \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.task.maxFailures=5 \
--conf spark.executor.memory=$EXECUTOR_MEM \
--conf spark.driver.memory=3g \
--conf spark.core.connection.ack.wait.timeout=600s \
--conf spark.network.timeout=300s \
--conf spark.shuffle.io.maxRetries=50 \
--conf spark.shuffle.io.retryWait=600s \
--conf spark.locality.wait=1s \
--conf spark.io.compression.codec=zstd \
--conf spark.checkpoint.compress=true \
--conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
--num-executors $NUM_EXECUTORS \
--executor-cores $EXECUTOR_CORES \
--executor-memory $EXECUTOR_MEM \
--conf spark.sql.warehouse.dir=$WAREHOUSE \
--conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS \
--py-files $PYFILES \
$SCRIPT \
--input_base_url $INPUT_BASE_URL \
--local_temp_dir $TMPDIR \
"$@"
2 changes: 1 addition & 1 deletion server_count_fastwarc.py
@@ -7,7 +7,7 @@
class ServerCountFastWarcJob(ServerCountJob, CCFastWarcSparkJob):
    """ Count server names sent in HTTP response header
    (WARC and WAT is allowed as input) using FastWARC
    to parse WARC files"""
    to read WARC files"""

    name = "CountServers"

22 changes: 22 additions & 0 deletions server_ip_address_fastwarc.py
@@ -0,0 +1,22 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from server_ip_address import ServerIPAddressJob


class ServerIPAddressFastWarcJob(ServerIPAddressJob, CCFastWarcSparkJob):
    """ Collect server IP addresses from WARC response records
    (WARC and WAT is allowed as input) using FastWARC
    to parse WARC files"""

    name = "ServerIPAddresses"

    # process only WARC request or metadata (including WAT) records
    # Note: restrict the filter accordingly, depending on whether
    # WARC or WAT files are used
    fastwarc_record_filter = WarcRecordType.request | WarcRecordType.metadata


if __name__ == "__main__":
    job = ServerIPAddressFastWarcJob()
    job.run()