Skip to content

IndexedSeq instead of Iterator in NearestNeighborIterator [Priority Queue Serialization Error]#7

Open
oscaroboto wants to merge 3 commits into
LinkedInAttic:masterfrom
oscaroboto:master
Open

IndexedSeq instead of Iterator in NearestNeighborIterator [Priority Queue Serialization Error]#7
oscaroboto wants to merge 3 commits into
LinkedInAttic:masterfrom
oscaroboto:master

Conversation

@oscaroboto

Copy link
Copy Markdown

There seemed to have been an issue with concurrency when returning the Iterator in the NearestNeighborIterator class inside of LSHNearestNeighborSearchModel.scala.

Iterator[(ItemId, Iterator[ItemIdDistancePair])] was changed to Iterator[(ItemId, IndexedSeq[ItemIdDistancePair])]. The iterator within the iterator is not serialized and causing a problem with the groupByKey in the getAllNearestNeighbors function. What I think was happening is that during the groupByKey the iterator within the iterator was pointing to a location in memory on a particular node, but when that iterator is copied to another node during the groupByKey it is then pointing to a random position in memory not where one expects.

As a bonus I also rewrote the groupByKey as aggregateByKey, as an aggregate would be more efficient in this case than a groupByKey. I have not done any benchmarking, but from my experience have found aggregateByKey to be more efficient.

Code compiled with ./gradlew build and passed all tests.

@namitk

namitk commented Mar 31, 2019

Copy link
Copy Markdown

LGTM but does this work? I uncommented the test in ModelTest.scala and I still get the serialization error:

com.linkedin.nn.ModelTest.testModel FAILED
    java.io.NotSerializableException: scala.collection.mutable.PriorityQueue$ResizableArrayAccess
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1.apply(PairRDDFunctions.scala:168)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$1.apply(PairRDDFunctions.scala:166)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:166)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$2.apply(PairRDDFunctions.scala:192)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$aggregateByKey$2.apply(PairRDDFunctions.scala:192)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.aggregateByKey(PairRDDFunctions.scala:191)
        at com.linkedin.nn.model.LSHNearestNeighborSearchModel.getAllNearestNeighbors(LSHNearestNeighborSearchModel.scala:290)
        at com.linkedin.nn.model.LSHNearestNeighborSearchModel.getSelfAllNearestNeighbors(LSHNearestNeighborSearchModel.scala:303)
        at com.linkedin.nn.ModelTest$$anonfun$testModel$1.apply$mcV$sp(ModelTest.scala:40)
        at com.linkedin.nn.test.SparkTestUtils$class.sparkTest(SparkTestUtils.scala:49)
        at com.linkedin.nn.ModelTest.sparkTest(ModelTest.scala:15)
        at com.linkedin.nn.ModelTest.testModel(ModelTest.scala:18)

@uzadude

uzadude commented Sep 30, 2021

Copy link
Copy Markdown

Hi,
we just came across the need to run this package. Spark's native BucketedRandomProjectionLSH wasn't good enough for (mainly because of the bucket skew issue), and this library worked perfectly. i also see there is some interest that arose from different people over the last few years.
I think putting some life back into this repo could be beneficial, as it seems that quite a lot of work was put into it.
@namitk - you think there's a chance that you could add admins/owners to this repo to keep it going. i would be happy to contribute. maybe @oscaroboto would agree also. we can start by approving this PR as it is needed to run on a cluster.

@namitk

namitk commented Sep 30, 2021

Copy link
Copy Markdown

Hi @uzadude Very glad to hear that this library helped. I am no longer at LinkedIn though so I don't have permissions to change collaborators. Also given it is under LinkedInAttic, LinkedIn is no longer maintaining it. Not sure what the best course of action is, let me know your thoughts.

@uzadude

uzadude commented Sep 30, 2021

Copy link
Copy Markdown

Well, in my company (PayPal), we work with our private accounts in the public GitHub space, so we would have permissions also in the future. Maybe you still know someone there that can add you back as a collaborator with your current user account?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants