
Commit 3b6d2c6

Merge pull request #832 from sebastian-nagel/NUTCH-3072
NUTCH-3072 Fetcher to stop QueueFeeder if aborting with "hung threads"
2 parents 74b49e9 + c226162 commit 3b6d2c6

File tree

3 files changed, +68 -35 lines changed

src/java/org/apache/nutch/fetcher/FetchItemQueues.java

+22 -8
@@ -58,7 +58,8 @@ public class FetchItemQueues {
   int maxExceptionsPerQueue = -1;
   long exceptionsPerQueueDelay = -1;
   long exceptionsPerQueueClearAfter = 1800 * 1000L;
-  boolean feederAlive = true;
+  volatile boolean feederAlive = true;
+  volatile boolean timoutReached = false;
   Configuration conf;
 
   public static final String QUEUE_MODE_HOST = "byHost";
@@ -71,7 +72,8 @@ enum QueuingStatus {
     SUCCESSFULLY_QUEUED,
     ERROR_CREATE_FETCH_ITEM,
     ABOVE_EXCEPTION_THRESHOLD,
-    HIT_BY_TIMELIMIT;
+    HIT_BY_TIMELIMIT,
+    HIT_BY_TIMEOUT;
   }
 
   public FetchItemQueues(Configuration conf) {
@@ -105,7 +107,7 @@ public FetchItemQueues(Configuration conf) {
 
   /**
    * Check whether queue mode is valid, fall-back to default mode if not.
-   * 
+   *
    * @param queueMode
    *          queue mode to check
   * @return valid queue mode or default
@@ -258,6 +260,18 @@ public synchronized int checkTimelimit() {
     return count;
   }
 
+  /**
+   * Signal that the hard timeout is reached because no new fetches / requests
+   * were made during half of the MapReduce task timeout
+   * (<code>mapreduce.task.timeout</code>, default value: 10 minutes). In order
+   * to avoid that the task timeout is hit and the fetcher job is failed, we
+   * stop the fetching now. See also the property
+   * <code>fetcher.threads.timeout.divisor</code>.
+   */
+  public void setTimeoutReached() {
+    this.timoutReached = true;
+  }
+
   // empties the queues (used by fetcher timelimit and throughput threshold)
   public synchronized int emptyQueues() {
     int count = 0, queuesDropped = 0;
@@ -282,10 +296,10 @@ public synchronized int emptyQueues() {
   /**
    * Increment the exception counter of a queue in case of an exception e.g.
    * timeout; when higher than a given threshold simply empty the queue.
-   * 
+   *
    * The next fetch is delayed if specified by the param {@code delay} or
    * configured by the property {@code fetcher.exceptions.per.queue.delay}.
-   * 
+   *
    * @param queueid
    *          a queue identifier to locate and check
    * @param maxExceptions
@@ -296,7 +310,7 @@ public synchronized int emptyQueues() {
    *          in addition to the delay defined for the given queue. If a
    *          negative value is passed the delay is chosen by
    *          {@code fetcher.exceptions.per.queue.delay}
-   * 
+   *
    * @return number of purged items
    */
   public synchronized int checkExceptionThreshold(String queueid,
@@ -367,9 +381,9 @@ private int purgeAndBlockQueue(String queueid, FetchItemQueue fiq,
   /**
    * Increment the exception counter of a queue in case of an exception e.g.
    * timeout; when higher than a given threshold simply empty the queue.
-   * 
+   *
    * @see #checkExceptionThreshold(String, int, long)
-   * 
+   *
    * @param queueid
    *          queue identifier to locate and check
   * @return number of purged items
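
The new volatile flag is a one-way handshake: the fetcher's monitoring loop calls setTimeoutReached(), and the QueueFeeder thread polls the flag and stops queuing. Below is a minimal, self-contained sketch of that pattern; the class, field, and thread names are illustrative assumptions, not the actual Nutch types.

// Sketch of the volatile-flag handshake; names are illustrative, not Nutch's.
public class TimeoutFlagSketch {

  // Written by the monitoring thread, read by the feeder thread; "volatile"
  // makes the update visible across threads without extra locking.
  static volatile boolean timeoutReached = false;

  public static void main(String[] args) throws InterruptedException {
    Thread feeder = new Thread(() -> {
      int queued = 0;
      while (!timeoutReached) {
        queued++;                 // stand-in for queuing one fetchlist record
        try {
          Thread.sleep(10);       // simulate per-record work
        } catch (InterruptedException e) {
          break;
        }
      }
      System.out.println("feeder stopped after queuing " + queued + " items");
    });
    feeder.start();

    Thread.sleep(100);            // monitoring loop decides the hard timeout is hit
    timeoutReached = true;        // plays the role of FetchItemQueues.setTimeoutReached()
    feeder.join(1500);            // the Fetcher waits ~1.5 s for the feeder to exit
  }
}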

src/java/org/apache/nutch/fetcher/Fetcher.java

+28 -18
@@ -58,25 +58,25 @@
 
 /**
  * A queue-based fetcher.
- * 
+ *
  * <p>
  * This fetcher uses a well-known model of one producer (a QueueFeeder) and many
  * consumers (FetcherThread-s).
- * 
+ *
  * <p>
  * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
 * which hold FetchItem-s that describe the items to be fetched. There are as
 * many queues as there are unique hosts, but at any given time the total number
 * of fetch items in all queues is less than a fixed number (currently set to a
 * multiple of the number of threads).
- * 
+ *
 * <p>
 * As items are consumed from the queues, the QueueFeeder continues to add new
 * input items, so that their total count stays fixed (FetcherThread-s may also
 * add new items to the queues e.g. as a results of redirection) - until all
 * input items are exhausted, at which point the number of items in the queues
 * begins to decrease. When this number reaches 0 fetcher will finish.
- * 
+ *
 * <p>
 * This fetcher implementation handles per-host blocking itself, instead of
 * delegating this work to protocol-specific plugins. Each per-host queue
@@ -85,13 +85,13 @@
 * list of requests in progress, and the time the last request was finished. As
 * FetcherThread-s ask for new items to be fetched, queues may return eligible
 * items or null if for "politeness" reasons this host's queue is not yet ready.
- * 
+ *
 * <p>
 * If there are still unfetched items in the queues, but none of the items are
 * ready, FetcherThread-s will spin-wait until either some items become
 * available, or a timeout is reached (at which point the Fetcher will abort,
 * assuming the task is hung).
- * 
+ *
 * @author Andrzej Bialecki
 */
 public class Fetcher extends NutchTool implements Tool {
@@ -147,7 +147,7 @@ public static class FetcherRun extends
     private AtomicInteger activeThreads = new AtomicInteger(0);
     private AtomicInteger spinWaiting = new AtomicInteger(0);
     private long start = System.currentTimeMillis();
-    private AtomicLong lastRequestStart = new AtomicLong(start); 
+    private AtomicLong lastRequestStart = new AtomicLong(start);
     private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
     private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
     private AtomicInteger errors = new AtomicInteger(0); // total pages errored
@@ -157,7 +157,7 @@ public static class FetcherRun extends
     private AtomicInteger getActiveThreads() {
       return activeThreads;
     }
-    
+
     private void reportStatus(Context context, FetchItemQueues fetchQueues, int pagesLastSec, int bytesLastSec)
         throws IOException {
       StringBuilder status = new StringBuilder();
@@ -184,13 +184,13 @@ private void reportStatus(Context context, FetchItemQueues fetchQueues, int page
       context.setStatus(status.toString());
     }
 
-    @Override 
+    @Override
     public void setup(Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
       Configuration conf = context.getConfiguration();
       segmentName = conf.get(Nutch.SEGMENT_NAME_KEY);
       storingContent = isStoringContent(conf);
       parsing = isParsing(conf);
-    } 
+    }
 
     @Override
     public void run(Context innerContext)
@@ -218,11 +218,6 @@ public void run(Context innerContext)
       feeder = new QueueFeeder(innerContext, fetchQueues,
           threadCount * queueDepthMultiplier);
 
-      // the value of the time limit is either -1 or the time where it should
-      // finish
-      long timelimit = conf.getLong("fetcher.timelimit", -1);
-      if (timelimit != -1)
-        feeder.setTimeLimit(timelimit);
      feeder.start();
 
      int startDelay = conf.getInt("fetcher.threads.start.delay", 10);
@@ -427,9 +422,12 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
          * fetches started during half of the MapReduce task timeout
          * (mapreduce.task.timeout, default value: 10 minutes). In order to
          * avoid that the task timeout is hit and the fetcher job is failed,
-         * we stop the fetching now.
+         * we stop the fetching now. See also the property
+         * fetcher.threads.timeout.divisor.
          */
         if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+          LOG.warn("Timeout reached with no new requests since {} seconds.",
+              timeout);
           LOG.warn("Aborting with {} hung threads{}.", activeThreads,
               feeder.isAlive() ? " (queue feeder still alive)" : "");
          innerContext.getCounter("FetcherStatus", "hungThreads")
@@ -448,6 +446,18 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
             LOG.warn(sb.toString());
           }
         }
+
+        /*
+         * signal the queue feeder that the timeout is reached and wait
+         * shortly for it to shut down
+         */
+        fetchQueues.setTimeoutReached();
+        if (feeder.isAlive()) {
+          LOG.info(
+              "Signaled QueueFeeder to stop, waiting 1.5 seconds before exiting.");
+          Thread.sleep(1500);
+        }
+
         /*
          * log and count queued items dropped from the fetch queues because
         * of the timeout
@@ -469,7 +479,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
     }
   }
 
-  public void fetch(Path segment, int threads) throws IOException, 
+  public void fetch(Path segment, int threads) throws IOException,
      InterruptedException, ClassNotFoundException {
 
    checkConfiguration();
@@ -626,7 +636,7 @@ public Map<String, Object> run(Map<String, Object> args, String crawlId) throws
     else {
       String segmentDir = crawlId+"/segments";
       File segmentsDir = new File(segmentDir);
-      File[] segmentsList = segmentsDir.listFiles();  
+      File[] segmentsList = segmentsDir.listFiles();
      Arrays.sort(segmentsList, (f1, f2) -> {
        if(f1.lastModified()>f2.lastModified())
          return -1;
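
For reference, the hard timeout discussed in the comments above is half of mapreduce.task.timeout, i.e. the task timeout divided by fetcher.threads.timeout.divisor. The sketch below shows the arithmetic and the staleness check; the divisor default of 2 and the hard-coded values are assumptions for illustration, while the real Fetcher reads both properties from the Hadoop Configuration.

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the hard-timeout check performed by the monitoring loop.
public class HardTimeoutSketch {

  public static void main(String[] args) {
    long taskTimeoutMs = 10 * 60 * 1000L;         // mapreduce.task.timeout default: 10 minutes
    int divisor = 2;                              // fetcher.threads.timeout.divisor (assumed default)
    long hardTimeoutMs = taskTimeoutMs / divisor; // abort well before Hadoop kills the task

    AtomicLong lastRequestStart = new AtomicLong(System.currentTimeMillis());

    // Each started fetch would update lastRequestStart; if no fetch has
    // started for hardTimeoutMs, the fetcher aborts and signals the feeder.
    boolean hung = (System.currentTimeMillis() - lastRequestStart.get()) > hardTimeoutMs;
    System.out.println("hard timeout = " + hardTimeoutMs + " ms, hung = " + hung);
  }
}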

src/java/org/apache/nutch/fetcher/QueueFeeder.java

+18 -9
@@ -39,11 +39,10 @@ public class QueueFeeder extends Thread {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
-  
+
   private FetcherRun.Context context;
   private FetchItemQueues queues;
   private int size;
-  private long timelimit = -1;
   private URLFilters urlFilters = null;
   private URLNormalizers urlNormalizers = null;
  private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;
@@ -64,10 +63,6 @@ public QueueFeeder(FetcherRun.Context context,
     }
   }
 
-  public void setTimeLimit(long tl) {
-    timelimit = tl;
-  }
-
   /** Filter and normalize the url */
   private String filterNormalize(String url) {
    if (url != null) {
@@ -90,12 +85,26 @@ public void run() {
     int cnt = 0;
     int[] queuingStatus = new int[QueuingStatus.values().length];
     while (hasMore) {
-      if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
+      if (queues.timelimitExceeded() || queues.timoutReached) {
         // enough ... lets' simply read all the entries from the input without
         // processing them
+        if (queues.timoutReached) {
+          int qstatus = QueuingStatus.HIT_BY_TIMEOUT.ordinal();
+          if (queuingStatus[qstatus] == 0) {
+            LOG.info("QueueFeeder stopping, timeout reached.");
+          }
+          queuingStatus[qstatus]++;
+          context.getCounter("FetcherStatus", "hitByTimeout").increment(1);
+        } else {
+          int qstatus = QueuingStatus.HIT_BY_TIMELIMIT.ordinal();
+          if (queuingStatus[qstatus] == 0) {
+            LOG.info("QueueFeeder stopping, timelimit exceeded.");
+          }
+          queuingStatus[qstatus]++;
+          context.getCounter("FetcherStatus", "hitByTimeLimit").increment(1);
+        }
         try {
           hasMore = context.nextKeyValue();
-          queuingStatus[QueuingStatus.HIT_BY_TIMELIMIT.ordinal()]++;
         } catch (IOException e) {
           LOG.error("QueueFeeder error reading input, record " + cnt, e);
          return;
@@ -137,7 +146,7 @@ public void run() {
         url = new Text(url);
       }
       CrawlDatum datum = new CrawlDatum();
-      datum.set((CrawlDatum) context.getCurrentValue());
+      datum.set(context.getCurrentValue());
      QueuingStatus status = queues.addFetchItem(url, datum);
      queuingStatus[status.ordinal()]++;
      if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
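
Once either condition is hit, the feeder no longer queues new items but still reads the remaining input records so the map task consumes its input and exits cleanly, counting the skipped records instead. A minimal sketch of that drain pattern follows; the plain iterator stands in for context.nextKeyValue(), and the names and values are illustrative only.

import java.util.Iterator;
import java.util.List;

// Sketch of the "drain without queuing" pattern shown in the hunk above.
public class DrainSketch {

  public static void main(String[] args) {
    Iterator<String> input = List.of("url1", "url2", "url3", "url4").iterator();
    boolean timeoutReached = false;
    int queued = 0, skipped = 0;

    while (input.hasNext()) {
      String record = input.next();
      if (timeoutReached) {
        skipped++;               // counted like hitByTimeout / hitByTimeLimit
        continue;
      }
      queued++;                  // stand-in for queues.addFetchItem(url, datum)
      if (queued == 2) {
        timeoutReached = true;   // as if FetchItemQueues.setTimeoutReached() was called
      }
    }
    System.out.println("queued=" + queued + " skipped=" + skipped);
  }
}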
