Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,11 @@
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.sleepycat</groupId>
<artifactId>je</artifactId>
<version>5.0.73</version>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-parsers</artifactId>
Expand Down
30 changes: 29 additions & 1 deletion src/main/java/edu/uci/ics/crawler4j/crawler/CrawlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public class CrawlConfig {
*/
private String crawlStorageFolder;

/**
* The host used to initiate a Redis connection
*/
private String redisHost = "localhost";

/**
* The port used to initiate a Redis connection
*/
private int redisPort = 6379;

/**
* If this feature is enabled, you would be able to resume a previously
* stopped/crashed crawl. However, it makes crawling slightly slower
Expand Down Expand Up @@ -495,10 +505,28 @@ public void setAuthInfos(List<AuthInfo> authInfos) {
this.authInfos = authInfos;
}

/**
 * Returns the host name used to open the Redis connection (default: "localhost").
 *
 * Note: the @Override annotation was removed — CrawlConfig declares no supertype
 * that defines getRedisHost(), so the annotation would not compile.
 *
 * @return the Redis host name
 */
public String getRedisHost() {
    return redisHost;
}

/**
 * Sets the host name used to open the Redis connection.
 *
 * @param redisHost the Redis host name (e.g. "localhost")
 */
public void setRedisHost(String redisHost) {
this.redisHost = redisHost;
}

/**
 * Returns the port used to open the Redis connection (default: 6379).
 *
 * @return the Redis port number
 */
public int getRedisPort() {
    return this.redisPort;
}

/**
 * Sets the port used to open the Redis connection.
 *
 * @param redisPort the Redis port number (standard Redis default is 6379)
 */
public void setRedisPort(int redisPort) {
this.redisPort = redisPort;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Crawl storage folder: " + getCrawlStorageFolder() + "\n");
sb.append("Redis host: " + getRedisHost() + "\n");
sb.append("Redis port: " + getRedisPort() + "\n");
sb.append("Resumable crawling: " + isResumableCrawling() + "\n");
sb.append("Max depth of crawl: " + getMaxDepthOfCrawling() + "\n");
sb.append("Max pages to fetch: " + getMaxPagesToFetch() + "\n");
Expand Down
28 changes: 8 additions & 20 deletions src/main/java/edu/uci/ics/crawler4j/crawler/CrawlController.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;

import edu.uci.ics.crawler4j.fetcher.PageFetcher;
import edu.uci.ics.crawler4j.frontier.DocIDServer;
import edu.uci.ics.crawler4j.frontier.Frontier;
Expand All @@ -35,14 +29,9 @@
import edu.uci.ics.crawler4j.url.URLCanonicalizer;
import edu.uci.ics.crawler4j.url.WebURL;
import edu.uci.ics.crawler4j.util.IO;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* The controller that manages a crawling session. This class creates the
* crawler threads and monitors their progress.
Expand Down Expand Up @@ -82,7 +71,7 @@ public class CrawlController extends Configurable {
protected DocIDServer docIdServer;

protected final Object waitingLock = new Object();
protected final Environment env;


public CrawlController(CrawlConfig config, PageFetcher pageFetcher, RobotstxtServer robotstxtServer)
throws Exception {
Expand All @@ -103,10 +92,7 @@ public CrawlController(CrawlConfig config, PageFetcher pageFetcher, RobotstxtSer

boolean resumable = config.isResumableCrawling();

EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(resumable);
envConfig.setLocking(resumable);


File envHome = new File(config.getCrawlStorageFolder() + "/frontier");
if (!envHome.exists()) {
Expand All @@ -117,14 +103,16 @@ public CrawlController(CrawlConfig config, PageFetcher pageFetcher, RobotstxtSer
}
}

docIdServer = new DocIDServer(config);
frontier = new Frontier(config);

if (!resumable) {
frontier.clearData();
IO.deleteFolderContents(envHome);
logger.info("Deleted contents of: " + envHome + " ( as you have configured resumable crawling to false )");
}

env = new Environment(envHome, envConfig);
docIdServer = new DocIDServer(env, config);
frontier = new Frontier(env, config);


this.pageFetcher = pageFetcher;
this.robotstxtServer = robotstxtServer;
Expand Down Expand Up @@ -309,7 +297,7 @@ public void run() {

finished = true;
waitingLock.notifyAll();
env.close();


return;
}
Expand Down
138 changes: 34 additions & 104 deletions src/main/java/edu/uci/ics/crawler4j/frontier/Counters.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,125 +17,55 @@

package edu.uci.ics.crawler4j.frontier;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;

import edu.uci.ics.crawler4j.crawler.Configurable;
import edu.uci.ics.crawler4j.crawler.CrawlConfig;
import edu.uci.ics.crawler4j.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
* @author Yasser Ganjisaffar
*/
public class Counters extends Configurable {
private static final Logger logger = LoggerFactory.getLogger(Counters.class);

public static class ReservedCounterNames {
public static final String SCHEDULED_PAGES = "Scheduled-Pages";
public static final String PROCESSED_PAGES = "Processed-Pages";
}

private static final String DATABASE_NAME = "Statistics";
protected Database statisticsDB = null;
protected Environment env;

protected final Object mutex = new Object();

protected Map<String, Long> counterValues;
private static final Logger logger = LoggerFactory.getLogger(Counters.class);

public Counters(Environment env, CrawlConfig config) {
super(config);
/** Well-known counter keys maintained by the crawler itself. */
public static class ReservedCounterNames {
// Number of pages scheduled for crawling so far.
public static final String SCHEDULED_PAGES = "Scheduled-Pages";
// Number of pages whose processing has completed.
public static final String PROCESSED_PAGES = "Processed-Pages";
}

this.env = env;
this.counterValues = new HashMap<>();
public static final int DATABASE_INDEX = 0;
protected Jedis statisticsDB = null;

/*
* When crawling is set to be resumable, we have to keep the statistics
* in a transactional database to make sure they are not lost if crawler
* is crashed or terminated unexpectedly.
*/
if (config.isResumableCrawling()) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(true);
dbConfig.setDeferredWrite(false);
statisticsDB = env.openDatabase(null, DATABASE_NAME, dbConfig);

OperationStatus result;
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
Transaction tnx = env.beginTransaction(null, null);
Cursor cursor = statisticsDB.openCursor(tnx, null);
result = cursor.getFirst(key, value, null);
protected final Object mutex = new Object();

while (result == OperationStatus.SUCCESS) {
if (value.getData().length > 0) {
String name = new String(key.getData());
long counterValue = Util.byteArray2Long(value.getData());
counterValues.put(name, counterValue);
}
result = cursor.getNext(key, value, null);
}
cursor.close();
tnx.commit();
}
}
/**
 * Opens a connection to the Redis server configured in {@code config}
 * and selects the logical database used for crawl statistics.
 *
 * @param config crawl configuration supplying the Redis host and port
 */
public Counters(CrawlConfig config) {
    super(config);
    this.statisticsDB = new Jedis(config.getRedisHost(), config.getRedisPort());
    this.statisticsDB.select(DATABASE_INDEX);
}

public long getValue(String name) {
synchronized (mutex) {
Long value = counterValues.get(name);
if (value == null) {
return 0;
}
return value;
}
}
/**
 * Returns the current value of the named counter, or 0 if it has never been set.
 *
 * A single Jedis connection is not thread-safe, and the crawler runs multiple
 * threads; the original BerkeleyDB implementation guarded all counter access
 * with {@code mutex}, so the same guard is restored here.
 *
 * @param name counter key
 * @return the counter value, or 0 when the key is absent
 * @throws NumberFormatException if the stored value is not a valid long
 */
public long getValue(String name) {
    synchronized (mutex) {
        String value = statisticsDB.get(name);
        return (value == null) ? 0 : Long.parseLong(value);
    }
}

public void setValue(String name, long value) {
synchronized (mutex) {
try {
counterValues.put(name, value);
if (statisticsDB != null) {
Transaction txn = env.beginTransaction(null, null);
statisticsDB.put(txn, new DatabaseEntry(name.getBytes()), new DatabaseEntry(Util.long2ByteArray(value)));
txn.commit();
}
} catch (Exception e) {
logger.error("Exception setting value", e);
}
}
}
/**
 * Sets the named counter to the given value.
 *
 * Guarded by {@code mutex} because a single Jedis connection is not
 * thread-safe and counters are updated from multiple crawler threads.
 *
 * @param name  counter key
 * @param value new value; must not be {@code null} (a null value throws NPE)
 */
public void setValue(String name, Long value) {
    synchronized (mutex) {
        statisticsDB.set(name, value.toString());
    }
}

public void increment(String name) {
increment(name, 1);
}
/**
 * Increments the named counter by one (creating it at 1 if absent,
 * per Redis INCR semantics).
 *
 * Guarded by {@code mutex}: Redis INCR is atomic server-side, but the
 * shared Jedis connection itself is not safe for concurrent use.
 *
 * @param name counter key
 */
public void increment(String name) {
    synchronized (mutex) {
        statisticsDB.incr(name);
    }
}

public void increment(String name, long addition) {
synchronized (mutex) {
long prevValue = getValue(name);
setValue(name, prevValue + addition);
}
}
/**
 * Increments the named counter by {@code addition} (creating it at
 * {@code addition} if absent, per Redis INCRBY semantics).
 *
 * Guarded by {@code mutex}: the shared Jedis connection is not safe
 * for concurrent use across crawler threads.
 *
 * @param name     counter key
 * @param addition amount to add (may be negative)
 */
public void increment(String name, long addition) {
    synchronized (mutex) {
        statisticsDB.incrBy(name, addition);
    }
}

public void close() {
try {
if (statisticsDB != null) {
statisticsDB.close();
}
} catch (DatabaseException e) {
logger.error("Exception thrown while trying to close statisticsDB", e);
}
}
/**
 * Closes the underlying Redis connection if one was opened.
 *
 * Synchronized on {@code mutex} so a close racing with an in-flight
 * counter operation on the shared Jedis connection cannot corrupt it.
 */
public void close() {
    synchronized (mutex) {
        if (statisticsDB != null) {
            statisticsDB.close();
        }
    }
}
}
Loading