Commit cbd31b1

slhsxcmy, Kefaun2601, nhandyal, kyan2601, and felixloesing authored
Writing Data to Elasticsearch Storage Engine (#225)
* Added ElasticsearchProxy.scala; WIP; broken Solr: using Either[SolrClient, RestHighLevelClient] leads to "Overriding type String => SolrClient does not conform to base type String => Either[SolrClient, RestHighLevelClient]" (Co-Authored-By: Kevin Yan <[email protected]>)
* Compiles now; more factory extraction: extract SolrRDD and SolrDeepRDD; cast getClient() result to SolrClient in 2 RDDs and SolrUpsert; add getRDD and getDeepRDD to StorageProxyFactory; add 3 addResource methods to StorageProxy and cast parameter to SolrInputDocument in SolrProxy; add 2 dummy Elasticsearch RDDs
* [Docker] Update run script with relative paths and docker-compose file
* Implemented basic addResource() and commitCrawlDb() functionality for ES (Co-authored-by: Felix Loesing <[email protected]>, Mingyu Cui <[email protected]>)
* Make addResourceDocs() and addResources() call addResource() for basic testing (Co-authored-by: Felix Loesing <[email protected]>, Mingyu Cui <[email protected]>)
* Working basic data insert into Elasticsearch (Co-authored-by: Felix Loesing <[email protected]>, Mingyu Cui <[email protected]>, Nikhil Handyal <[email protected]>)
* Successfully pushed Resource data to Elasticsearch
* RDD abstraction and began implementing ElasticsearchRDD (Co-authored-by: Miles Phan <[email protected]>)
* Implemented ElasticsearchResultIterator minus deserialization; compiles but not tested
* Debugging getPartitions() query (Co-authored-by: Miles Phan <[email protected]>, Mingyu Cui <[email protected]>)
* Implemented ElasticsearchRDD.compute() with proper queries/sorts
* Implemented Resource(Map<String, Object>) constructor, prep for deserialization
* Patch for wildcard/unused imports and pom versions
* Deserialize attempt and placeholder
* Add elasticsearch versions of StatusUpdate, Upsert, and Status/ScoreUpdateTransformer classes; update crawler as well
* Got SearchHit data
* getPartitions() successful minus querying
* Made StorageProxyFactory serializable (Co-authored-by: Felix Loesing <[email protected]>, Mingyu Cui <[email protected]>)
* Implemented crawl logic; passes compile; actual crawl passes without error but data in db is not updated
* Implemented ElasticsearchDeepRDD
* Abstracted StatusUpdateTransformer and refactored factory logic (Co-authored-by: Miles Phan <[email protected]>)
* updateResource back to addResource
* Resolved merge conflicts from ISSUE-224-2 to ISSUE-224 (Co-authored-by: Felix Loesing <[email protected]>, Mingyu Cui <[email protected]>, Miles Phan <[email protected]>, Nikhil Handyal <[email protected]>)
* Insert preliminary crawl data into Elasticsearch (Co-authored-by: Felix Loesing <[email protected]>, Miles Phan <[email protected]>)
* Added in default values for missing fields (Co-authored-by: Felix Loesing <[email protected]>)
* Resolved the int to Long error
* Got incrementing a field to work in Elasticsearch (Co-authored-by: Miles Phan <[email protected]>)
* Resolved status UNFETCHED/FETCHED querying issue (Co-authored-by: Miles Phan <[email protected]>)
* Crawl data successfully persisted (Co-authored-by: Miles Phan <[email protected]>)
* Cleaned up debug print statements for Elasticsearch (Co-authored-by: Kevin Yan <[email protected]>, Nikhil Handyal <[email protected]>)

Co-authored-by: Kevin Yan <[email protected]>
Co-authored-by: Felix Loesing <[email protected]>
Co-authored-by: Nikhil Handyal <[email protected]>
Co-authored-by: Miles Phan <[email protected]>
Co-authored-by: Tom Barber <[email protected]>
1 parent 56ad891 commit cbd31b1

40 files changed: +1621 -341 lines
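The first bullet in the commit message records the Scala variance error that motivated the factory rework: SolrClient is not a subtype of Either[SolrClient, RestHighLevelClient], so an override returning the concrete client no longer conformed once the base signature was widened to Either. The workaround the log describes is widening the base getClient() and casting at call sites. A minimal Java sketch of that shape; the class names mirror the ones in the commit message, but the bodies and signatures here are illustrative assumptions, not the project's actual code:

// Sketch of the getClient() widening described in the commit message.
import org.apache.http.HttpHost;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

abstract class StorageProxy implements java.io.Serializable {
    // Widened return type: a common supertype (here Object) lets each
    // backend return its own client, avoiding the Either conformance error.
    public abstract Object getClient();
}

class SolrProxy extends StorageProxy {
    private final String uri;
    SolrProxy(String uri) { this.uri = uri; }

    @Override
    public Object getClient() {
        return new HttpSolrClient.Builder(uri).build();
    }
}

class ElasticsearchProxy extends StorageProxy {
    private final String uri;
    ElasticsearchProxy(String uri) { this.uri = uri; }

    @Override
    public Object getClient() {
        return new RestHighLevelClient(RestClient.builder(HttpHost.create(uri)));
    }
}

Call sites then cast to the concrete type, e.g. SolrClient client = (SolrClient) proxy.getClient(), which is the casting in SolrRDD and SolrUpsert that the second bullet describes.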

sparkler-core/bin/sparkler.sh

+21 -3
@@ -1,15 +1,33 @@
 #!/usr/bin/env bash
 
+# Attempt to resolve the sparkler jar using relative paths
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+DIR="$DIR/.."
+
+JAR=`echo $DIR/sparkler-app-*-SNAPSHOT.jar`
+if [ -f "$JAR" ]
+then
+    # run
+    # -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
+    java -Xms1g -cp $DIR/conf:$JAR -Dpf4j.pluginsDir=$DIR/plugins edu.usc.irds.sparkler.Main $@
+    exit 0
+fi
+
+# Attempt to resolve the sparkler jar using absolute paths
+# We do this because in the elastic-search deployment we add sparkler.sh to /usr/bin
+# In that case the Sparkler jar cannot be resolved via relative paths.
+# The following code block resolves the absolute location of this script on disk
+# We assume that it is located in sparkler-core/bin/
 SOURCE="${BASH_SOURCE[0]}"
 while [ -h "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink
     DIR="$( cd -P "$( dirname "$SOURCE" )" >/dev/null 2>&1 && pwd )"
     SOURCE="$(readlink "$SOURCE")"
     [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
 done
 DIR="$( cd -P "$( dirname "$SOURCE" )" >/dev/null 2>&1 && pwd )"
-SPARKLER_CORE_DIR="$DIR/.."
+SPARKLER_BUILD_DIR="$DIR/../build"
 
-JAR=`echo $SPARKLER_CORE_DIR/build/sparkler-app-*-SNAPSHOT.jar`
+JAR=`echo $SPARKLER_BUILD_DIR/sparkler-app-*-SNAPSHOT.jar`
 if [ ! -f "$JAR" ]
 then
     echo "ERROR: Can't find Sparkler Jar at $JAR.
@@ -19,4 +37,4 @@ fi
 
 # run
 # -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
-java -Xms1g -cp $DIR/conf:$JAR -Dpf4j.pluginsDir=$SPARKLER_CORE_DIR/build/plugins edu.usc.irds.sparkler.Main $@
+java -Xms1g -cp $DIR/conf:$JAR -Dpf4j.pluginsDir=$SPARKLER_BUILD_DIR/plugins edu.usc.irds.sparkler.Main $@

sparkler-core/conf/sparkler-default.yaml

+2 -2
@@ -19,7 +19,7 @@
 
 # uri - Crawl Database URL. Stores crawl metadata and status updates.
 
-crawldb.backend: solr # "solr" is default until "elasticsearch" becomes usable.
+crawldb.backend: elasticsearch # "solr" is default until "elasticsearch" becomes usable.
 
 # Type: String. Default: http://localhost:8983/solr/crawldb
 # for standalone server
@@ -30,7 +30,7 @@ crawldb.backend: solr # "solr" is default until "elasticsearch" becomes usable.
 solr.uri: http://localhost:8983/solr/crawldb
 
 # elasticsearch settings
-elasticsearch.uri: http://localhost:9200
+elasticsearch.uri: http://elasticsearch:9200
 
 
 ##################### Apache Spark Properties ###########################
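Flipping crawldb.backend is what routes the crawl database through the new engine. A hedged sketch of how the StorageProxyFactory named in the commit message might branch on this key, reusing the StorageProxy, SolrProxy, and ElasticsearchProxy sketches shown earlier (the method name and config plumbing are assumptions; only the factory's existence and its serializability are stated in the commit log):

// Hypothetical sketch of backend selection from the YAML config.
import java.io.Serializable;
import java.util.Map;

public class StorageProxyFactory implements Serializable {
    public StorageProxy getProxy(Map<String, Object> config) {
        String backend = (String) config.getOrDefault("crawldb.backend", "solr");
        switch (backend) {
            case "solr":
                return new SolrProxy((String) config.get("solr.uri"));
            case "elasticsearch":
                return new ElasticsearchProxy((String) config.get("elasticsearch.uri"));
            default:
                throw new IllegalArgumentException("Unknown crawldb.backend: " + backend);
        }
    }
}

Serializable matters here because the factory is shipped to Spark executors, which is presumably why the log notes "made StorageProxyFactory serializable".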

sparkler-core/pom.xml

+2
@@ -46,6 +46,8 @@
         <hbase.version>1.2.1</hbase.version>
         <solr.version>8.5.0</solr.version>
         <slf4j.version>1.7.30</slf4j.version>
+        <elasticsearch.version>7.11.1</elasticsearch.version>
+        <elasticsearch.client.version>7.11.1</elasticsearch.client.version>
         <snakeyaml.version>1.26</snakeyaml.version>
         <json.simple.version>1.1.1</json.simple.version>
         <commons.validator.version>1.5.1</commons.validator.version>
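Pinning both elasticsearch and elasticsearch-rest-high-level-client to the same 7.11.1 version keeps the high-level client binary-compatible with the server classes it depends on. A minimal connectivity check against the URI configured above; this is a sketch, not project code:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class EsPing {
    public static void main(String[] args) throws Exception {
        // elasticsearch.uri from sparkler-default.yaml
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(HttpHost.create("http://elasticsearch:9200")))) {
            // ping() returns true when the cluster is reachable.
            System.out.println("Elasticsearch reachable: " + client.ping(RequestOptions.DEFAULT));
        }
    }
}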

sparkler-core/sparkler-api/src/main/java/edu/usc/irds/sparkler/Constants.java

+3
@@ -151,6 +151,9 @@ interface storage { // Storage Fields
         String JSON_MIMETYPE = "application/json";
         String PARENT = "parent";
         String RESPONSE_TIME = "response_time";
+        String HOSTNAME = "hostname";
     }
 
+    String defaultDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
 }
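One quirk worth noting: the 'Z' in defaultDateFormat is quoted, so SimpleDateFormat emits and matches it as a literal character rather than a timezone designator. A short round-trip sketch; pinning the formatter to UTC is an assumption, since the Resource code in the next diff does not set a timezone:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class DateFormatDemo {
    public static void main(String[] args) throws Exception {
        // Same pattern as Constants.defaultDateFormat; the quoted 'Z' is a literal.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: timestamps stored as UTC
        String serialized = fmt.format(new Date());   // e.g. 2021-03-01T12:34:56.789Z
        Date roundTripped = fmt.parse(serialized);
        System.out.println(serialized + " -> " + roundTripped);
    }
}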

sparkler-core/sparkler-api/src/main/java/edu/usc/irds/sparkler/model/Resource.java

+193
@@ -1,5 +1,6 @@
 package edu.usc.irds.sparkler.model;
 
+import edu.usc.irds.sparkler.Constants;
 import edu.usc.irds.sparkler.JobContext;
 import edu.usc.irds.sparkler.util.StringUtil;
 import org.apache.solr.client.solrj.beans.Field;
@@ -8,6 +9,7 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Date;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -38,6 +40,15 @@ public class Resource implements Serializable {
     @Field("http_method") private String httpMethod;
     @Field("jobmeta") private String metadata;
 
+    private String version = "1.0";
+    private Date modifiedTime = new Date();
+    private String crawler = "sparkler";
+    private Integer fetchDepth = 0;
+    private Double pageScore = 0.0;
+    private Integer retriesSinceFetch = -1;
+    private Integer fetchStatusCode = 0;
+    private Long responseTime = new Long(0);
+
     public Resource() {
     }
 
@@ -101,6 +112,122 @@ public Resource(String url, Integer discoverDepth, JobContext sparklerJob, ResourceStatus
 
     }
 
+    public Resource(Map<String, Object> dataMap) {
+        System.out.println("Resource constructor ---------------");
+        for (String key : dataMap.keySet()) {
+            System.out.println(key + " => " + dataMap.get(key));
+        }
+
+        if (dataMap.containsKey("id")) id = (String) dataMap.get("id");
+        if (dataMap.containsKey("url")) url = (String) dataMap.get("url");
+        if (dataMap.containsKey("group")) group = (String) dataMap.get("group");
+        if (dataMap.containsKey("discover_depth")) {
+            try {
+                discoverDepth = (Integer) dataMap.get("discover_depth");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Integer: discover_depth");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("status")) status = (String) dataMap.get("status");
+        if (dataMap.containsKey("fetch_timestamp")) {
+            try {
+                fetchTimestamp = new SimpleDateFormat(Constants.defaultDateFormat).parse((String) dataMap.get("fetch_timestamp"));
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Date: fetch_timestamp");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("crawl_id")) crawlId = (String) dataMap.get("crawl_id");
+        if (dataMap.containsKey("dedupe_id")) dedupeId = (String) dataMap.get("dedupe_id");
+        if (dataMap.containsKey("*_score")) {
+            try {
+                score = (HashMap<String, Double>) dataMap.get("*_score");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to HashMap<String, Double>: *_score");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("generate_score")) {
+            try {
+                generateScore = (Double) dataMap.get("generate_score");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Double: generate_score");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("http_method")) httpMethod = (String) dataMap.get("http_method");
+        if (dataMap.containsKey("jobmeta")) metadata = (String) dataMap.get("jobmeta");
+        if (dataMap.containsKey("last_updated_at")) {
+            try {
+                lastUpdatedAt = new SimpleDateFormat(Constants.defaultDateFormat).parse((String) dataMap.get("last_updated_at"));
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Date: last_updated_at");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("indexed_at")) {
+            try {
+                indexedAt = new SimpleDateFormat(Constants.defaultDateFormat).parse((String) dataMap.get("indexed_at"));
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Date: indexed_at");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("hostname")) hostname = (String) dataMap.get("hostname");
+        if (dataMap.containsKey("parent")) parent = (String) dataMap.get("parent");
+        if (dataMap.containsKey("version")) version = (String) dataMap.get("version");
+        if (dataMap.containsKey("modified_time")) {
+            try {
+                modifiedTime = new SimpleDateFormat(Constants.defaultDateFormat).parse((String) dataMap.get("modified_time"));
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Date: modified_time");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("crawler")) crawler = (String) dataMap.get("crawler");
+        if (dataMap.containsKey("fetch_depth")) {
+            try {
+                fetchDepth = (Integer) dataMap.get("fetch_depth");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Integer: fetch_depth");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("page_score")) {
+            try {
+                pageScore = (Double) dataMap.get("page_score");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Double: page_score");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("retries_since_fetch")) {
+            try {
+                retriesSinceFetch = (Integer) dataMap.get("retries_since_fetch");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Integer: retries_since_fetch");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("fetch_status_code")) {
+            try {
+                fetchStatusCode = (Integer) dataMap.get("fetch_status_code");
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Integer: fetch_status_code");
+                System.err.println(e.toString());
+            }
+        }
+        if (dataMap.containsKey("response_time")) {
+            try {
+                responseTime = new Long((String) dataMap.get("response_time"));
+            } catch (Exception e) {
+                System.err.println("Could not retrieve and parse to Long: response_time");
+                System.err.println(e.toString());
+            }
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("Resource(%s, %s, %s, %d, %f, %s)",
@@ -203,4 +330,70 @@ public String getHttpMethod(){
     public String getMetadata(){
         return this.metadata;
     }
+
+    public Date getLastUpdatedAt() {
+        return this.lastUpdatedAt;
+    }
+
+    public Date getIndexedAt() {
+        return this.indexedAt;
+    }
+
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    public String getParent() {
+        return this.parent;
+    }
+
+    public String getVersion() { return this.version; }
+
+    public Date getModifiedTime() { return this.modifiedTime; }
+
+    public String getCrawler() { return this.crawler; }
+
+    public Integer getFetchDepth() { return this.fetchDepth; }
+
+    public Double getPageScore() { return this.pageScore; }
+
+    public Integer getRetriesSinceFetch() { return this.retriesSinceFetch; }
+
+    public Integer getFetchStatusCode() { return this.fetchStatusCode; }
+
+    public Long getResponseTime() { return this.responseTime; }
+
+    public Map<String, Object> getDataAsMap() {
+        Map<String, Object> dataMap = new HashMap<String, Object>() {{
+            put("id", getId());
+            put("url", getUrl());
+            put("group", getGroup());
+            put("discover_depth", getDiscoverDepth());
+            put("status", getStatus());
+            put("fetch_timestamp", getFetchTimestamp());
+            put("crawl_id", getCrawlId());
+            put("dedupe_id", getDedupeId());
+            put("generate_score", getGenerateScore());
+            put("http_method", getHttpMethod());
+            put("jobmeta", getMetadata());
+            put("last_updated_at", getLastUpdatedAt());
+            put("indexed_at", getIndexedAt());
+            put("hostname", getHostname());
+            put("parent", getParent());
+            put("version", getVersion());
+            put("modified_time", getModifiedTime());
+            put("crawler", getCrawler());
+            put("fetch_depth", getFetchDepth());
+            put("page_score", getPageScore());
+            put("retries_since_fetch", getRetriesSinceFetch());
+            put("fetch_status_code", getFetchStatusCode());
+            put("response_time", getResponseTime());
+        }};
+
+        Map<String, Double> scores = getScore();
+        for (String key : scores.keySet()) {
+            dataMap.put(key, scores.get(key));
+        }
+        return dataMap;
+    }
 }
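getDataAsMap() and the new Resource(Map) constructor are the two halves of the Elasticsearch round trip: the map goes out as a document source, and the source map comes back through the constructor. (The client serializes the Date values to ISO-8601 strings on the way out, which is why the constructor parses dates from String.) A hedged sketch of the write side and of the field increment mentioned in the commit log; the "crawldb" index name and the incremented field are assumptions:

import java.util.Collections;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import edu.usc.irds.sparkler.model.Resource;

public class ResourceWriter {
    // Index one resource into the assumed "crawldb" index, keyed by the
    // resource id so re-indexing the same URL overwrites the old document.
    public static void addResource(RestHighLevelClient client, Resource resource) throws Exception {
        IndexRequest request = new IndexRequest("crawldb")
                .id(resource.getId())
                .source(resource.getDataAsMap());
        client.index(request, RequestOptions.DEFAULT);
    }

    // Server-side increment via a painless script; fetch_depth is a stand-in
    // for whichever field the commit actually incremented.
    public static void incrementFetchDepth(RestHighLevelClient client, String id) throws Exception {
        UpdateRequest request = new UpdateRequest("crawldb", id)
                .script(new Script(ScriptType.INLINE, "painless",
                        "ctx._source.fetch_depth += 1", Collections.emptyMap()));
        client.update(request, RequestOptions.DEFAULT);
    }
}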
+1 -1
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package edu.usc.irds.sparkler.storage.solr.schema;
+package edu.usc.irds.sparkler.storage;
 
 import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package edu.usc.irds.sparkler.storage.solr.schema;
+package edu.usc.irds.sparkler.storage;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

sparkler-core/sparkler-api/src/test/resources/sparkler-default.yaml

+2 -3
@@ -19,7 +19,7 @@
 
 # uri - Crawl Database URL. Stores crawl metadata and status updates.
 
-crawldb.backend: solr # "solr" is default until "elasticsearch" becomes usable.
+crawldb.backend: elasticsearch # "solr" is default until "elasticsearch" becomes usable.
 
 # Type: String. Default: http://localhost:8983/solr/crawldb
 # for standalone server
@@ -30,8 +30,7 @@ crawldb.backend: solr # "solr" is default until "elasticsearch" becomes usable.
 solr.uri: http://localhost:8983/solr/crawldb
 
 # elasticsearch settings
-elasticsearch.uri: http://localhost:9200
-
+elasticsearch.uri: http://elasticsearch:9200
 
 ##################### Apache Spark Properties ###########################
 
sparkler-core/sparkler-app/pom.xml

+10
@@ -104,6 +104,16 @@
         </exclusions>
     </dependency>
 
+    <dependency>
+        <groupId>org.elasticsearch</groupId>
+        <artifactId>elasticsearch</artifactId>
+        <version>${elasticsearch.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.elasticsearch.client</groupId>
+        <artifactId>elasticsearch-rest-high-level-client</artifactId>
+        <version>${elasticsearch.client.version}</version>
+    </dependency>
     <dependency>
         <groupId>org.apache.solr</groupId>
         <artifactId>solr-solrj</artifactId>
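With both artifacts on the sparkler-app classpath, the read path from the commit log ("got SearchHit data", the UNFETCHED/FETCHED querying fix) can be sketched as follows. The index name, page size, and exact status query are assumptions; the UNFETCHED fix likely came down to getting an exact-match query like this right against the status field's mapping:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import edu.usc.irds.sparkler.model.Resource;

public class UnfetchedReader {
    public static void printUnfetched(RestHighLevelClient client) throws Exception {
        // Query resources still waiting to be crawled ("crawldb" and the
        // literal status value are assumptions for this sketch).
        SearchRequest request = new SearchRequest("crawldb").source(
                new SearchSourceBuilder()
                        .query(QueryBuilders.termQuery("status", "UNFETCHED"))
                        .size(100));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            // getSourceAsMap() feeds the new Resource(Map) constructor directly.
            System.out.println(new Resource(hit.getSourceAsMap()));
        }
    }
}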
