Skip to content

Commit

Permalink
NUTCH-3029 Host specific max. and min. intervals in adaptive scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Markus Jelsma committed Mar 13, 2024
1 parent 4f62dec commit 5ba50c0
Showing 1 changed file with 155 additions and 4 deletions.
159 changes: 155 additions & 4 deletions src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.commons.lang.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Reader;
import java.io.FileReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;

/**
* This class implements an adaptive re-fetch algorithm. This works as follows:
Expand Down Expand Up @@ -79,9 +88,16 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule {

private double SYNC_DELTA_RATE;

private Configuration conf;

private Map<String,Float> hostSpecificMaxInterval = new HashMap<>();

private Map<String,Float> hostSpecificMinInterval = new HashMap<>();

@Override
public void setConf(Configuration conf) {
super.setConf(conf);
this.conf = conf;
if (conf == null)
return;
INC_RATE = conf.getFloat("db.fetch.schedule.adaptive.inc_rate", 0.2f);
Expand All @@ -92,6 +108,136 @@ public void setConf(Configuration conf) {
SYNC_DELTA = conf.getBoolean("db.fetch.schedule.adaptive.sync_delta", true);
SYNC_DELTA_RATE = conf.getFloat(
"db.fetch.schedule.adaptive.sync_delta_rate", 0.2f);
try {
setHostSpecificIntervals("adaptive-host-specific-intervals.txt",
MIN_INTERVAL, MAX_INTERVAL);
} catch (IOException e){
LOG.error("Failed reading the configuration file. ", e);
}
}

/**
 * Loads host-specific minimum and maximum fetch intervals from a
 * configuration file into {@code hostSpecificMinInterval} and
 * {@code hostSpecificMaxInterval}.
 *
 * <p>Each non-blank line that does not start with {@code #} must be a
 * whitespace-separated triplet {@code <host> <min_interval> <max_interval>}.
 * The literal {@code default} (case-insensitive) or {@code 0} in either
 * interval column means "use the global setting" and stores nothing for that
 * bound. Lines with negative values, with min greater than max, with
 * unparsable numbers, or with a different number of columns are logged as
 * errors and skipped. A custom minimum is stored only if it is strictly
 * greater than {@code defaultMin}; a custom maximum only if it is strictly
 * smaller than {@code defaultMax}.
 *
 * @param fileName name of the resource to read; it is first looked up via
 *        {@code conf.getConfResourceAsReader} and, if that yields
 *        {@code null}, opened as a plain file
 * @param defaultMin global minimum interval used as the lower bound check
 * @param defaultMax global maximum interval used as the upper bound check
 * @throws IOException if the file cannot be opened or read
 */
private void setHostSpecificIntervals(String fileName,
    float defaultMin, float defaultMax) throws IOException {
  Reader configReader = conf.getConfResourceAsReader(fileName);
  if (configReader == null) {
    configReader = new FileReader(fileName);
  }
  // try-with-resources: closing the BufferedReader also closes the wrapped
  // reader, so the file handle is released even if reading fails.
  try (BufferedReader reader = new BufferedReader(configReader)) {
    String line;
    int lineNo = 0;
    while ((line = reader.readLine()) != null) {
      lineNo++;
      if (!StringUtils.isNotBlank(line)) {
        continue;
      }
      // Trim before the comment check so that indented comments are skipped
      // instead of being reported as malformed triplets.
      line = line.trim();
      if (line.startsWith("#")) {
        continue;
      }

      // The line is trimmed, so splitting yields no empty leading token and
      // the individual tokens carry no surrounding whitespace.
      String[] parts = line.split("\\s+");
      if (parts.length != 3) {
        LOG.error("Malformed (domain, min_interval, max_interval) triplet on line "
            + "{} of the config. file: {}", lineNo, line);
        continue;
      }

      // TODO: Maybe add host validation here?
      // It might get computationally expensive for large files, though.
      String host = parts[0].toLowerCase();
      // "default" is an alias for 0, i.e. "keep the global setting".
      String minText = parts[1].equalsIgnoreCase("default") ? "0" : parts[1];
      String maxText = parts[2].equalsIgnoreCase("default") ? "0" : parts[2];

      float minInterval;
      float maxInterval;
      try {
        minInterval = Float.parseFloat(minText);
        maxInterval = Float.parseFloat(maxText);
      } catch (NumberFormatException e) {
        LOG.error("No proper fetch intervals given on line {} in the config. file: {}",
            lineNo, line, e);
        continue;
      }

      // negative values and mismatched boundaries are ignored
      // (default to global settings)
      if (minInterval < 0 || maxInterval < 0 || minInterval > maxInterval) {
        LOG.error("Improper fetch intervals given on line {} in the config. file: {}",
            lineNo, line);
        continue;
      }

      // min. interval should be positive and above the global minimum;
      // zero is silently ignored (defaults to the global setting)
      if (minInterval > 0) {
        if (minInterval > defaultMin) {
          hostSpecificMinInterval.put(host, minInterval);
          LOG.debug("Added custom min. interval {} for host {}.", minInterval, host);
        } else {
          LOG.error("Min. interval out of bounds on line {} in the config. file: {}",
              lineNo, line);
        }
      }

      // max. interval should be positive and below the global maximum;
      // zero is silently ignored (defaults to the global setting)
      if (maxInterval > 0) {
        if (maxInterval < defaultMax) {
          hostSpecificMaxInterval.put(host, maxInterval);
          LOG.debug("Added custom max. interval {} for host {}.", maxInterval, host);
        } else {
          LOG.error("Max. interval out of bounds on line {} in the config. file: {}",
              lineNo, line);
        }
      }
    }
  }
}

/**
 * Strip a URL, leaving only the host name.
 *
 * <p>The host is returned in lower case so that it can be compared with the
 * keys of the host-specific interval maps, which are lower-cased when the
 * configuration file is loaded. {@link URI#getHost()} itself preserves the
 * case found in the URL.
 *
 * @param url the URL to extract the host from
 * @return the lower-cased host name, or {@code null} if the URI has no host
 *         component
 * @throws URISyntaxException if {@code url} cannot be parsed as a URI
 */
public static String getHostName(String url) throws URISyntaxException {
  String host = new URI(url).getHost();
  // Same lower-casing call as used for the map keys, keeping lookups consistent.
  return host == null ? null : host.toLowerCase();
}

/**
 * Looks up the maximum fetch interval that applies to a URL.
 *
 * <p>If a host-specific maximum has been registered for the URL's host, that
 * value is returned. In every other case (no host-specific maxima loaded,
 * the URL cannot be parsed, or the host has no entry) the supplied default
 * is returned.
 *
 * @param url the URL to be scheduled
 * @param defaultMaxInterval the value to fall back to when no max_interval
 *        has been configured for this host
 * @return the host-specific max_interval, or {@code defaultMaxInterval}
 */
public float getMaxInterval(Text url, float defaultMaxInterval){
  if (!hostSpecificMaxInterval.isEmpty()) {
    try {
      // A single lookup: a missing host yields null.
      Float custom = hostSpecificMaxInterval.get(getHostName(url.toString()));
      if (custom != null) {
        return custom;
      }
    } catch (URISyntaxException e) {
      // Unparsable URL: fall through to the default below.
    }
  }
  return defaultMaxInterval;
}

/**
 * Looks up the minimum fetch interval that applies to a URL.
 *
 * <p>If a host-specific minimum has been registered for the URL's host, that
 * value is returned. In every other case (no host-specific minima loaded,
 * the URL cannot be parsed, or the host has no entry) the supplied default
 * is returned.
 *
 * @param url the URL to be scheduled
 * @param defaultMinInterval the value to fall back to when no min_interval
 *        has been configured for this host
 * @return the host-specific min_interval, or {@code defaultMinInterval}
 */
public float getMinInterval(Text url, float defaultMinInterval){
  if (!hostSpecificMinInterval.isEmpty()) {
    try {
      // A single lookup: a missing host yields null.
      Float custom = hostSpecificMinInterval.get(getHostName(url.toString()));
      if (custom != null) {
        return custom;
      }
    } catch (URISyntaxException e) {
      // Unparsable URL: fall through to the default below.
    }
  }
  return defaultMinInterval;
}

@Override
Expand Down Expand Up @@ -133,10 +279,15 @@ public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
interval = delta;
refTime = fetchTime - Math.round(delta * SYNC_DELTA_RATE * 1000);
}
if (interval < MIN_INTERVAL) {
interval = MIN_INTERVAL;
} else if (interval > MAX_INTERVAL) {
interval = MAX_INTERVAL;

// replace min_interval and max_interval with host-specific ones,
// if so configured.
float newMaxInterval = getMaxInterval(url, MAX_INTERVAL);
float newMinInterval = getMinInterval(url, MIN_INTERVAL);
if (interval < newMinInterval) {
interval = newMinInterval;
} else if (interval > newMaxInterval) {
interval = newMaxInterval;
}
}

Expand Down

0 comments on commit 5ba50c0

Please sign in to comment.