diff --git a/pom.xml b/pom.xml
index 5a48a842a..3feca7793 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,12 +129,11 @@
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>com.sleepycat</groupId>
-      <artifactId>je</artifactId>
-      <version>5.0.73</version>
-    </dependency>
-
+    <dependency>
+      <groupId>redis.clients</groupId>
+      <artifactId>jedis</artifactId>
+      <version>2.8.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.tika</groupId>
       <artifactId>tika-parsers</artifactId>
diff --git a/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlConfig.java b/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlConfig.java
index 9335c7735..540dd5f52 100644
--- a/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlConfig.java
+++ b/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlConfig.java
@@ -35,6 +35,16 @@ public class CrawlConfig {
      */
     private String crawlStorageFolder;
 
+    /**
+     * The host used to initiate a Redis connection
+     */
+    private String redisHost = "localhost";
+
+    /**
+     * The port used to initiate a Redis connection
+     */
+    private int redisPort = 6379;
+
     /**
      * If this feature is enabled, you would be able to resume a previously
      * stopped/crashed crawl. However, it makes crawling slightly slower
@@ -495,10 +505,28 @@ public void setAuthInfos(List<AuthInfo> authInfos) {
         this.authInfos = authInfos;
     }
 
-    @Override
+    public String getRedisHost() {
+        return redisHost;
+    }
+
+    public void setRedisHost(String redisHost) {
+        this.redisHost = redisHost;
+    }
+
+    public int getRedisPort() {
+        return redisPort;
+    }
+
+    public void setRedisPort(int redisPort) {
+        this.redisPort = redisPort;
+    }
+
+    @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Crawl storage folder: " + getCrawlStorageFolder() + "\n");
+        sb.append("Redis host: " + getRedisHost() + "\n");
+        sb.append("Redis port: " + getRedisPort() + "\n");
         sb.append("Resumable crawling: " + isResumableCrawling() + "\n");
         sb.append("Max depth of crawl: " + getMaxDepthOfCrawling() + "\n");
         sb.append("Max pages to fetch: " + getMaxPagesToFetch() + "\n");
diff --git a/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlController.java b/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlController.java
index aee1144cd..1a685ceb5 100644
--- a/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlController.java
+++ b/src/main/java/edu/uci/ics/crawler4j/crawler/CrawlController.java
@@ -21,12 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-
 import edu.uci.ics.crawler4j.fetcher.PageFetcher;
 import edu.uci.ics.crawler4j.frontier.DocIDServer;
 import edu.uci.ics.crawler4j.frontier.Frontier;
@@ -35,14 +29,9 @@
 import edu.uci.ics.crawler4j.url.URLCanonicalizer;
 import edu.uci.ics.crawler4j.url.WebURL;
 import edu.uci.ics.crawler4j.util.IO;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * The controller that manages a crawling session. This class creates the
  * crawler threads and monitors their progress.
@@ -82,7 +71,7 @@ public class CrawlController extends Configurable {
     protected DocIDServer docIdServer;
 
     protected final Object waitingLock = new Object();
-    protected final Environment env;
+
 
     public CrawlController(CrawlConfig config, PageFetcher pageFetcher, RobotstxtServer robotstxtServer) throws Exception {
@@ -103,10 +92,7 @@ public CrawlController(CrawlConfig config, PageFetcher pageFetcher, RobotstxtSer
         boolean resumable = config.isResumableCrawling();
 
-        EnvironmentConfig envConfig = new EnvironmentConfig();
-        envConfig.setAllowCreate(true);
-        envConfig.setTransactional(resumable);
-        envConfig.setLocking(resumable);
+
         File envHome = new File(config.getCrawlStorageFolder() + "/frontier");
         if (!envHome.exists()) {
@@ -117,14 +103,16 @@ public CrawlController(CrawlConfig config, PageFetcher pageFetcher, RobotstxtSer
             }
         }
 
+        docIdServer = new DocIDServer(config);
+        frontier = new Frontier(config);
+
         if (!resumable) {
+            frontier.clearData();
             IO.deleteFolderContents(envHome);
             logger.info("Deleted contents of: " + envHome + " ( as you have configured resumable crawling to false )");
         }
 
-        env = new Environment(envHome, envConfig);
-        docIdServer = new DocIDServer(env, config);
-        frontier = new Frontier(env, config);
+
         this.pageFetcher = pageFetcher;
         this.robotstxtServer = robotstxtServer;
@@ -309,7 +297,7 @@ public void run() {
                         finished = true;
                         waitingLock.notifyAll();
-                        env.close();
+
                         return;
                     }
diff --git a/src/main/java/edu/uci/ics/crawler4j/frontier/Counters.java b/src/main/java/edu/uci/ics/crawler4j/frontier/Counters.java
index 476f549bc..ac5d577f4 100644
--- a/src/main/java/edu/uci/ics/crawler4j/frontier/Counters.java
+++ b/src/main/java/edu/uci/ics/crawler4j/frontier/Counters.java
@@ -17,125 +17,55 @@
 package edu.uci.ics.crawler4j.frontier;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-
 import edu.uci.ics.crawler4j.crawler.Configurable;
 import edu.uci.ics.crawler4j.crawler.CrawlConfig;
-import edu.uci.ics.crawler4j.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
 
 /**
  * @author Yasser Ganjisaffar
  */
 public class Counters extends Configurable {
-    private static final Logger logger = LoggerFactory.getLogger(Counters.class);
-
-    public static class ReservedCounterNames {
-        public static final String SCHEDULED_PAGES = "Scheduled-Pages";
-        public static final String PROCESSED_PAGES = "Processed-Pages";
-    }
-
-    private static final String DATABASE_NAME = "Statistics";
-    protected Database statisticsDB = null;
-    protected Environment env;
-
-    protected final Object mutex = new Object();
-
-    protected Map<String, Long> counterValues;
+    private static final Logger logger = LoggerFactory.getLogger(Counters.class);
 
-    public Counters(Environment env, CrawlConfig config) {
-        super(config);
+    public static class ReservedCounterNames {
+        public static final String SCHEDULED_PAGES = "Scheduled-Pages";
+        public static final String PROCESSED_PAGES = "Processed-Pages";
+    }
 
-        this.env = env;
-        this.counterValues = new HashMap<>();
+    public static final int DATABASE_INDEX = 0;
+    protected Jedis statisticsDB = null;
 
-        /*
-         * When crawling is set to be resumable, we have to keep the statistics
-         * in a transactional database to make sure they are not lost if crawler
-         * is crashed or terminated unexpectedly.
-         */
-        if (config.isResumableCrawling()) {
-            DatabaseConfig dbConfig = new DatabaseConfig();
-            dbConfig.setAllowCreate(true);
-            dbConfig.setTransactional(true);
-            dbConfig.setDeferredWrite(false);
-            statisticsDB = env.openDatabase(null, DATABASE_NAME, dbConfig);
-            OperationStatus result;
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            Transaction tnx = env.beginTransaction(null, null);
-            Cursor cursor = statisticsDB.openCursor(tnx, null);
-            result = cursor.getFirst(key, value, null);
+    protected final Object mutex = new Object();
 
-            while (result == OperationStatus.SUCCESS) {
-                if (value.getData().length > 0) {
-                    String name = new String(key.getData());
-                    long counterValue = Util.byteArray2Long(value.getData());
-                    counterValues.put(name, counterValue);
-                }
-                result = cursor.getNext(key, value, null);
-            }
-            cursor.close();
-            tnx.commit();
-        }
-    }
+    public Counters(CrawlConfig config) {
+        super(config);
+        statisticsDB = new Jedis(config.getRedisHost(), config.getRedisPort());
+        statisticsDB.select(DATABASE_INDEX);
+    }
 
-    public long getValue(String name) {
-        synchronized (mutex) {
-            Long value = counterValues.get(name);
-            if (value == null) {
-                return 0;
-            }
-            return value;
-        }
-    }
+    public long getValue(String name) {
+        String value = statisticsDB.get(name);
+        return value == null ? 0 : Long.parseLong(value);
+    }
 
-    public void setValue(String name, long value) {
-        synchronized (mutex) {
-            try {
-                counterValues.put(name, value);
-                if (statisticsDB != null) {
-                    Transaction txn = env.beginTransaction(null, null);
-                    statisticsDB.put(txn, new DatabaseEntry(name.getBytes()), new DatabaseEntry(Util.long2ByteArray(value)));
-                    txn.commit();
-                }
-            } catch (Exception e) {
-                logger.error("Exception setting value", e);
-            }
-        }
-    }
+    public void setValue(String name, Long value) {
+        statisticsDB.set(name, value.toString());
+    }
 
-    public void increment(String name) {
-        increment(name, 1);
-    }
+    public void increment(String name) {
+        statisticsDB.incr(name);
+    }
 
-    public void increment(String name, long addition) {
-        synchronized (mutex) {
-            long prevValue = getValue(name);
-            setValue(name, prevValue + addition);
-        }
-    }
+    public void increment(String name, long addition) {
+        statisticsDB.incrBy(name, addition);
+    }
 
-    public void close() {
-        try {
-            if (statisticsDB != null) {
-                statisticsDB.close();
-            }
-        } catch (DatabaseException e) {
-            logger.error("Exception thrown while trying to close statisticsDB", e);
-        }
-    }
+    public void close() {
+        if (statisticsDB != null) {
+            statisticsDB.close();
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/crawler4j/frontier/DocIDServer.java b/src/main/java/edu/uci/ics/crawler4j/frontier/DocIDServer.java
index ec90769da..98c8ef3bf 100644
--- a/src/main/java/edu/uci/ics/crawler4j/frontier/DocIDServer.java
+++ b/src/main/java/edu/uci/ics/crawler4j/frontier/DocIDServer.java
@@ -17,135 +17,108 @@
 package edu.uci.ics.crawler4j.frontier;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.OperationStatus;
 import edu.uci.ics.crawler4j.crawler.Configurable;
 import edu.uci.ics.crawler4j.crawler.CrawlConfig;
-import edu.uci.ics.crawler4j.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
 
 /**
  * @author Yasser Ganjisaffar
 */
 public class DocIDServer extends Configurable {
-    private static final Logger logger = LoggerFactory.getLogger(DocIDServer.class);
-
-    private final Database docIDsDB;
-    private static final String DATABASE_NAME = "DocIDs";
-
-    private final Object mutex = new Object();
-
-    private int lastDocID;
-
-    public DocIDServer(Environment env, CrawlConfig config) {
-        super(config);
-        DatabaseConfig dbConfig = new DatabaseConfig();
-        dbConfig.setAllowCreate(true);
-        dbConfig.setTransactional(config.isResumableCrawling());
-        dbConfig.setDeferredWrite(!config.isResumableCrawling());
-        lastDocID = 0;
-        docIDsDB = env.openDatabase(null, DATABASE_NAME, dbConfig);
-        if (config.isResumableCrawling()) {
-            int docCount = getDocCount();
-            if (docCount > 0) {
-                logger.info("Loaded {} URLs that had been detected in previous crawl.", docCount);
-                lastDocID = docCount;
-            }
-        }
-    }
-
-    /**
-     * Returns the docid of an already seen url.
-     *
-     * @param url the URL for which the docid is returned.
-     * @return the docid of the url if it is seen before. Otherwise -1 is returned.
-     */
-    public int getDocId(String url) {
-        synchronized (mutex) {
-            OperationStatus result = null;
-            DatabaseEntry value = new DatabaseEntry();
-            try {
-                DatabaseEntry key = new DatabaseEntry(url.getBytes());
-                result = docIDsDB.get(null, key, value, null);
-
-            } catch (Exception e) {
-                logger.error("Exception thrown while getting DocID", e);
-                return -1;
-            }
-
-            if ((result == OperationStatus.SUCCESS) && (value.getData().length > 0)) {
-                return Util.byteArray2Int(value.getData());
-            }
-
-            return -1;
-        }
-    }
-
-    public int getNewDocID(String url) {
-        synchronized (mutex) {
-            try {
-                // Make sure that we have not already assigned a docid for this URL
-                int docID = getDocId(url);
-                if (docID > 0) {
-                    return docID;
-                }
-
-                ++lastDocID;
-                docIDsDB.put(null, new DatabaseEntry(url.getBytes()), new DatabaseEntry(Util.int2ByteArray(lastDocID)));
-                return lastDocID;
-            } catch (Exception e) {
-                logger.error("Exception thrown while getting new DocID", e);
-                return -1;
-            }
-        }
-    }
-
-    public void addUrlAndDocId(String url, int docId) throws Exception {
-        synchronized (mutex) {
-            if (docId <= lastDocID) {
-                throw new Exception("Requested doc id: " + docId + " is not larger than: " + lastDocID);
-            }
-
-            // Make sure that we have not already assigned a docid for this URL
-            int prevDocid = getDocId(url);
-            if (prevDocid > 0) {
-                if (prevDocid == docId) {
-                    return;
-                }
-                throw new Exception("Doc id: " + prevDocid + " is already assigned to URL: " + url);
-            }
-
-            docIDsDB.put(null, new DatabaseEntry(url.getBytes()), new DatabaseEntry(Util.int2ByteArray(docId)));
-            lastDocID = docId;
-        }
-    }
-
-    public boolean isSeenBefore(String url) {
-        return getDocId(url) != -1;
-    }
-
-    public final int getDocCount() {
-        try {
-            return (int) docIDsDB.count();
-        } catch (DatabaseException e) {
-            logger.error("Exception thrown while getting DOC Count", e);
-            return -1;
-        }
-    }
-
-    public void close() {
-        try {
-            docIDsDB.close();
-        } catch (DatabaseException e) {
-            logger.error("Exception thrown while closing DocIDServer", e);
-        }
-    }
+    private static final Logger logger = LoggerFactory.getLogger(DocIDServer.class);
+
+    private final Jedis docIDsDB;
+    public static final int DATABASE_INDEX = 1;
+    public static final String KEY_PREFIX = "url:";
+
+    private final Object mutex = new Object();
+
+    private int lastDocID;
+
+    public DocIDServer(CrawlConfig config) {
+        super(config);
+        lastDocID = 0;
+        docIDsDB = new Jedis(config.getRedisHost(), config.getRedisPort());
+        docIDsDB.select(DATABASE_INDEX);
+        if (config.isResumableCrawling()) {
+            int docCount = getDocCount();
+            if (docCount > 0) {
+                logger.info("Loaded {} URLs that had been detected in previous crawl.", docCount);
+                lastDocID = docCount;
+            }
+        }
+    }
+
+    /**
+     * Returns the docid of an already seen url.
+     *
+     * @param url the URL for which the docid is returned.
+     * @return the docid of the url if it is seen before. Otherwise -1 is returned.
+     */
+    public int getDocId(String url) {
+        synchronized (mutex) {
+            String result = docIDsDB.get(getKey(url));
+            return result == null ? -1 : Integer.valueOf(result);
+        }
+    }
+
+    public int getNewDocID(String url) {
+        synchronized (mutex) {
+            try {
+                // Make sure that we have not already assigned a docid for this URL
+                int docID = getDocId(url);
+                if (docID > 0) {
+                    return docID;
+                }
+
+                ++lastDocID;
+                docIDsDB.set(getKey(url), String.valueOf(lastDocID));
+                return lastDocID;
+            }
+            catch (Exception e) {
+                logger.error("Exception thrown while getting new DocID", e);
+                return -1;
+            }
+        }
+    }
+
+    public void addUrlAndDocId(String url, int docId) throws Exception {
+        synchronized (mutex) {
+            if (docId <= lastDocID) {
+                throw new Exception("Requested doc id: " + docId + " is not larger than: " + lastDocID);
+            }
+
+            // Make sure that we have not already assigned a docid for this URL
+            int prevDocid = getDocId(url);
+            if (prevDocid > 0) {
+                if (prevDocid == docId) {
+                    return;
+                }
+                throw new Exception("Doc id: " + prevDocid + " is already assigned to URL: " + url);
+            }
+
+            docIDsDB.set(getKey(url), String.valueOf(docId));
+            lastDocID = docId;
+        }
+    }
+
+    private String getKey(String url) {
+        return KEY_PREFIX + url;
+    }
+
+    public boolean isSeenBefore(String url) {
+        return getDocId(url) != -1;
+    }
+
+    public final int getDocCount() {
+        return docIDsDB.keys(KEY_PREFIX + "*").size();
+    }
+
+    public void close() {
+        docIDsDB.close();
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/crawler4j/frontier/Frontier.java b/src/main/java/edu/uci/ics/crawler4j/frontier/Frontier.java
index 6a43d5e35..f9d5d0149 100644
--- a/src/main/java/edu/uci/ics/crawler4j/frontier/Frontier.java
+++ b/src/main/java/edu/uci/ics/crawler4j/frontier/Frontier.java
@@ -19,15 +19,12 @@
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-
 import edu.uci.ics.crawler4j.crawler.Configurable;
 import edu.uci.ics.crawler4j.crawler.CrawlConfig;
 import edu.uci.ics.crawler4j.url.WebURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
 
 /**
  * @author Yasser Ganjisaffar
@@ -36,7 +33,7 @@ public class Frontier extends Configurable {
     protected static final Logger logger = LoggerFactory.getLogger(Frontier.class);
 
-    private static final String DATABASE_NAME = "PendingURLsDB";
+    private static final int DATABASE_INDEX = 2;
     private static final int IN_PROCESS_RESCHEDULE_BATCH_SIZE = 100;
 
     protected WorkQueues workQueues;
@@ -51,14 +48,14 @@ public class Frontier extends Configurable {
 
     protected Counters counters;
 
-    public Frontier(Environment env, CrawlConfig config) {
+    public Frontier(CrawlConfig config) {
         super(config);
-        this.counters = new Counters(env, config);
-        try {
-            workQueues = new WorkQueues(env, DATABASE_NAME, config.isResumableCrawling());
+        this.counters = new Counters(config);
+
+        workQueues = new WorkQueues(DATABASE_INDEX,config);
         if (config.isResumableCrawling()) {
             scheduledPages = counters.getValue(Counters.ReservedCounterNames.SCHEDULED_PAGES);
-            inProcessPages = new InProcessPagesDB(env);
+            inProcessPages = new InProcessPagesDB(config);
             long numPreviouslyInProcessPages = inProcessPages.getLength();
             if (numPreviouslyInProcessPages > 0) {
                 logger.info("Rescheduling {} URLs from previous crawl.", numPreviouslyInProcessPages);
@@ -75,10 +72,7 @@ public Frontier(Environment env, CrawlConfig config) {
             inProcessPages = null;
             scheduledPages = 0;
         }
-        } catch (DatabaseException e) {
-            logger.error("Error while initializing the Frontier", e);
-            workQueues = null;
-        }
+
     }
 
     public void scheduleAll(List<WebURL> urls) {
@@ -90,12 +84,10 @@ public void scheduleAll(List<WebURL> urls) {
                 break;
             }
 
-            try {
+
             workQueues.put(url);
             newScheduledPage++;
-            } catch (DatabaseException e) {
-                logger.error("Error while putting the url in the work queue", e);
-            }
+
         }
         if (newScheduledPage > 0) {
             scheduledPages += newScheduledPage;
@@ -110,15 +102,13 @@ public void scheduleAll(List<WebURL> urls) {
     public void schedule(WebURL url) {
         int maxPagesToFetch = config.getMaxPagesToFetch();
         synchronized (mutex) {
-            try {
+
             if (maxPagesToFetch < 0 || scheduledPages < maxPagesToFetch) {
                 workQueues.put(url);
                 scheduledPages++;
                 counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES);
             }
-            } catch (DatabaseException e) {
-                logger.error("Error while putting the url in the work queue", e);
-            }
+
         }
     }
 
@@ -128,7 +118,7 @@ public void getNextURLs(int max, List<WebURL> result) {
         if (isFinished) {
             return;
         }
-        try {
+
         List<WebURL> curResults = workQueues.get(max);
         workQueues.delete(curResults.size());
         if (inProcessPages != null) {
@@ -137,9 +127,7 @@
             }
         }
         result.addAll(curResults);
-        } catch (DatabaseException e) {
-            logger.error("Error while getting next urls", e);
-        }
+
 
         if (result.size() > 0) {
             return;
@@ -198,4 +186,17 @@ public void finish() {
             waitingList.notifyAll();
         }
     }
+
+    public void clearData() {
+        Jedis jedis = new Jedis(getConfig().getRedisHost(), getConfig().getRedisPort());
+        int[] dbs = new int[]{Counters.DATABASE_INDEX,
+                DocIDServer.DATABASE_INDEX,
+                Frontier.DATABASE_INDEX,
+                InProcessPagesDB.DATABASE_INDEX};
+        for (int db : dbs) {
+            jedis.select(db);
+            jedis.flushDB();
+        }
+        jedis.close();
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/crawler4j/frontier/InProcessPagesDB.java b/src/main/java/edu/uci/ics/crawler4j/frontier/InProcessPagesDB.java
index 5bafcea32..9a7251f4d 100644
--- a/src/main/java/edu/uci/ics/crawler4j/frontier/InProcessPagesDB.java
+++ b/src/main/java/edu/uci/ics/crawler4j/frontier/InProcessPagesDB.java
@@ -18,17 +18,11 @@
 package edu.uci.ics.crawler4j.frontier;
 
+import edu.uci.ics.crawler4j.crawler.CrawlConfig;
+import edu.uci.ics.crawler4j.url.WebURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-
-import edu.uci.ics.crawler4j.url.WebURL;
-
 /**
  * This class maintains the list of pages which are
 * assigned to crawlers but are not yet processed.
@@ -39,10 +33,10 @@ public class InProcessPagesDB extends WorkQueues {
     private static final Logger logger = LoggerFactory.getLogger(InProcessPagesDB.class);
 
-    private static final String DATABASE_NAME = "InProcessPagesDB";
+    public static final int DATABASE_INDEX = 3;
 
-    public InProcessPagesDB(Environment env) {
-        super(env, DATABASE_NAME, true);
+    public InProcessPagesDB(CrawlConfig crawlConfig) {
+        super(DATABASE_INDEX,crawlConfig);
         long docCount = getLength();
         if (docCount > 0) {
             logger.info("Loaded {} URLs that have been in process in the previous crawl.", docCount);
@@ -51,22 +45,10 @@ public InProcessPagesDB(Environment env) {
 
     public boolean removeURL(WebURL webUrl) {
         synchronized (mutex) {
-            DatabaseEntry key = getDatabaseEntryKey(webUrl);
-            DatabaseEntry value = new DatabaseEntry();
-            Transaction txn = beginTransaction();
-            try (Cursor cursor = openCursor(txn)) {
-                OperationStatus result = cursor.getSearchKey(key, value, null);
-
-                if (result == OperationStatus.SUCCESS) {
-                    result = cursor.delete();
-                    if (result == OperationStatus.SUCCESS) {
-                        return true;
-                    }
-                }
-            } finally {
-                commit(txn);
-            }
+            String key = getDatabaseEntryKey(webUrl);
+            getUrlsDB().del(key);
+            getUrlsDB().zrem(ALL_URLS, key);
         }
-        return false;
+        return true;
     }
 }
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/crawler4j/frontier/WebURLTupleBinding.java b/src/main/java/edu/uci/ics/crawler4j/frontier/WebURLTupleBinding.java
index 59c998206..9c29a76e5 100644
--- a/src/main/java/edu/uci/ics/crawler4j/frontier/WebURLTupleBinding.java
+++ b/src/main/java/edu/uci/ics/crawler4j/frontier/WebURLTupleBinding.java
@@ -17,38 +17,51 @@
 package edu.uci.ics.crawler4j.frontier;
 
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
+import java.util.Map;
 
 import edu.uci.ics.crawler4j.url.WebURL;
 
 /**
  * @author Yasser Ganjisaffar
 */
-public class WebURLTupleBinding extends TupleBinding<WebURL> {
+public class WebURLTupleBinding {
 
-    @Override
-    public WebURL entryToObject(TupleInput input) {
+
+    public static final String URL = "url";
+    public static final String DOCID = "docid";
+    public static final String PARENT_DOCID = "parent_docid";
+    public static final String PARENT_URL = "parent_url";
+    public static final String DEPTH = "depth";
+    public static final String PRIORITY = "priority";
+    public static final String ANCHOR = "anchor";
+
+    public WebURL entryToObject(Map<String, String> input) {
         WebURL webURL = new WebURL();
-        webURL.setURL(input.readString());
-        webURL.setDocid(input.readInt());
-        webURL.setParentDocid(input.readInt());
-        webURL.setParentUrl(input.readString());
-        webURL.setDepth(input.readShort());
-        webURL.setPriority(input.readByte());
-        webURL.setAnchor(input.readString());
+        webURL.setURL(input.get(URL));
+        webURL.setDocid(Integer.parseInt(input.get(DOCID)));
+        webURL.setParentDocid(Integer.parseInt(input.get(PARENT_DOCID)));
+        webURL.setParentUrl(input.get(PARENT_URL));
+        webURL.setDepth(Short.parseShort(input.get(DEPTH)));
+        webURL.setPriority(Byte.parseByte(input.get(PRIORITY)));
+        webURL.setAnchor(input.get(ANCHOR));
         return webURL;
     }
 
-    @Override
-    public void objectToEntry(WebURL url, TupleOutput output) {
-        output.writeString(url.getURL());
-        output.writeInt(url.getDocid());
-        output.writeInt(url.getParentDocid());
-        output.writeString(url.getParentUrl());
-        output.writeShort(url.getDepth());
-        output.writeByte(url.getPriority());
-        output.writeString(url.getAnchor());
+
+    public void objectToEntry(WebURL url, Map<String, String> output) {
+        output.put(URL, url.getURL());
+        output.put(DOCID, String.valueOf(url.getDocid()));
+        output.put(PARENT_DOCID, String.valueOf(url.getParentDocid()));
+        String parentUrl = url.getParentUrl();
+        if(parentUrl != null) {
+            output.put(PARENT_URL, parentUrl);
+        }
+        output.put(DEPTH, String.valueOf(url.getDepth()));
+        output.put(PRIORITY, String.valueOf(url.getPriority()));
+        String anchor = url.getAnchor();
+        if(anchor != null) {
+            output.put(ANCHOR, anchor);
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/crawler4j/frontier/WorkQueues.java b/src/main/java/edu/uci/ics/crawler4j/frontier/WorkQueues.java
index 0b42c164d..4e7c94746 100644
--- a/src/main/java/edu/uci/ics/crawler4j/frontier/WorkQueues.java
+++ b/src/main/java/edu/uci/ics/crawler4j/frontier/WorkQueues.java
@@ -17,130 +17,104 @@
 package edu.uci.ics.crawler4j.frontier;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-
+import edu.uci.ics.crawler4j.crawler.CrawlConfig;
 import edu.uci.ics.crawler4j.url.WebURL;
 import edu.uci.ics.crawler4j.util.Util;
+import redis.clients.jedis.Jedis;
 
 /**
  * @author Yasser Ganjisaffar
 */
 public class WorkQueues {
-    private final Database urlsDB;
-    private final Environment env;
-
-    private final boolean resumable;
-
-    private final WebURLTupleBinding webURLBinding;
-
-    protected final Object mutex = new Object();
-
-    public WorkQueues(Environment env, String dbName, boolean resumable) {
-        this.env = env;
-        this.resumable = resumable;
-        DatabaseConfig dbConfig = new DatabaseConfig();
-        dbConfig.setAllowCreate(true);
-        dbConfig.setTransactional(resumable);
-        dbConfig.setDeferredWrite(!resumable);
-        urlsDB = env.openDatabase(null, dbName, dbConfig);
-        webURLBinding = new WebURLTupleBinding();
-    }
-
-    protected Transaction beginTransaction() {
-        return resumable ? env.beginTransaction(null, null) : null;
-    }
-
-    protected static void commit(Transaction tnx) {
-        if (tnx != null) {
-            tnx.commit();
-        }
-    }
-
-    protected Cursor openCursor(Transaction txn) {
-        return urlsDB.openCursor(txn, null);
-    }
-
-    public List<WebURL> get(int max) {
-        synchronized (mutex) {
-            List<WebURL> results = new ArrayList<>(max);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            Transaction txn = beginTransaction();
-            try (Cursor cursor = openCursor(txn)) {
-                OperationStatus result = cursor.getFirst(key, value, null);
-                int matches = 0;
-                while ((matches < max) && (result == OperationStatus.SUCCESS)) {
-                    if (value.getData().length > 0) {
-                        results.add(webURLBinding.entryToObject(value));
-                        matches++;
-                    }
-                    result = cursor.getNext(key, value, null);
-                }
-            }
-            commit(txn);
-            return results;
-        }
-    }
-
-    public void delete(int count) {
-        synchronized (mutex) {
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            Transaction txn = beginTransaction();
-            try (Cursor cursor = openCursor(txn)) {
-                OperationStatus result = cursor.getFirst(key, value, null);
-                int matches = 0;
-                while ((matches < count) && (result == OperationStatus.SUCCESS)) {
-                    cursor.delete();
-                    matches++;
-                    result = cursor.getNext(key, value, null);
-                }
-            }
-            commit(txn);
-        }
-    }
-
-    /*
-     * The key that is used for storing URLs determines the order
-     * they are crawled. Lower key values results in earlier crawling.
-     * Here our keys are 6 bytes. The first byte comes from the URL priority.
-     * The second byte comes from depth of crawl at which this URL is first found.
-     * The rest of the 4 bytes come from the docid of the URL. As a result,
-     * URLs with lower priority numbers will be crawled earlier. If priority
-     * numbers are the same, those found at lower depths will be crawled earlier.
-     * If depth is also equal, those found earlier (therefore, smaller docid) will
-     * be crawled earlier.
-     */
-    protected static DatabaseEntry getDatabaseEntryKey(WebURL url) {
-        byte[] keyData = new byte[6];
-        keyData[0] = url.getPriority();
-        keyData[1] = ((url.getDepth() > Byte.MAX_VALUE) ? Byte.MAX_VALUE : (byte) url.getDepth());
-        Util.putIntInByteArray(url.getDocid(), keyData, 2);
-        return new DatabaseEntry(keyData);
-    }
-
-    public void put(WebURL url) {
-        DatabaseEntry value = new DatabaseEntry();
-        webURLBinding.objectToEntry(url, value);
-        Transaction txn = beginTransaction();
-        urlsDB.put(txn, getDatabaseEntryKey(url), value);
-        commit(txn);
-    }
-
-    public long getLength() {
-        return urlsDB.count();
-    }
-
-    public void close() {
-        urlsDB.close();
-    }
+    public static final String KEY_PREFIX = "url:";
+    public static final String ALL_URLS = "url:urls";
+    private final Jedis urlsDB;
+
+    private final WebURLTupleBinding webURLBinding;
+
+    protected final Object mutex = new Object();
+
+    public WorkQueues(int databaseIndex,CrawlConfig crawlConfig) {
+        urlsDB = new Jedis(crawlConfig.getRedisHost(), crawlConfig.getRedisPort());
+        urlsDB.select(databaseIndex);
+        webURLBinding = new WebURLTupleBinding();
+    }
+
+    public List<WebURL> get(int max) {
+        synchronized (mutex) {
+            List<WebURL> results = new ArrayList<>(max);
+            Set<String> zrange = urlsDB.zrange(ALL_URLS, 0, max);
+            for (String s : zrange) {
+                Map<String, String> hmap = urlsDB.hgetAll(s);
+                WebURL webURL = webURLBinding.entryToObject(hmap);
+                results.add(webURL);
+            }
+            return results;
+        }
+    }
+
+    public void delete(int count) {
+        synchronized (mutex) {
+            if (count != 0) {
+                Set<String> zrange = urlsDB.zrange(ALL_URLS, 0, count - 1);
+                urlsDB.zrem(ALL_URLS, zrange.toArray(new String[zrange.size()]));
+                for (String key : zrange) {
+                    urlsDB.del(key);
+                }
+            }
+        }
+    }
+
+    /*
+     * The key that is used for storing URLs determines the order
+     * they are crawled. Lower key values results in earlier crawling.
+     * Here our keys are 6 bytes. The first byte comes from the URL priority.
+     * The second byte comes from depth of crawl at which this URL is first found.
+     * The rest of the 4 bytes come from the docid of the URL. As a result,
+     * URLs with lower priority numbers will be crawled earlier. If priority
+     * numbers are the same, those found at lower depths will be crawled earlier.
+     * If depth is also equal, those found earlier (therefore, smaller docid) will
+     * be crawled earlier.
+     */
+    protected static String getDatabaseEntryKey(WebURL url) {
+        byte depth = (url.getDepth() > Byte.MAX_VALUE) ? Byte.MAX_VALUE : (byte) url.getDepth();
+        byte priority = url.getPriority();
+        String key = KEY_PREFIX + priority + ":" + depth + ":" + url.getDocid();
+        return key;
+    }
+
+    protected static double getScore(WebURL url) {
+        byte[] keyData = new byte[8];
+        keyData[0] = url.getPriority();
+        keyData[1] = ((url.getDepth() > Byte.MAX_VALUE) ? Byte.MAX_VALUE : (byte) url.getDepth());
+        Util.putIntInByteArray(url.getDocid(), keyData, 2);
+        return ByteBuffer.wrap(keyData).getDouble();
+    }
+
+    public void put(WebURL url) {
+        Map<String, String> value = new HashMap<>();
+        webURLBinding.objectToEntry(url, value);
+        String key = getDatabaseEntryKey(url);
+        urlsDB.hmset(key, value);
+        urlsDB.zadd(ALL_URLS, getScore(url), key);
+    }
+
+    public Jedis getUrlsDB() {
+        return urlsDB;
+    }
+
+    public long getLength() {
+        return urlsDB.zcard(ALL_URLS);
+    }
+
+    public void close() {
+        urlsDB.close();
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/edu/uci/ics/crawler4j/url/WebURL.java b/src/main/java/edu/uci/ics/crawler4j/url/WebURL.java
index 576950564..87ffcfb9e 100644
--- a/src/main/java/edu/uci/ics/crawler4j/url/WebURL.java
+++ b/src/main/java/edu/uci/ics/crawler4j/url/WebURL.java
@@ -19,19 +19,14 @@
 import java.io.Serializable;
 
-import com.sleepycat.persist.model.Entity;
-import com.sleepycat.persist.model.PrimaryKey;
 
 /**
  * @author Yasser Ganjisaffar
 */
-
-@Entity
 public class WebURL implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    @PrimaryKey
     private String url;
 
     private int docid;
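
Below is a minimal bootstrap sketch showing how a crawl would be configured against Redis after this patch. Only setRedisHost, setRedisPort, and the CrawlController constructor shown in the diff come from this change; the rest (RobotstxtConfig, PageFetcher, WebCrawler, addSeed, start) is the standard crawler4j 4.x API assumed unchanged, and the folder, host, port, and seed values are placeholders.

import edu.uci.ics.crawler4j.crawler.CrawlConfig;
import edu.uci.ics.crawler4j.crawler.CrawlController;
import edu.uci.ics.crawler4j.crawler.WebCrawler;
import edu.uci.ics.crawler4j.fetcher.PageFetcher;
import edu.uci.ics.crawler4j.robotstxt.RobotstxtConfig;
import edu.uci.ics.crawler4j.robotstxt.RobotstxtServer;

public class RedisCrawlExample {
    public static void main(String[] args) throws Exception {
        CrawlConfig config = new CrawlConfig();
        config.setCrawlStorageFolder("/tmp/crawler4j");  // still used for the "frontier" folder
        config.setRedisHost("localhost");                // new setting added by this patch (default "localhost")
        config.setRedisPort(6379);                       // new setting added by this patch (default 6379)
        config.setResumableCrawling(false);              // false: CrawlController calls frontier.clearData(), flushing DBs 0-3

        PageFetcher pageFetcher = new PageFetcher(config);
        RobotstxtConfig robotstxtConfig = new RobotstxtConfig();
        RobotstxtServer robotstxtServer = new RobotstxtServer(robotstxtConfig, pageFetcher);

        // Constructor signature is unchanged; only the BerkeleyDB Environment handling is gone.
        CrawlController controller = new CrawlController(config, pageFetcher, robotstxtServer);
        controller.addSeed("https://www.ics.uci.edu/");
        controller.start(WebCrawler.class, 2);
    }
}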
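
For a quick sanity check of what the patched crawler writes to Redis, the sketch below reads back the databases and keys introduced above: DB 0 holds the Counters values, DB 1 the "url:<URL>" docid entries from DocIDServer, and DB 2 the pending work queue with its "url:urls" ordering zset. It assumes a local Redis on the default port; all key names are taken directly from the diff, and the Jedis calls (select, get, keys, zcard, zrange) are the same ones the patch itself uses.

import java.util.Set;
import redis.clients.jedis.Jedis;

public class RedisStateCheck {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // DB 0: statistics counters maintained by Counters
            jedis.select(0);
            System.out.println("Scheduled-Pages = " + jedis.get("Scheduled-Pages"));
            System.out.println("Processed-Pages = " + jedis.get("Processed-Pages"));

            // DB 1: one "url:<URL>" -> docid entry per seen URL (DocIDServer)
            jedis.select(1);
            System.out.println("Seen URLs = " + jedis.keys("url:*").size());

            // DB 2: pending queue; "url:urls" orders "url:<priority>:<depth>:<docid>" hash keys by score
            jedis.select(2);
            System.out.println("Pending URLs = " + jedis.zcard("url:urls"));
            Set<String> next = jedis.zrange("url:urls", 0, 4);
            System.out.println("Next keys = " + next);
        }
    }
}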