diff --git a/src/main/java/org/ohdsi/webapi/service/CDMResultsService.java b/src/main/java/org/ohdsi/webapi/service/CDMResultsService.java
index efa69e9325..bd1962fc9a 100644
--- a/src/main/java/org/ohdsi/webapi/service/CDMResultsService.java
+++ b/src/main/java/org/ohdsi/webapi/service/CDMResultsService.java
@@ -134,7 +134,6 @@ public class CDMResultsService extends AbstractDaoService implements Initializin
 
     @Override
     public void afterPropertiesSet() throws Exception {
-        queryRunner.init(this.getSourceDialect(), objectMapper);
         warmCaches();
     }
 
@@ -243,7 +242,7 @@ public JobExecutionResource refreshCache(@PathParam("sourceKey") final String so
         if(isSecured() && isAdmin()) {
             Source source = getSourceRepository().findBySourceKey(sourceKey);
             if (sourceAccessor.hasAccess(source)) {
-                JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(sourceKey));
+                JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey));
                 if (jobExecutionResource == null) {
                     if (source.getDaimons().stream().anyMatch(sd -> Objects.equals(sd.getDaimonType(), SourceDaimon.DaimonType.Results))) {
                         return warmCacheByKey(source.getSourceKey());
@@ -381,8 +380,8 @@ public JsonNode getRawDrilldown(String domain, int conceptId, String sourceKey) 
     }
 
     private JobExecutionResource warmCacheByKey(String sourceKey) {
-        if (jobService.findJobByName(getWarmCacheJobName(sourceKey), getWarmCacheJobName(sourceKey)) == null) {
-            Source source = getSourceRepository().findBySourceKey(sourceKey);
+        Source source = getSourceRepository().findBySourceKey(sourceKey);
+        if (jobService.findJobByName(getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey), getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey)) == null) {
             return warmCaches(source);
         } else {
             return new JobExecutionResource();
@@ -399,14 +398,21 @@ private JobExecutionResource warmCaches(Source source) {
             logger.info("Cache wouldn't be applied to sources without Vocabulary and Result schemas, source [{}] was omitted", source.getSourceName());
             return new JobExecutionResource();
         }
-
-        String jobName = getWarmCacheJobName(source.getSourceKey());
+
+        String jobName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
         Step resultsCacheStep = getCountStep(source, jobName);
         Step achillesCacheStep = getAchillesStep(source, jobName);
-        SimpleJobBuilder builder = jobBuilders.get(jobName)
-                .start(achillesCacheStep)
-                .next(resultsCacheStep);
-
+        SimpleJobBuilder builder = jobBuilders.get(jobName)
+                .start(achillesCacheStep);
+
+        /*
+         * Only run the results cache step if the results source has a
+         * priority >= 1
+         */
+        if (getResultsDaimonPriority(source) > 0) {
+            builder = builder.next(resultsCacheStep);
+        }
+
         return createJob(source.getSourceKey(), source.getSourceId(), jobName, builder);
     }
 
@@ -429,15 +435,16 @@ private void warmCaches(Collection<Source> sources) {
         for (Source source : vocabularySources) {
             sourceIds.add(source.getSourceId());
             sourceKeys.add(source.getSourceKey());
-            String jobStepName = getWarmCacheJobName(source.getSourceKey());
+            String jobStepName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
 
             // Check whether cache job for current source already exists
             if (jobService.findJobByName(jobStepName, jobStepName) == null) {
+                // Create the job step
                 Step jobStep = getJobStep(source, jobStepName);
                 // get priority of the results daimon
-                int priority = getPriority(source);
+                int priority = getResultsDaimonPriority(source);
                 // if source has results daimon with high priority - put it at the beginning of the queue
-                if (priority == 1) {
+                if (priority > 0) {
                     jobSteps.add(0, jobStep);
                 } else {
                     jobSteps.add(jobStep);
@@ -445,12 +452,13 @@ private void warmCaches(Collection<Source> sources) {
             }
 
             if (counter++ >= bucketSizes[bucketIndex] - 1) {
-                createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(", ")),
+                createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(",")),
                         String.join(", ", sourceKeys),
                         jobSteps);
 
                 bucketIndex++;
                 counter = 0;
+                sourceIds.clear();
                 sourceKeys.clear();
                 jobSteps.clear();
             }
@@ -458,11 +466,14 @@ private void warmCaches(Collection<Source> sources) {
     }
 
     private Step getJobStep(Source source, String jobStepName) {
+        int resultDaimonPriority = getResultsDaimonPriority(source);
         SimpleJob job = new SimpleJob(jobStepName);
         job.setJobRepository(jobRepository);
         job.addStep(getAchillesStep(source, jobStepName));
-        job.addStep(getCountStep(source, jobStepName));
+        if (resultDaimonPriority > 0) {
+            job.addStep(getCountStep(source, jobStepName));
+        }
 
         return stepBuilderFactory.get(jobStepName)
                 .job(job)
@@ -538,7 +549,7 @@ private Step getCountStep(Source source, String jobStepName) {
                 .build();
     }
 
-    private int getPriority(Source source) {
+    private int getResultsDaimonPriority(Source source) {
         Optional<Integer> resultsPriority = source.getDaimons().stream()
                 .filter(d -> d.getDaimonType().equals(SourceDaimon.DaimonType.Results))
                 .map(SourceDaimon::getPriority)
@@ -547,15 +558,18 @@
         return resultsPriority.orElse(0);
     }
 
-    private String getWarmCacheJobName(String sourceKey) {
-        return String.format("warming cache: %s", sourceKey);
-    }
-
     private String getWarmCacheJobName(String sourceIds, String sourceKeys) {
         // for multiple sources: try to compose a job name from source keys, and if it is too long - use source ids
         String jobName = String.format("warming cache: %s", sourceKeys);
+
         if (jobName.length() >= 100) { // job name in batch_job_instance is varchar(100)
             jobName = String.format("warming cache: %s", sourceIds);
+
+            if (jobName.length() >= 100) { // if we still have more than 100 symbols
+                jobName = jobName.substring(0, 88);
+                jobName = jobName.substring(0, jobName.lastIndexOf(','))
+                        .concat(" and more..."); // todo: this is quick fix. need better solution
+            }
         }
         return jobName;
     }
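
Note: the job-name fallback in getWarmCacheJobName can be exercised in isolation. The sketch below is a minimal standalone reproduction of the naming logic shown in the diff, not WebAPI code; the class name, main method, and sample ids/keys are illustrative assumptions. It demonstrates the three outcomes: keys used as-is, fallback to source ids, and truncation with " and more..." once the 100-character batch_job_instance limit is hit.

import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Standalone sketch of the job-naming logic from this diff (illustrative only).
public class WarmCacheJobNameSketch {

    // Mirrors the two-argument getWarmCacheJobName introduced above.
    static String getWarmCacheJobName(String sourceIds, String sourceKeys) {
        // for multiple sources: try to compose a job name from source keys,
        // and if it is too long - use source ids
        String jobName = String.format("warming cache: %s", sourceKeys);

        if (jobName.length() >= 100) { // job name in batch_job_instance is varchar(100)
            jobName = String.format("warming cache: %s", sourceIds);

            if (jobName.length() >= 100) { // still too long: cut at the last comma
                // before position 88 and mark the name as truncated
                // (assumes a ',' occurs within the first 88 characters)
                jobName = jobName.substring(0, 88);
                jobName = jobName.substring(0, jobName.lastIndexOf(','))
                        .concat(" and more...");
            }
        }
        return jobName;
    }

    public static void main(String[] args) {
        // Hypothetical source ids/keys, used only to build the comma-separated arguments.
        String sourceIds = IntStream.rangeClosed(1, 40)
                .mapToObj(String::valueOf)
                .collect(Collectors.joining(","));
        String sourceKeys = IntStream.rangeClosed(1, 40)
                .mapToObj(i -> "SOURCE_KEY_" + i)
                .collect(Collectors.joining(", "));

        // Few sources: the key-based name stays under 100 characters and is used as-is.
        System.out.println(getWarmCacheJobName("1,2", "SOURCE_KEY_1, SOURCE_KEY_2"));

        // Many sources: the key-based name overflows, the id-based name is tried, and
        // since it is still >= 100 characters it is cut at a comma and suffixed.
        System.out.println(getWarmCacheJobName(sourceIds, sourceKeys));
    }
}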