ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/AssembleDocumentTermMatrix.scala
@@ -17,7 +17,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.ml.feature.{CountVectorizer, IDF}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -100,13 +100,34 @@ class AssembleDocumentTermMatrix(private val spark: SparkSession) extends Serializable
}

/**
* Returns a document-term matrix where each element is the TF-IDF of the row's document and
* the column's term.
*
* @param docTexts a DF with two columns: title and text
*/
* Add "nice" row ids to a dataframe, using `zipWithUniqueId`. Compared to:
* {{
* import org.apache.spark.sql.functions._
* df.withColumn("id",monotonically_increasing_id)
* }}
* which creates ids of more than 10 digits (8589934597), this one generates smaller ids.
*
* @param spark
* @param df
* @return
*/
def addNiceRowId(spark: SparkSession, df: DataFrame): DataFrame = {
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// Pair each row with a unique id and prepend the id to the row's values.
val rowsWithId = df.rdd.zipWithUniqueId.map {
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)
}
// Rebuild the dataframe with a non-nullable "id" column ahead of the original schema.
spark.createDataFrame(rowsWithId, StructType(StructField("id", LongType, nullable = false) +: df.schema.fields))
}
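
A quick illustration of the id scheme (a minimal sketch, assuming a spark-shell session where `spark` and `addNiceRowId` are in scope; the sample data is made up):

import spark.implicits._
val df = Seq("a", "b", "c", "d").toDF("title")
addNiceRowId(spark, df).show()
// zipWithUniqueId gives items in partition k the ids k, k+n, k+2n, ... (n = number
// of partitions), so the ids stay small and unique, though not always consecutive.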

/**
* Returns a document-term matrix where each element is the TF-IDF of the row's document and
* the column's term, along with the array of terms and the per-term IDF weights.
*
* @param docTexts a Dataset with two columns: title and text
*/
def documentTermMatrix(docTexts: Dataset[(String, String)], stopWordsFile: String, numTerms: Int)
: (DataFrame, Array[String], Map[Long, String], Array[Double]) = {
: (DataFrame, Array[String], Array[Double]) = {
val terms = contentsToTerms(docTexts, stopWordsFile)

val termsDF = terms.toDF("title", "terms")
@@ -121,12 +142,10 @@

docTermFreqs.cache()

val docIds = docTermFreqs.rdd.map(_.getString(0)).zipWithUniqueId().map(_.swap).collect().toMap

val idf = new IDF().setInputCol("termFreqs").setOutputCol("tfidfVec")
val idfModel = idf.fit(docTermFreqs)
val docTermMatrix = idfModel.transform(docTermFreqs).select("title", "tfidfVec")

(docTermMatrix, termIds, docIds, idfModel.idf.toArray)
(addNiceRowId(spark, docTermMatrix), termIds, idfModel.idf.toArray)
}
}
71 changes: 38 additions & 33 deletions ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/RunLSA.scala
@@ -7,38 +7,43 @@
package com.cloudera.datascience.lsa

import breeze.linalg.{DenseMatrix => BDenseMatrix, SparseVector => BSparseVector}

import org.apache.spark.mllib.linalg.{Matrices, Matrix, SingularValueDecomposition, Vectors, Vector => MLLibVector}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, SingularValueDecomposition, Vector, Vectors}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer

object RunLSA {
def main(args: Array[String]): Unit = {
val k = if (args.length > 0) args(0).toInt else 100
val numTerms = if (args.length > 1) args(1).toInt else 20000
val wikidumpPath = if (args.length > 0) args(0) else "hdfs:///user/ds/Wikipedia/"
val k = if (args.length > 1) args(1).toInt else 100
val numTerms = if (args.length > 2) args(2).toInt else 20000
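
With the dump location now taken from the command line, a run looks something like this (a hypothetical invocation; the assembly jar name and HDFS path are assumptions):

spark-submit --class com.cloudera.datascience.lsa.RunLSA \
  ch06-lsa-assembly.jar hdfs:///user/ds/Wikipedia/ 100 20000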

val spark = SparkSession.builder().config("spark.serializer", classOf[KryoSerializer].getName).getOrCreate()
val assembleMatrix = new AssembleDocumentTermMatrix(spark)
import assembleMatrix._

val docTexts: Dataset[(String, String)] = parseWikipediaDump("hdfs:///user/ds/Wikipedia/")
val docTexts: Dataset[(String, String)] = parseWikipediaDump(wikidumpPath)

val (docTermMatrix, termIds, docIds, termIdfs) = documentTermMatrix(docTexts, "stopwords.txt", numTerms)
val (docTermMatrix, termIds, termIdfs) = documentTermMatrix(docTexts, "stopwords.txt", numTerms)

docTermMatrix.cache()

val vecRdd = docTermMatrix.select("tfidfVec").rdd.map { row =>
Vectors.fromML(row.getAs[MLVector]("tfidfVec"))
import spark.implicits._
// Collect the (id, title) pairs into a driver-side map, so row ids coming out of
// the distributed computations can be translated back to document titles.
val docIds = docTermMatrix.select("id", "title").map {
case Row(id: Long, title: String) => (id, title)
}.rdd.collectAsMap

val vecRdd = docTermMatrix.select("id", "tfidfVec").rdd.map {
case Row(id: Long, v: MLVector) => IndexedRow(id, Vectors.fromML(v))
}

vecRdd.cache()
val mat = new RowMatrix(vecRdd)
val svd = mat.computeSVD(k, computeU=true)
val mat = new IndexedRowMatrix(vecRdd)
val svd: SingularValueDecomposition[IndexedRowMatrix, Matrix] = mat.computeSVD(k, computeU = true)
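
Because the matrix is now an IndexedRowMatrix, the row ids survive the factorization: svd.U is itself an IndexedRowMatrix whose rows carry the original document ids. A minimal sketch of that property (assuming the svd value computed above):

// Each row of U keeps the id that addNiceRowId assigned to its document.
svd.U.rows.take(3).foreach { r =>
println(s"doc ${r.index} -> first concept weight ${r.vector(0)}")
}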

println("Singular values: " + svd.s)
val topConceptTerms = topTermsInTopConcepts(svd, 10, 10, termIds)
@@ -74,7 +79,7 @@ object RunLSA
* @param termIds The mapping of term IDs to terms.
* @return A Seq of top concepts, in order, each with a Seq of top terms, in order.
*/
def topTermsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
def topTermsInTopConcepts(svd: SingularValueDecomposition[IndexedRowMatrix, Matrix], numConcepts: Int,
numTerms: Int, termIds: Array[String]): Seq[Seq[(String, Double)]] = {
val v = svd.V
val topTerms = new ArrayBuffer[Seq[(String, Double)]]()
@@ -98,28 +103,28 @@
* @param docIds The mapping of document IDs to document titles.
* @return A Seq of top concepts, in order, each with a Seq of top documents, in order.
*/
def topDocsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix], numConcepts: Int,
def topDocsInTopConcepts(svd: SingularValueDecomposition[IndexedRowMatrix, Matrix], numConcepts: Int,
numDocs: Int, docIds: Map[Long, String]): Seq[Seq[(String, Double)]] = {
val u = svd.U
val topDocs = new ArrayBuffer[Seq[(String, Double)]]()
for (i <- 0 until numConcepts) {
val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId
topDocs += docWeights.top(numDocs).map { case (score, id) => (docIds(id), score) }
val docWeights = u.rows.map(t => (t.index, t.vector.toArray(i)))
topDocs += docWeights.top(numDocs).map { case (id, score) => (docIds(id), score) }
}
topDocs
}
}
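
These two helpers compose directly with the driver-side docIds map built in main; a minimal usage sketch (assuming the svd and docIds values from above):

val topConceptDocs = RunLSA.topDocsInTopConcepts(svd, 4, 5, docIds)
for ((docs, i) <- topConceptDocs.zipWithIndex) {
val line = docs.map { case (title, score) => f"$title ($score%.3f)" }.mkString(", ")
println(s"Concept $i: " + line)
}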

class LSAQueryEngine(
val svd: SingularValueDecomposition[RowMatrix, Matrix],
val svd: SingularValueDecomposition[IndexedRowMatrix, Matrix],
val termIds: Array[String],
val docIds: Map[Long, String],
val termIdfs: Array[Double]) {

val VS: BDenseMatrix[Double] = multiplyByDiagonalMatrix(svd.V, svd.s)
val normalizedVS: BDenseMatrix[Double] = rowsNormalized(VS)
val US: RowMatrix = multiplyByDiagonalRowMatrix(svd.U, svd.s)
val normalizedUS: RowMatrix = distributedRowsNormalized(US)
val US: IndexedRowMatrix = multiplyByDiagonalRowMatrix(svd.U, svd.s)
val normalizedUS: IndexedRowMatrix = distributedRowsNormalized(US)

val idTerms: Map[String, Int] = termIds.zipWithIndex.toMap
val idDocs: Map[String, Long] = docIds.map(_.swap)
@@ -128,7 +133,7 @@ class LSAQueryEngine(
* Finds the product of a dense matrix and a diagonal matrix represented by a vector.
* Breeze doesn't support efficient diagonal representations, so multiply manually.
*/
def multiplyByDiagonalMatrix(mat: Matrix, diag: MLLibVector): BDenseMatrix[Double] = {
def multiplyByDiagonalMatrix(mat: Matrix, diag: Vector): BDenseMatrix[Double] = {
val sArr = diag.toArray
new BDenseMatrix[Double](mat.numRows, mat.numCols, mat.toArray)
.mapPairs { case ((r, c), v) => v * sArr(c) }
@@ -137,12 +142,12 @@
/**
* Finds the product of a distributed matrix and a diagonal matrix represented by a vector.
*/
def multiplyByDiagonalRowMatrix(mat: RowMatrix, diag: MLLibVector): RowMatrix = {
def multiplyByDiagonalRowMatrix(mat: IndexedRowMatrix, diag: Vector): IndexedRowMatrix = {
val sArr = diag.toArray
new RowMatrix(mat.rows.map { vec =>
val vecArr = vec.toArray
val newArr = (0 until vec.size).toArray.map(i => vecArr(i) * sArr(i))
Vectors.dense(newArr)
new IndexedRowMatrix(mat.rows.map { r =>
val vecArr = r.vector.toArray
val newArr = (0 until r.vector.size).toArray.map(i => vecArr(i) * sArr(i))
IndexedRow(r.index, Vectors.dense(newArr))
})
}

@@ -161,11 +166,11 @@ class LSAQueryEngine(
/**
* Returns a distributed matrix where each row is divided by its length.
*/
def distributedRowsNormalized(mat: RowMatrix): RowMatrix = {
new RowMatrix(mat.rows.map { vec =>
val array = vec.toArray
def distributedRowsNormalized(mat: IndexedRowMatrix): IndexedRowMatrix = {
new IndexedRowMatrix(mat.rows.map { r =>
val array = r.vector.toArray
val length = math.sqrt(array.map(x => x * x).sum)
Vectors.dense(array.map(_ / length))
IndexedRow(r.index, Vectors.dense(array.map(_ / length)))
})
}

@@ -181,7 +186,7 @@ class LSAQueryEngine(
val docScores = US.multiply(rowVec)

// Find the docs with the highest scores
val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
val allDocWeights = docScores.rows.map(t => (t.vector.toArray(0), t.index))
allDocWeights.top(10)
}

@@ -206,14 +211,14 @@
*/
def topDocsForDoc(docId: Long): Seq[(Double, Long)] = {
// Look up the row in US corresponding to the given doc ID.
val docRowArr = normalizedUS.rows.zipWithUniqueId.map(_.swap).lookup(docId).head.toArray
val docRowArr = normalizedUS.rows.map(t => (t.index, t.vector)).lookup(docId).head.toArray
val docRowVec = Matrices.dense(docRowArr.length, 1, docRowArr)

// Compute scores against every doc
val docScores = normalizedUS.multiply(docRowVec)

// Find the docs with the highest scores
val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
val allDocWeights = docScores.rows.map(t => (t.vector.toArray(0), t.index))

// Docs can end up with NaN score if their row in U is all zeros. Filter these out.
allDocWeights.filter(!_._1.isNaN).top(10)
@@ -242,7 +247,7 @@ class LSAQueryEngine(
val docScores = US.multiply(termRowVec)

// Find the docs with the highest scores
val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
val allDocWeights = docScores.rows.map(t => (t.vector.toArray(0), t.index))
allDocWeights.top(10)
}
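
All of the query methods follow the same pattern: build a column vector, multiply it against US (or its row-normalized variant), and rank documents by the resulting scores, which is why each one ends with the same (score, id) top-10 step. Wiring the engine up looks roughly like this (a minimal sketch, assuming the values produced by main above):

val queryEngine = new LSAQueryEngine(svd, termIds, docIds, termIdfs)
queryEngine.topDocsForDoc(docIds.keys.head).foreach { case (score, id) =>
println(f"${docIds(id)}%-40s $score%.3f")
}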
