NUTCH-3058 Fetcher: counter for hung threads (#820)
- add counter FetcherStatus:hungThreads
- log stack traces of hung threads with level WARN (instead of DEBUG)
sebastian-nagel authored Sep 16, 2024
1 parent 9d138ff commit 582cdd4
Showing 2 changed files with 35 additions and 20 deletions.
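
The new counter appears in the job's "FetcherStatus" group next to the existing ones (hitByTimeLimit, hitByTimeout). Below is a minimal sketch of how a caller holding the completed Hadoop fetch job could read it; the class and method names are hypothetical, only the group and counter names come from this commit:

import java.io.IOException;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

/** Sketch (hypothetical helper): read the counter added by NUTCH-3058. */
public class HungThreadsCounterCheck {

  /** Returns the number of fetcher threads reported as hung, 0 if none. */
  public static long getHungThreads(Job completedFetchJob) throws IOException {
    Counter hungThreads = completedFetchJob.getCounters()
        .findCounter("FetcherStatus", "hungThreads");
    return hungThreads == null ? 0L : hungThreads.getValue();
  }
}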
7 changes: 4 additions & 3 deletions conf/nutch-default.xml
@@ -1200,10 +1200,11 @@
 <property>
 <name>fetcher.threads.timeout.divisor</name>
 <value>2</value>
-<description>(EXPERT)The thread time-out divisor to use. By default threads have a time-out
+<description>(EXPERT) The thread time-out divisor to use. By default threads have a time-out
 value of mapreduce.task.timeout / 2. Increase this setting if the fetcher waits too
-long before killing hanged threads. Be careful, a too high setting (+8) will most likely kill the
-fetcher threads prematurely.
+long before killing hung threads. Be careful, a too high setting (+8) will most likely kill the
+fetcher threads prematurely. The fetcher thread time-out avoids that the task timeout (defined by
+the Hadoop configuration property mapreduce.task.timeout) is reached and the fetcher job is failed.
 </description>
 </property>
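
To make the arithmetic in the description concrete: the hung-thread time-out is the Hadoop task time-out divided by this property, so with the defaults the fetcher aborts hung threads well before Hadoop would kill the whole task. A minimal sketch under these assumptions (class and variable names are illustrative, not the exact code in Fetcher.java):

import org.apache.hadoop.conf.Configuration;

/** Sketch: how the effective hung-thread time-out follows from the two settings. */
public class ThreadTimeoutExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int divisor = conf.getInt("fetcher.threads.timeout.divisor", 2);
    // mapreduce.task.timeout defaults to 600000 ms (10 minutes)
    long taskTimeout = conf.getLong("mapreduce.task.timeout", 10 * 60 * 1000L);
    long threadTimeout = taskTimeout / divisor;
    // With the defaults: 600000 / 2 = 300000 ms, i.e. hung threads are
    // reported and the job is aborted after 5 minutes without fetch activity.
    System.out.println("fetcher thread time-out: " + threadTimeout + " ms");
  }
}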

48 changes: 31 additions & 17 deletions src/java/org/apache/nutch/fetcher/Fetcher.java
@@ -419,27 +419,41 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
                 .increment(hitByTimeLimit);
         }

-        // some requests seem to hang, despite all intentions
+        /*
+         * Some requests seem to hang, with no fetches finished and no new
+         * fetches started during half of the MapReduce task timeout
+         * (mapreduce.task.timeout, default value: 10 minutes). In order to
+         * avoid that the task timeout is hit and the fetcher job is failed,
+         * we stop the fetching now.
+         */
         if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Aborting with {} hung threads.", activeThreads);
-            for (int i = 0; i < fetcherThreads.size(); i++) {
-              FetcherThread thread = fetcherThreads.get(i);
-              if (thread.isAlive()) {
-                LOG.warn("Thread #{} hung while processing {}", i,
-                    thread.getReprUrl());
-                if (LOG.isDebugEnabled()) {
-                  StackTraceElement[] stack = thread.getStackTrace();
-                  StringBuilder sb = new StringBuilder();
-                  sb.append("Stack of thread #").append(i).append(":\n");
-                  for (StackTraceElement s : stack) {
-                    sb.append(s.toString()).append('\n');
-                  }
-                  LOG.debug(sb.toString());
-                }
+          LOG.warn("Aborting with {} hung threads.", activeThreads);
+          innerContext.getCounter("FetcherStatus", "hungThreads")
+              .increment(activeThreads.get());
+          for (int i = 0; i < fetcherThreads.size(); i++) {
+            FetcherThread thread = fetcherThreads.get(i);
+            if (thread.isAlive()) {
+              LOG.warn("Thread #{} hung while processing {}", i,
+                  thread.getReprUrl());
+              StackTraceElement[] stack = thread.getStackTrace();
+              StringBuilder sb = new StringBuilder();
+              sb.append("Stack of thread #").append(i).append(":\n");
+              for (StackTraceElement s : stack) {
+                sb.append(s.toString()).append('\n');
               }
+              LOG.warn(sb.toString());
             }
           }
+          /*
+           * log and count queued items dropped from the fetch queues because
+           * of the timeout
+           */
+          LOG.warn("Aborting with {} queued fetch items in {} queues{}.",
+              fetchQueues.getTotalSize(), fetchQueues.getQueueCount(),
+              feeder.isAlive() ? " (queue feeder still alive)" : "");
+          int hitByTimeout = fetchQueues.emptyQueues();
+          innerContext.getCounter("FetcherStatus", "hitByTimeout")
+              .increment(hitByTimeout);
           return;
         }
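
For readers outside the Nutch code base: the change boils down to counting the still-alive fetcher threads in a custom counter and dumping each thread's stack trace at WARN, so the traces land in the task logs at the default log level. A condensed, self-contained sketch of that reporting pattern (class and method names are hypothetical, SLF4J logging as in Nutch):

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch (hypothetical helper) of the hung-thread reporting pattern above. */
public class HungThreadReporter {

  private static final Logger LOG = LoggerFactory.getLogger(HungThreadReporter.class);

  /** Logs a WARN-level stack trace for every still-alive thread; returns their count. */
  public static int reportHungThreads(List<Thread> threads) {
    int hung = 0;
    for (int i = 0; i < threads.size(); i++) {
      Thread thread = threads.get(i);
      if (!thread.isAlive()) {
        continue;
      }
      hung++;
      StringBuilder sb = new StringBuilder();
      sb.append("Stack of thread #").append(i).append(":\n");
      for (StackTraceElement s : thread.getStackTrace()) {
        sb.append(s.toString()).append('\n');
      }
      LOG.warn(sb.toString());
    }
    // A caller inside a MapReduce task would feed this number into a counter,
    // e.g. context.getCounter("FetcherStatus", "hungThreads").increment(hung);
    return hung;
  }
}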

