Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dedup load from in-memory dedup cache db #385

Merged
merged 1 commit into from
May 20, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ protected TroughClient troughClient() throws MalformedURLException {
"DROP table dedup;";
/** Selects every row of the {@code dedup} table. */
protected static final String SELECT_ALL_SQL =
"SELECT * FROM dedup;";
/**
 * Parameterized lookup returning at most one {@code dedup} row for a digest key.
 * The single {@code ?} placeholder is bound to {@code persistKeyFor(curi)} in
 * {@code load}, after the statement text has been passed through
 * {@code segmentizeDedupTableName} together with the current segment id.
 */
protected static final String DEDUP_QUERY_SQL =
"SELECT * FROM dedup WHERE digest_key = ? LIMIT 1";

/**
 * Keyed by segment id: {@code load} only queries the in-memory dedup db when this
 * map contains the current segment id. What the values hold is not visible here.
 */
protected ConcurrentHashMap<String, Object> segmentCache = new ConcurrentHashMap<String, Object>();
/**
 * Connection on which {@code load} prepares its in-memory dedup query.
 * NOTE(review): starts out null and its initialization is not visible here -
 * confirm it is always set before a segment id is added to {@code segmentCache}.
 */
protected Connection dedupDbConnection = null;
@Override
Expand Down Expand Up @@ -168,26 +171,52 @@ public void load(CrawlURI curi) {
// WARCWriterProcessor knows it should put the info in there
HashMap<String, Object> contentDigestHistory = curi.getContentDigestHistory();

try {
String sql = "select * from dedup where digest_key = %s";
List<Map<String, Object>> results = troughClient().read(getSegmentId(), sql, new String[] {persistKeyFor(curi)});
if (!results.isEmpty()) {
Map<String,Object> hist = new HashMap<String, Object>();
hist.put(A_ORIGINAL_URL, results.get(0).get("url"));
hist.put(A_ORIGINAL_DATE, results.get(0).get("date"));
hist.put(A_WARC_RECORD_ID, results.get(0).get("id"));
//Check in-memory segment first
boolean memoryDedupHit = false;
if(segmentCache.containsKey(getSegmentId())) {
String segmentDedupQuerySql = segmentizeDedupTableName(getSegmentId(), DEDUP_QUERY_SQL);
try(PreparedStatement dedupQueryStatement = dedupDbConnection.prepareStatement(segmentDedupQuerySql)) {
dedupQueryStatement.setString(1, persistKeyFor(curi));
ResultSet rs = dedupQueryStatement.executeQuery();
while(rs.next()) {
Map<String, Object> hist = new HashMap<String, Object>();
hist.put(A_ORIGINAL_URL, rs.getString("url"));
hist.put(A_ORIGINAL_DATE, rs.getString("date"));
hist.put(A_WARC_RECORD_ID, rs.getString("id"));
if (logger.isLoggable(Level.FINER)) {
logger.finer("loaded in-memory history by digest " + persistKeyFor(curi)
+ " for uri " + curi + " - " + hist);
}
contentDigestHistory.putAll(hist);
memoryDedupHit=true;
break;
}
} catch (Exception e) {
logger.log(Level.WARNING, "problem querying in-memory dedup in " + getSegmentId() + " for url " + curi + " sql: "+segmentDedupQuerySql, e);
}
}
if(!memoryDedupHit) {
try {
String sql = "select * from dedup where digest_key = %s";
List<Map<String, Object>> results = troughClient().read(getSegmentId(), sql, new String[]{persistKeyFor(curi)});
if (!results.isEmpty()) {
Map<String, Object> hist = new HashMap<String, Object>();
hist.put(A_ORIGINAL_URL, results.get(0).get("url"));
hist.put(A_ORIGINAL_DATE, results.get(0).get("date"));
hist.put(A_WARC_RECORD_ID, results.get(0).get("id"));

if (logger.isLoggable(Level.FINER)) {
logger.finer("loaded history by digest " + persistKeyFor(curi)
+ " for uri " + curi + " - " + hist);
if (logger.isLoggable(Level.FINER)) {
logger.finer("loaded history by digest " + persistKeyFor(curi)
+ " for uri " + curi + " - " + hist);
}
contentDigestHistory.putAll(hist);
}
contentDigestHistory.putAll(hist);
} catch (TroughNoReadUrlException e) {
// this is totally normal at the beginning of the crawl, for example
logger.log(Level.FINE, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e);
} catch (Exception e) {
logger.log(Level.WARNING, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e);
}
} catch (TroughNoReadUrlException e) {
// this is totally normal at the beginning of the crawl, for example
logger.log(Level.FINE, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e);
} catch (Exception e) {
logger.log(Level.WARNING, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e);
}
}

Expand Down