1717package org .zouzias .spark .lucenerdd .spatial .shape
1818
1919import com .spatial4j .core .shape .Shape
20- import com .twitter .algebird .TopK
20+ import com .twitter .algebird .{ TopK , TopKMonoid }
2121import org .apache .lucene .document .Document
2222import org .apache .lucene .spatial .query .SpatialOperation
2323import org .apache .spark .rdd .RDD
2424import org .apache .spark .storage .StorageLevel
2525import org .apache .spark ._
2626import org .apache .spark .sql .{DataFrame , Row }
27- import org .zouzias .spark .lucenerdd .aggregate . SparkScoreDocAggregatable
27+ import org .zouzias .spark .lucenerdd .config . LuceneRDDConfigurable
2828import org .zouzias .spark .lucenerdd .models .SparkScoreDoc
2929import org .zouzias .spark .lucenerdd .query .LuceneQueryHelpers
3030import org .zouzias .spark .lucenerdd .response .{LuceneRDDResponse , LuceneRDDResponsePartition }
@@ -43,7 +43,7 @@ import scala.reflect.ClassTag
4343class ShapeLuceneRDD [K : ClassTag , V : ClassTag ]
4444 (private val partitionsRDD : RDD [AbstractShapeLuceneRDDPartition [K , V ]])
4545 extends RDD [(K , V )](partitionsRDD.context, List (new OneToOneDependency (partitionsRDD)))
46- with SparkScoreDocAggregatable
46+ with LuceneRDDConfigurable
4747 with Logging {
4848
4949 logInfo(" Instance is created..." )
@@ -87,16 +87,17 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
8787 * @param f
8888 * @return
8989 */
90- private def docResultsMapper (f : AbstractShapeLuceneRDDPartition [K , V ] =>
90+ private def partitionMapper (f : AbstractShapeLuceneRDDPartition [K , V ] =>
9191 LuceneRDDResponsePartition ): LuceneRDDResponse = {
92- new LuceneRDDResponse (partitionsRDD.map(f(_)))
92+ new LuceneRDDResponse (partitionsRDD.map(f(_)), SparkScoreDoc .ascending )
9393 }
9494
9595 private def linker [T : ClassTag ](that : RDD [T ], pointFunctor : T => PointType ,
9696 mapper : ( PointType , AbstractShapeLuceneRDDPartition [K , V ]) =>
9797 Iterable [SparkScoreDoc ]): RDD [(T , List [SparkScoreDoc ])] = {
9898 logDebug(" Linker requested" )
9999
100+ val topKMonoid = new TopKMonoid [SparkScoreDoc ](MaxDefaultTopKValue )(SparkScoreDoc .ascending)
100101 logDebug(" Collecting query points to driver" )
101102 val queries = that.map(pointFunctor).collect()
102103 logDebug(" Query points collected to driver successfully" )
@@ -107,16 +108,16 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
107108 logDebug(" Compute topK linkage per partition" )
108109 val resultsByPart : RDD [(Long , TopK [SparkScoreDoc ])] = partitionsRDD.flatMap {
109110 case partition => queriesB.value.zipWithIndex.map { case (queryPoint, index) =>
110- val results = mapper(queryPoint, partition).map(x => SparkDocAscendingTopKMonoid .build(x))
111- .reduceOption(SparkDocAscendingTopKMonoid .plus)
112- .getOrElse(SparkDocAscendingTopKMonoid .zero)
111+ val results = mapper(queryPoint, partition).map(x => topKMonoid .build(x))
112+ .reduceOption(topKMonoid .plus)
113+ .getOrElse(topKMonoid .zero)
113114
114115 (index.toLong, results)
115116 }
116117 }
117118
118119 logDebug(" Merge topK linkage results" )
119- val results = resultsByPart.reduceByKey(SparkDocAscendingTopKMonoid .plus)
120+ val results = resultsByPart.reduceByKey(topKMonoid .plus)
120121 that.zipWithIndex.map(_.swap).join(results)
121122 .map{ case (_, joined) => (joined._1, joined._2.items)}
122123 }
@@ -215,7 +216,7 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
215216 searchString : String = LuceneQueryHelpers .MatchAllDocsString )
216217 : LuceneRDDResponse = {
217218 logInfo(s " Knn search with query ${queryPoint} and search string ${searchString}" )
218- docResultsMapper (_.knnSearch(queryPoint, k, searchString))
219+ partitionMapper (_.knnSearch(queryPoint, k, searchString))
219220 }
220221
221222 /**
@@ -230,7 +231,7 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
230231 : LuceneRDDResponse = {
231232 logInfo(s " Circle search with center ${center} and radius ${radius}" )
232233 // Points can only intersect
233- docResultsMapper (_.circleSearch(center, radius, k,
234+ partitionMapper (_.circleSearch(center, radius, k,
234235 SpatialOperation .Intersects .getName))
235236 }
236237
@@ -246,7 +247,7 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
246247 operationName : String = SpatialOperation .Intersects .getName)
247248 : LuceneRDDResponse = {
248249 logInfo(s " Spatial search with shape ${shapeWKT} and operation ${operationName}" )
249- docResultsMapper (_.spatialSearch(shapeWKT, k, operationName))
250+ partitionMapper (_.spatialSearch(shapeWKT, k, operationName))
250251 }
251252
252253 /**
@@ -261,7 +262,7 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
261262 operationName : String )
262263 : LuceneRDDResponse = {
263264 logInfo(s " Spatial search with point ${point} and operation ${operationName}" )
264- docResultsMapper (_.spatialSearch(point, k, operationName))
265+ partitionMapper (_.spatialSearch(point, k, operationName))
265266 }
266267
267268 /**
@@ -277,7 +278,7 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
277278 operationName : String = SpatialOperation .Intersects .getName)
278279 : LuceneRDDResponse = {
279280 logInfo(s " Bounding box with center ${center}, radius ${radius}, k = ${k}" )
280- docResultsMapper (_.bboxSearch(center, radius, k, operationName))
281+ partitionMapper (_.bboxSearch(center, radius, k, operationName))
281282 }
282283
283284 /**
@@ -292,7 +293,7 @@ class ShapeLuceneRDD[K: ClassTag, V: ClassTag]
292293 operationName : String )
293294 : LuceneRDDResponse = {
294295 logInfo(s " Bounding box with lower left ${lowerLeft}, upper right ${upperRight} and k = ${k}" )
295- docResultsMapper (_.bboxSearch(lowerLeft, upperRight, k, operationName))
296+ partitionMapper (_.bboxSearch(lowerLeft, upperRight, k, operationName))
296297 }
297298
298299 override def count (): Long = {
0 commit comments