Commit 02cf427
Prepare for release v0.3.6 (#155)
* [sbt] version updates
* [sbt] disable build for scala 2.12
* [conf] allow not_analyzed string fields (#145)
* [not-analyzed-fields] do not analyze fields ending with _notanalyzed
* Revert "Revert "Setting version to 0.3.5-SNAPSHOT"" (reverts commit a6da0af)
* [build] update Lucene to 7.7.0
* Hotfix: issue 150 (#151)
* Remove unused code (#141)
* Revert "Setting version to 0.3.4-SNAPSHOT" (reverts commit 2f1d7be)
* README: update to 0.3.3
* README: fix javadoc badge
* remove unused param
* [hotfix] fixes issue 150
* [tests] issue 150
* fix typo
* [blockEntityLinkage] drop queryPartColumns
* [scripts] fix shell
* Block linkage: allow a block linker with Row to Query (#154)
* [linkage] block linker is Row => Query
* remove Query analyzer on methods
* [sbt] set version to 0.3.6-SNAPSHOT
1 parent 40385db commit 02cf427

File tree

9 files changed: +84 additions, -32 deletions

README.md

Lines changed: 6 additions & 4 deletions

````diff
@@ -48,7 +48,7 @@ You can link against this library (for Spark 1.4+) in your program at the follow
 Using SBT:
 
 ```
-libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.4"
+libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.6"
 ```
 
 Using Maven:
@@ -57,15 +57,15 @@ Using Maven:
 <dependency>
   <groupId>org.zouzias</groupId>
   <artifactId>spark-lucenerdd_2.11</artifactId>
-  <version>0.3.4</version>
+  <version>0.3.6</version>
 </dependency>
 ```
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
 ```
-$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.11:0.3.4
+$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.11:0.3.6
 ```
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
@@ -76,7 +76,9 @@ The project has the following compatibility with Apache Spark:
 
 Artifact | Release Date | Spark compatibility | Notes | Status
 ------------------------- | --------------- | -------------------------- | ----- | ----
-0.3.5-SNAPSHOT | | >= 2.3.1, JVM 8 | [develop](https://github.com/zouzias/spark-lucenerdd/tree/develop) | Under Development
+0.3.7-SNAPSHOT | | >= 2.4.0, JVM 8 | [develop](https://github.com/zouzias/spark-lucenerdd/tree/develop) | Under Development
+0.3.6 | 2019-03-11 | >= 2.4.0, JVM 8 | [tag v0.3.6](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.6) | Released
+0.3.5 | 2019-02-07 | >= 2.4.0, JVM 8 | [tag v0.3.5](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.5) | Released
 0.3.4 | 2018-11-27 | >= 2.4.0, JVM 8 | [tag v0.3.4](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.4) | Released
 0.2.8 | 2017-05-30 | 2.1.x, JVM 7 | [tag v0.2.8](https://github.com/zouzias/spark-lucenerdd/tree/v0.2.8) | Released
 0.1.0 | 2016-09-26 | 1.4.x, 1.5.x, 1.6.x | [tag v0.1.0](https://github.com/zouzias/spark-lucenerdd/tree/v0.1.0) | Cross-released with 2.10/2.11
````

build.sbt

Lines changed: 4 additions & 4 deletions

```diff
@@ -77,7 +77,7 @@ pomExtra := <scm>
   </developer>
 </developers>
 
-val luceneV = "7.6.0"
+val luceneV = "7.7.1"
 
 spName := "zouzias/spark-lucenerdd"
 sparkVersion := "2.4.0"
@@ -94,8 +94,8 @@ testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.va
 
 
 // scalastyle:off
-val scalactic = "org.scalactic" %% "scalactic" % "3.0.5"
-val scalatest = "org.scalatest" %% "scalatest" % "3.0.5" % "test"
+val scalactic = "org.scalactic" %% "scalactic" % "3.0.6"
+val scalatest = "org.scalatest" %% "scalatest" % "3.0.6" % "test"
 
 val joda_time = "joda-time" % "joda-time" % "2.10.1"
 val algebird = "com.twitter" %% "algebird-core" % "0.13.5"
@@ -111,7 +111,7 @@ val lucene_expressions = "org.apache.lucene" % "lucene-expre
 val lucene_spatial        = "org.apache.lucene" % "lucene-spatial" % luceneV
 val lucene_spatial_extras = "org.apache.lucene" % "lucene-spatial-extras" % luceneV
 
-val jts = "org.locationtech.jts" % "jts-core" % "1.16.0"
+val jts = "org.locationtech.jts" % "jts-core" % "1.16.1"
 // scalastyle:on
```
spark-shell.sh

Lines changed: 1 addition & 1 deletion

```diff
@@ -6,7 +6,7 @@ CURRENT_DIR=`pwd`
 SPARK_LUCENERDD_VERSION=`cat version.sbt | awk '{print $5}' | xargs`
 
 # You should have downloaded this spark version under your ${HOME}
-SPARK_VERSION="2.3.1"
+SPARK_VERSION="2.4.0"
 
 echo "==============================================="
 echo "Loading LuceneRDD with version ${SPARK_LUCENERDD_VERSION}"
```

src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala

Lines changed: 13 additions & 12 deletions

```diff
@@ -469,7 +469,7 @@ object LuceneRDD extends Versionable
    *
    * @param queries Queries / entities to be linked with @corpus
    * @param entities DataFrame of entities to be linked with queries parameter
-   * @param rowToQueryString Converts each [[Row]] to a 'Lucene Query Syntax'
+   * @param rowToQuery Function[Row, Query] that converts [[Row]] to a Lucene [[Query]]
    * @param queryPartColumns List of query columns for [[HashPartitioner]]
    * @param entityPartColumns List of entity columns for [[HashPartitioner]]
    * @param topK Number of linked results
@@ -481,7 +481,7 @@ object LuceneRDD extends Versionable
    */
   def blockEntityLinkage(queries: DataFrame,
                          entities: DataFrame,
-                         rowToQueryString: Row => String,
+                         rowToQuery: Row => Query,
                          queryPartColumns: Array[String],
                          entityPartColumns: Array[String],
                          topK : Int = 3,
@@ -496,17 +496,18 @@ object LuceneRDD extends Versionable
       "Query Partition columns must be non-empty for block linkage")
 
 
-    val partColumn = "__PARTITION_COLUMN__"
+    val partColumnLeft = "__PARTITION_COLUMN_LEFT__"
+    val partColumnRight = "__PARTITION_COLUMN_RIGHT__"
 
     // Prepare input DataFrames for cogroup operation.
     // Keyed them on queryPartColumns and entityPartColumns
     // I.e., Query/Entity DataFrame are now of type (String, Row)
-    val blocked = entities.withColumn(partColumn,
-      concat(entityPartColumns.map(entities.col): _*))
-      .rdd.keyBy(x => x.getString(x.fieldIndex(partColumn)))
-    val blockedQueries = queries.withColumn(partColumn,
-      concat(entityPartColumns.map(entities.col): _*))
-      .rdd.keyBy(x => x.getString(x.fieldIndex(partColumn)))
+    val blocked = entities.withColumn(partColumnLeft,
+      concat(entityPartColumns.map(entities.col): _*))
+      .rdd.keyBy(x => x.getString(x.fieldIndex(partColumnLeft)))
+    val blockedQueries = queries.withColumn(partColumnRight,
+      concat(queryPartColumns.map(queries.col): _*)).drop(queryPartColumns: _*)
+      .rdd.keyBy(x => x.getString(x.fieldIndex(partColumnRight)))
 
     // Cogroup queries and entities. Map over each
     // CoGrouped partition and instantiate Lucene index on partitioned
@@ -520,7 +521,7 @@ object LuceneRDD extends Versionable
         queryAnalyzer, similarity)
 
       // Multi-query lucene index
-      qs.map(q => (q, lucenePart.query(rowToQueryString(q), topK).results.toArray))
+      qs.map(q => (q, lucenePart.query(rowToQuery(q), topK).results.toArray))
     }
   }
 }
@@ -529,7 +530,7 @@ object LuceneRDD extends Versionable
    * Deduplication via blocking
    *
    * @param entities Entities [[DataFrame]] to deduplicate
-   * @param rowToQueryString Function that maps [[Row]] to Lucene Query String
+   * @param rowToQuery Function that maps [[Row]] to Lucene [[Query]]
    * @param blockingColumns Columns on which exact match is required
    * @param topK Number of top-K query results
    * @param indexAnalyzer Lucene analyzer at index time
@@ -540,7 +541,7 @@ object LuceneRDD extends Versionable
    * @return
    */
   def blockDedup(entities: DataFrame,
-                 rowToQueryString: Row => String,
+                 rowToQuery: Row => Query,
                  blockingColumns: Array[String],
                  topK : Int = 3,
                  indexAnalyzer: String = getOrElseEn(IndexAnalyzerConfigName),
@@ -574,7 +575,7 @@ object LuceneRDD extends Versionable
         queryAnalyzer, similarity)
 
       // Multi-query lucene index
-      iterQueries.map(q => (q, lucenePart.query(rowToQueryString(q), topK).results.toArray))
+      iterQueries.map(q => (q, lucenePart.query(rowToQuery(q), topK).results.toArray))
     }
   }
 }
```
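The signature change above replaces the `Row => String` linker with a `Row => Query` one, so callers now build Lucene queries programmatically instead of formatting query-syntax strings. A minimal sketch of such a linker (the `"name"` column is illustrative, not part of this commit):

```scala
import org.apache.lucene.index.Term
import org.apache.lucene.search.{Query, TermQuery}
import org.apache.spark.sql.Row

// Exact-match linker: look up each row's "name" value as a TermQuery.
// No query-string escaping is needed, unlike the old Row => String variant.
val linker: Row => Query = { row =>
  val name = row.getString(row.fieldIndex("name"))
  new TermQuery(new Term("name", name))
}
```

Passing a `Query` object sidesteps QueryParser escaping of special characters (e.g. `:`, `+`, `-`) that could break string-based linkers.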

src/main/scala/org/zouzias/spark/lucenerdd/partition/AbstractLuceneRDDPartition.scala

Lines changed: 11 additions & 1 deletion

```diff
@@ -16,7 +16,7 @@
  */
 package org.zouzias.spark.lucenerdd.partition
 
-import org.apache.lucene.search.BooleanClause
+import org.apache.lucene.search.{BooleanClause, Query}
 import org.zouzias.spark.lucenerdd.models.indexstats.IndexStatistics
 import org.zouzias.spark.lucenerdd.models.{SparkFacetResult, TermVectorEntry}
 import org.zouzias.spark.lucenerdd.response.LuceneRDDResponsePartition
@@ -62,6 +62,16 @@ private[lucenerdd] abstract class AbstractLuceneRDDPartition[T] extends Serializ
    */
   def query(searchString: String, topK: Int): LuceneRDDResponsePartition
 
+
+  /**
+   * Lucene search using Lucene [[Query]]
+   * @param query Lucene query, i.e., [[org.apache.lucene.search.BooleanQuery]] or
+   *              [[org.apache.lucene.search.PhraseQuery]]
+   * @param topK Number of documents to return
+   * @return
+   */
+  def query(query: Query, topK: Int): LuceneRDDResponsePartition
+
   /**
    * Multiple generic Lucene Queries using QueryParser
    * @param searchString Lucene query string
```
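The new abstract `query(query: Query, topK: Int)` accepts any Lucene `Query` subclass, including the `BooleanQuery` and `PhraseQuery` examples named in its doc comment. A sketch of a composite query a caller might pass (field names are illustrative only):

```scala
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, TermQuery}

// Composite query: "name" must match; a matching "email" only boosts the score.
val composite: BooleanQuery = new BooleanQuery.Builder()
  .add(new TermQuery(new Term("name", "water")), BooleanClause.Occur.MUST)
  .add(new TermQuery(new Term("email", "yes@gmail.com")), BooleanClause.Occur.SHOULD)
  .build()

// partition.query(composite, 10) would then return the top-10 matching documents.
```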

src/main/scala/org/zouzias/spark/lucenerdd/partition/LuceneRDDPartition.scala

Lines changed: 7 additions & 0 deletions

```diff
@@ -137,6 +137,13 @@ private[lucenerdd] class LuceneRDDPartition[T]
     LuceneRDDResponsePartition(results.toIterator)
   }
 
+  override def query(query: Query,
+                     topK: Int): LuceneRDDResponsePartition = {
+    val results = LuceneQueryHelpers.searchQuery(indexSearcher, query, topK)
+
+    LuceneRDDResponsePartition(results.toIterator)
+  }
+
   override def queries(searchStrings: Iterable[String],
                       topK: Int): Iterable[(String, LuceneRDDResponsePartition)] = {
     searchStrings.map( searchString =>
```

src/main/scala/org/zouzias/spark/lucenerdd/query/LuceneQueryHelpers.scala

Lines changed: 19 additions & 2 deletions

```diff
@@ -95,18 +95,35 @@ object LuceneQueryHelpers extends Serializable {
    *
    * @param indexSearcher Index searcher
    * @param searchString Lucene search query string
-   * @param topK Number of returned documents
+   * @param topK Number of documents to return
    * @param analyzer Lucene Analyzer
    * @return
    */
   def searchParser(indexSearcher: IndexSearcher,
                    searchString: String,
-                   topK: Int, analyzer: Analyzer)
+                   topK: Int,
+                   analyzer: Analyzer)
   : Seq[SparkScoreDoc] = {
     val q = parseQueryString(searchString, analyzer)
     indexSearcher.search(q, topK).scoreDocs.map(SparkScoreDoc(indexSearcher, _))
   }
 
+  /**
+   * Lucene search using a Lucene [[Query]]
+   *
+   * Important: Query analysis is done during the definition of query
+   * @param indexSearcher Lucene index searcher
+   * @param query Lucene query
+   * @param topK Number of documents to return
+   * @return
+   */
+  def searchQuery(indexSearcher: IndexSearcher,
+                  query: Query,
+                  topK: Int)
+  : Seq[SparkScoreDoc] = {
+    indexSearcher.search(query, topK).scoreDocs.map(SparkScoreDoc(indexSearcher, _))
+  }
+
   /**
    * Faceted search using [[SortedSetDocValuesFacetCounts]]
    *
```
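As the new doc comment notes, `searchQuery` applies no analyzer: any analysis must happen when the `Query` itself is built. A self-contained sketch against an in-memory index, assuming the lucene-core artifact from this build (`RAMDirectory` is still available in Lucene 7.x):

```scala
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, StringField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term}
import org.apache.lucene.search.{IndexSearcher, TermQuery}
import org.apache.lucene.store.RAMDirectory
import org.zouzias.spark.lucenerdd.query.LuceneQueryHelpers

// Index one document in memory.
val dir = new RAMDirectory()
val writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))
val doc = new Document()
doc.add(new StringField("name", "water", Field.Store.YES)) // StringField: indexed verbatim, not analyzed
writer.addDocument(doc)
writer.close()

// TermQuery matches the un-analyzed term exactly; searchQuery performs no further analysis.
val searcher = new IndexSearcher(DirectoryReader.open(dir))
val hits = LuceneQueryHelpers.searchQuery(searcher, new TermQuery(new Term("name", "water")), topK = 5)
```

Because the un-analyzed `StringField` stores the term verbatim, the `TermQuery` above would match it; a `TextField` analyzed at index time would instead require building the query from analyzed tokens.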

src/test/scala/org/zouzias/spark/lucenerdd/BlockingDedupSpec.scala

Lines changed: 5 additions & 2 deletions

```diff
@@ -17,6 +17,8 @@
 package org.zouzias.spark.lucenerdd
 
 import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.lucene.index.Term
+import org.apache.lucene.search.{Query, TermQuery}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Row, SparkSession}
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -44,10 +46,11 @@ class BlockingDedupSpec extends FlatSpec
     }
     val df = sc.parallelize(people).repartition(2).toDF()
 
-    val linker: Row => String = { row =>
+    val linker: Row => Query = { row =>
       val name = row.getString(row.fieldIndex("name"))
+      val term = new Term("name", name)
 
-      s"name:$name"
+      new TermQuery(term)
     }
 
```

src/test/scala/org/zouzias/spark/lucenerdd/BlockingLinkageSpec.scala

Lines changed: 18 additions & 6 deletions

```diff
@@ -17,6 +17,8 @@
 package org.zouzias.spark.lucenerdd
 
 import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.lucene.index.Term
+import org.apache.lucene.search.{Query, TermQuery}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Row, SparkSession}
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -37,24 +39,34 @@ class BlockingLinkageSpec extends FlatSpec
     val spark = SparkSession.builder().getOrCreate()
     import spark.implicits._
 
-    val people: Array[Person] = Array("fear", "death", "water", "fire", "house")
+    val peopleLeft: Array[Person] = Array("fear", "death", "water", "fire", "house")
       .zipWithIndex.map { case (str, index) =>
         val email = if (index % 2 == 0) "yes@gmail.com" else "no@gmail.com"
         Person(str, index, email)
       }
-    val df = sc.parallelize(people).repartition(2).toDF()
 
-    val linker: Row => String = { row =>
+    val peopleRight: Array[Person] = Array("fear", "death", "water", "fire", "house")
+      .zipWithIndex.map { case (str, index) =>
+        val email = if (index % 2 == 0) "yes@gmail.com" else "no@gmail.com"
+        Person(str, index, email)
+      }
+
+    val leftDF = sc.parallelize(peopleLeft).repartition(2).toDF()
+    val rightDF = sc.parallelize(peopleRight).repartition(3).toDF()
+
+    // Define a Lucene Term linker
+    val linker: Row => Query = { row =>
       val name = row.getString(row.fieldIndex("name"))
+      val term = new Term("name", name)
 
-      s"name:$name"
+      new TermQuery(term)
     }
 
-    val linked = LuceneRDD.blockEntityLinkage(df, df, linker,
+    val linked = LuceneRDD.blockEntityLinkage(leftDF, rightDF, linker,
       Array("email"), Array("email"))
 
-    val linkedCount, dfCount = (linked.count, df.count())
+    val linkedCount, dfCount = (linked.count, leftDF.count())
 
     linkedCount should equal(dfCount)
```
