Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebAPI v2.11 fails to start with many sources configured #2031 #2032

Merged
merged 2 commits into from
May 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 33 additions & 19 deletions src/main/java/org/ohdsi/webapi/service/CDMResultsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public class CDMResultsService extends AbstractDaoService implements Initializin

@Override
public void afterPropertiesSet() throws Exception {

queryRunner.init(this.getSourceDialect(), objectMapper);
warmCaches();
}
Expand Down Expand Up @@ -243,7 +242,7 @@ public JobExecutionResource refreshCache(@PathParam("sourceKey") final String so
if(isSecured() && isAdmin()) {
Source source = getSourceRepository().findBySourceKey(sourceKey);
if (sourceAccessor.hasAccess(source)) {
JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(sourceKey));
JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(String.valueOf(source.getSourceId()),sourceKey));
if (jobExecutionResource == null) {
if (source.getDaimons().stream().anyMatch(sd -> Objects.equals(sd.getDaimonType(), SourceDaimon.DaimonType.Results))) {
return warmCacheByKey(source.getSourceKey());
Expand Down Expand Up @@ -381,8 +380,8 @@ public JsonNode getRawDrilldown(String domain, int conceptId, String sourceKey)
}

private JobExecutionResource warmCacheByKey(String sourceKey) {
if (jobService.findJobByName(getWarmCacheJobName(sourceKey), getWarmCacheJobName(sourceKey)) == null) {
Source source = getSourceRepository().findBySourceKey(sourceKey);
Source source = getSourceRepository().findBySourceKey(sourceKey);
if (jobService.findJobByName(getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey), getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey)) == null) {
return warmCaches(source);
} else {
return new JobExecutionResource();
Expand All @@ -399,14 +398,21 @@ private JobExecutionResource warmCaches(Source source) {
logger.info("Cache wouldn't be applied to sources without Vocabulary and Result schemas, source [{}] was omitted", source.getSourceName());
return new JobExecutionResource();
}

String jobName = getWarmCacheJobName(source.getSourceKey());
String jobName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
Step resultsCacheStep = getCountStep(source, jobName);
Step achillesCacheStep = getAchillesStep(source, jobName);

SimpleJobBuilder builder = jobBuilders.get(jobName)
.start(achillesCacheStep)
.next(resultsCacheStep);
.start(achillesCacheStep);

/*
* Only run the results cache step if the results source has a
* priority >= 1
*/
if (getResultsDaimonPriority(source) > 0) {
builder = builder.next(resultsCacheStep);
}

return createJob(source.getSourceKey(), source.getSourceId(), jobName, builder);
}

Expand All @@ -429,40 +435,45 @@ private void warmCaches(Collection<Source> sources) {
for (Source source : vocabularySources) {
sourceIds.add(source.getSourceId());
sourceKeys.add(source.getSourceKey());
String jobStepName = getWarmCacheJobName(source.getSourceKey());
String jobStepName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
// Check whether cache job for current source already exists
if (jobService.findJobByName(jobStepName, jobStepName) == null) {
// Create the job step
Step jobStep = getJobStep(source, jobStepName);

// get priority of the results daimon
int priority = getPriority(source);
int priority = getResultsDaimonPriority(source);
// if source has results daimon with high priority - put it at the beginning of the queue
if (priority == 1) {
if (priority > 0) {
jobSteps.add(0, jobStep);
} else {
jobSteps.add(jobStep);
}
}

if (counter++ >= bucketSizes[bucketIndex] - 1) {
createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(", ")),
createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(",")),
String.join(", ", sourceKeys),
jobSteps);

bucketIndex++;
counter = 0;
sourceIds.clear();
sourceKeys.clear();
jobSteps.clear();
}
}
}

private Step getJobStep(Source source, String jobStepName) {
int resultDaimonPriority = getResultsDaimonPriority(source);
SimpleJob job = new SimpleJob(jobStepName);
job.setJobRepository(jobRepository);

job.addStep(getAchillesStep(source, jobStepName));
job.addStep(getCountStep(source, jobStepName));
if (resultDaimonPriority > 0) {
job.addStep(getCountStep(source, jobStepName));
}

return stepBuilderFactory.get(jobStepName)
.job(job)
Expand Down Expand Up @@ -538,7 +549,7 @@ private Step getCountStep(Source source, String jobStepName) {
.build();
}

private int getPriority(Source source) {
private int getResultsDaimonPriority(Source source) {
Optional<Integer> resultsPriority = source.getDaimons().stream()
.filter(d -> d.getDaimonType().equals(SourceDaimon.DaimonType.Results))
.map(SourceDaimon::getPriority)
Expand All @@ -547,15 +558,18 @@ private int getPriority(Source source) {
return resultsPriority.orElse(0);
}

private String getWarmCacheJobName(String sourceKey) {
return String.format("warming cache: %s", sourceKey);
}

private String getWarmCacheJobName(String sourceIds, String sourceKeys) {
// for multiple sources: try to compose a job name from source keys, and if it is too long - use source ids
String jobName = String.format("warming cache: %s", sourceKeys);

if (jobName.length() >= 100) { // job name in batch_job_instance is varchar(100)
jobName = String.format("warming cache: %s", sourceIds);

if (jobName.length() >= 100) { // if we still have more than 100 symbols
jobName = jobName.substring(0, 88);
jobName = jobName.substring(0, jobName.lastIndexOf(','))
.concat(" and more..."); // todo: this is quick fix. need better solution
}
}
return jobName;
}
Expand Down