Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ Spark 4.0.0 is supported in the version `0.1.11` and later (need Java 17 and Sca
Binary package is available in the Maven Central Repository.


- **Spark 3.5.***: com.stabrise:spark-pdf-spark35_2.12:0.1.16
- **Spark 3.5.***: com.stabrise:spark-pdf-spark35_2.12:0.1.17
- **Spark 3.4.***: com.stabrise:spark-pdf-spark34_2.12:0.1.11 (issue with publishing fresh version)
- **Spark 3.3.***: com.stabrise:spark-pdf-spark33_2.12:0.1.16
- **Spark 4.0.***: com.stabrise:spark-pdf-spark40_2.13:0.1.16
- **Spark 3.3.***: com.stabrise:spark-pdf-spark33_2.12:0.1.17
- **Spark 4.0.***: com.stabrise:spark-pdf-spark40_2.13:0.1.17

## Options for the data source:

Expand All @@ -96,6 +96,7 @@ Binary package is available in the Maven Central Repository.
- `pagePerPartition`: Number of pages per partition in the Spark DataFrame. Default: "5".
- `reader`: Supports: `pdfBox` - based on the PdfBox Java library, `gs` - based on GhostScript (requires GhostScript to be installed on the system)
- `ocrConfig`: Tesseract OCR configuration. Default: "psm=3". For more information see [Tesseract OCR Params](TesseractParams.md)
- `password`: Password for protected PDF files

## Output Columns in the DataFrame:

Expand Down Expand Up @@ -158,6 +159,7 @@ val df = spark.read.format("pdf")
.option("pagePerPartition", "2")
.option("reader", "pdfBox")
.option("ocrConfig", "psm=11")
.option("password", "pdf_password")
.load("path to the pdf file(s)")

df.select("path", "document").show()
Expand All @@ -180,6 +182,7 @@ df = spark.read.format("pdf") \
.option("pagePerPartition", "2") \
.option("reader", "pdfBox") \
.option("ocrConfig", "psm=11") \
.option("password", "pdf_password") \
.load("path to the pdf file(s)")

df.select("path", "document").show()
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import xerial.sbt.Sonatype.sonatypeCentralHost
import xerial.sbt.Sonatype.GitHubHosting

ThisBuild / version := "0.1.16"
ThisBuild / version := "0.1.17"

ThisBuild / scalaVersion := scala.util.Properties.envOrElse("SCALA_VERSION", "2.12.15") // "2.13.14", "2.12.15"
ThisBuild / organization := "com.stabrise"
Expand Down
Binary file added examples/test_encrypted.pdf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ object PdfPartitionedFileUtil {
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
partitionValues: InternalRow,
options: Map[String,String]
): Seq[PartitionedFile] = {
val path = filePath
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())

// Load the PDF document
val document = PDDocument.load(fs.open(file.getPath))
val document = PDDocument.load(fs.open(file.getPath), options.getOrElse("password", ""))
val page_num = document.getNumberOfPages
document.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ object PdfPartitionedFileUtil {
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
partitionValues: InternalRow,
options: Map[String,String]
): Seq[PartitionedFile] = {
val path = filePath
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())

// Load the PDF document
val document = PDDocument.load(fs.open(file.getPath))
val document = PDDocument.load(fs.open(file.getPath), options.getOrElse("password", ""))
val page_num = document.getNumberOfPages
document.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ object PdfPartitionedFileUtil {
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
partitionValues: InternalRow,
options: Map[String,String]): Seq[PartitionedFile] = {
val path = filePath
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())

// Load the PDF document
val document = PDDocument.load(fs.open(file.getPath))
val document = PDDocument.load(fs.open(file.getPath), options.getOrElse("password", ""))
val page_num = document.getNumberOfPages
document.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ object PdfPartitionedFileUtil {
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
partitionValues: InternalRow,
options: Map[String,String]
): Seq[PartitionedFile] = {
val path = filePath
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())

// Load the PDF document
val document = PDDocument.load(fs.open(file.getPath))
val document = PDDocument.load(fs.open(file.getPath), options.getOrElse("password", ""))
val page_num = document.getNumberOfPages
document.close()

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/datasources/PdfPartitionReaderPDFBox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PdfPartitionReaderPDFBox(inputPartition: FilePartition,
val hdfsPath = PdfPartitionedFileUtil.getHdfsPath(file)
val fs = hdfsPath.getFileSystem(broadcastedConf.value.value)
val status = fs.getFileStatus(hdfsPath)
document = PDDocument.load(fs.open(status.getPath))
document = PDDocument.load(fs.open(status.getPath), options.getOrElse("password", ""))
stripper = new PDFTextStripper()
pdfRenderer = new PDFRenderer(document)
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/datasources/PdfScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ case class PdfScan(
filePath = filePath,
isSplitable = true,
maxSplitBytes = maxSplitBytes,
partitionValues = partitionValues
partitionValues = partitionValues,
options = options.asScala.toMap,
)
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
}
Expand Down
Binary file added src/test/resources/pdfs/test_encrypted.pdf
Binary file not shown.
39 changes: 34 additions & 5 deletions src/test/scala/PdfDatasourceSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ class PdfDatasourceSuite extends AnyFunSuite with BeforeAndAfterEach {
checkOcrResulst(filePath, fileName, pdfDF)
}

test("PDFDataSource with PdfBox and protected PDF") {
  // Read the encrypted fixture, passing its password through the data source options.
  val protectedPdf = "pdfs/test_encrypted.pdf"
  val (filePath, fileName, pdfDF) =
    readPdf(PdfReader.PDF_BOX, protectedPdf, extraOptions = Map("password" -> "tzXT4swEx8YFJH"))

  // Result shape: a single row in a single partition, exposing the expected columns.
  pdfDF.count() shouldBe 1
  pdfDF.columns should contain allOf("path", "page_number", "text", "image", "partition_number")
  pdfDF.rdd.partitions.length shouldBe 1

  // Column order in the projection: 0 = document struct, 1 = filename, 2 = path.
  val firstRow = pdfDF.select("document", "filename", "path").collect().head
  firstRow.getString(1) shouldBe fileName
  firstRow.getString(2) should include(filePath)

  // The extracted text proves the document was actually decrypted and parsed.
  val document = Document(firstRow.getAs[Row](0))
  document.path should include(filePath)
  document.text should include("Test PDF file with password")
}

private def checkOcrResulst(filePath: String, fileName: String, pdfDF: DataFrame): Unit = {
pdfDF.count() shouldBe 10
pdfDF.columns should contain allOf("path", "page_number", "text", "image", "partition_number")
Expand All @@ -81,19 +101,28 @@ class PdfDatasourceSuite extends AnyFunSuite with BeforeAndAfterEach {
//pdfDF.select("document.*").show(2, truncate = true)
}

/**
 * Reads a PDF from the test resources through the `pdf` data source.
 *
 * @param reader       value passed to the data source's `reader` option
 * @param filePath     classpath-relative location of the PDF to load
 * @param extraOptions additional data source options, applied after the defaults below
 * @return the given `filePath`, the bare file name, and the cached DataFrame
 * @throws IllegalArgumentException if `filePath` cannot be found on the classpath
 */
private def readPdf(
  reader: String,
  filePath: String = "pdfs/example_image_10_page.pdf",
  extraOptions: Map[String, String] = Map.empty
): (String, String, DataFrame) = {
  val fileName = Paths.get(filePath).getFileName.toString

  // getResource returns null for a missing resource; report it clearly instead of
  // failing later with a NullPointerException.
  val pdfPath = Option(getClass.getClassLoader.getResource(filePath))
    .map(_.getPath)
    .getOrElse(throw new IllegalArgumentException(s"Test resource not found: $filePath"))

  // Defaults are set first and extraOptions last, so a caller-supplied key wins.
  val pdfDF = spark.read.format("pdf")
    .option("imageType", ImageType.BINARY)
    .option("resolution", "200")
    .option("pagePerPartition", "2")
    .option("reader", reader)
    .option("ocrConfig", "psm=11")
    .options(extraOptions)
    .load(pdfPath)
    .cache()

  (filePath, fileName, pdfDF)
}


Expand Down