diff --git a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc index b2e24a298cbd0..d343cc23ae0ad 100644 --- a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc +++ b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc @@ -20,6 +20,10 @@ progress of a job. (object) An object that provides information about the size and contents of the model. See <> +`forecasts_stats`:: + (object) An object that provides statistical information about forecasts + of this job. See <> + `node`:: (object) For open jobs only, contains information about the node where the job runs. See <>. @@ -177,6 +181,33 @@ NOTE: The `over` field values are counted separately for each detector and parti `timestamp`:: (date) The timestamp of the `model_size_stats` according to the timestamp of the data. +[float] +[[ml-forecastsstats]] +==== Forecasts Stats Objects + +The `forecasts_stats` object shows statistics about forecasts. It has the following properties: + +`total`:: + (long) The number of forecasts currently available for this model. + +`forecasted_jobs`:: + (long) The number of jobs that have at least one forecast. + +`memory_bytes`:: + (object) Statistics about the memory usage: minimum, maximum, average and total. + +`records`:: + (object) Statistics about the number of forecast records: minimum, maximum, average and total. + +`processing_time_ms`:: + (object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total. + +`status`:: + (object) Counts per forecast status, for example: {"finished" : 2}. + +NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise +these fields are ommitted. + [float] [[ml-stats-node]] ==== Node Objects diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java index 1779ca703a5d7..ebcaab8495eba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java @@ -22,6 +22,7 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { public static final String DATAFEEDS_FIELD = "datafeeds"; public static final String COUNT = "count"; public static final String DETECTORS = "detectors"; + public static final String FORECASTS = "forecasts"; public static final String MODEL_SIZE = "model_size"; private final Map jobsUsage; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index ad34f5611383f..807c09363759b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -46,6 +47,7 @@ public class GetJobsStatsAction extends Action { private static final String DATA_COUNTS = "data_counts"; private static final String MODEL_SIZE_STATS = "model_size_stats"; + private static final String FORECASTS_STATS = "forecasts_stats"; private static final String STATE = "state"; private static final String NODE = "node"; @@ -154,6 +156,8 @@ public static class JobStats implements ToXContentObject, Writeable { @Nullable private ModelSizeStats modelSizeStats; @Nullable + private ForecastStats forecastStats; + @Nullable private TimeValue openTime; private JobState state; @Nullable @@ -161,11 +165,13 @@ public static class JobStats implements ToXContentObject, Writeable { @Nullable private String assignmentExplanation; - public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state, - @Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { + public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, + @Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node, + @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { this.jobId = Objects.requireNonNull(jobId); this.dataCounts = Objects.requireNonNull(dataCounts); this.modelSizeStats = modelSizeStats; + this.forecastStats = forecastStats; this.state = Objects.requireNonNull(state); this.node = node; this.assignmentExplanation = assignmentExplanation; @@ -180,6 +186,9 @@ public JobStats(StreamInput in) throws IOException { node = in.readOptionalWriteable(DiscoveryNode::new); assignmentExplanation = in.readOptionalString(); openTime = in.readOptionalTimeValue(); + if (in.getVersion().onOrAfter(Version.V_6_4_0)) { + forecastStats = in.readOptionalWriteable(ForecastStats::new); + } } public String getJobId() { @@ -193,6 +202,10 @@ public DataCounts getDataCounts() { public ModelSizeStats getModelSizeStats() { return modelSizeStats; } + + public ForecastStats getForecastStats() { + return forecastStats; + } public JobState getState() { return state; @@ -226,6 +239,10 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc if (modelSizeStats != null) { builder.field(MODEL_SIZE_STATS, modelSizeStats); } + if (forecastStats != null) { + builder.field(FORECASTS_STATS, forecastStats); + } + builder.field(STATE, state.toString()); if (node != null) { builder.startObject(NODE); @@ -259,11 +276,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(node); out.writeOptionalString(assignmentExplanation); out.writeOptionalTimeValue(openTime); + if (out.getVersion().onOrAfter(Version.V_6_4_0)) { + out.writeOptionalWriteable(forecastStats); + } } @Override public int hashCode() { - return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime); + return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime); } @Override @@ -278,6 +298,7 @@ public boolean equals(Object obj) { return Objects.equals(jobId, other.jobId) && Objects.equals(this.dataCounts, other.dataCounts) && Objects.equals(this.modelSizeStats, other.modelSizeStats) + && Objects.equals(this.forecastStats, other.forecastStats) && Objects.equals(this.state, other.state) && Objects.equals(this.node, other.node) && Objects.equals(this.assignmentExplanation, other.assignmentExplanation) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java new file mode 100644 index 0000000000000..638aa8a2fa6be --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An accumulator for simple counts where statistical measures + * are not of interest. + */ +public class CountAccumulator implements Writeable { + + private Map counts; + + public CountAccumulator() { + this.counts = new HashMap(); + } + + private CountAccumulator(Map counts) { + this.counts = counts; + } + + public CountAccumulator(StreamInput in) throws IOException { + this.counts = in.readMap(StreamInput::readString, StreamInput::readLong); + } + + public void merge(CountAccumulator other) { + counts = Stream.of(counts, other.counts).flatMap(m -> m.entrySet().stream()) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (x, y) -> x + y)); + } + + public void add(String key, Long count) { + counts.put(key, counts.getOrDefault(key, 0L) + count); + } + + public Map asMap() { + return counts; + } + + public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) { + return new CountAccumulator(termsAggregation.getBuckets().stream() + .collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount()))); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(counts, StreamOutput::writeString, StreamOutput::writeLong); + } + + @Override + public int hashCode() { + return Objects.hash(counts); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + CountAccumulator other = (CountAccumulator) obj; + return Objects.equals(counts, other.counts); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java new file mode 100644 index 0000000000000..d490e4b98a44a --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * A class to hold statistics about forecasts. + */ +public class ForecastStats implements ToXContentObject, Writeable { + + public static class Fields { + public static final String TOTAL = "total"; + public static final String FORECASTED_JOBS = "forecasted_jobs"; + public static final String MEMORY = "memory_bytes"; + public static final String RUNTIME = "processing_time_ms"; + public static final String RECORDS = "records"; + public static final String STATUSES = "status"; + } + + private long total; + private long forecastedJobs; + private StatsAccumulator memoryStats; + private StatsAccumulator recordStats; + private StatsAccumulator runtimeStats; + private CountAccumulator statusCounts; + + public ForecastStats() { + this.total = 0; + this.forecastedJobs = 0; + this.memoryStats = new StatsAccumulator(); + this.recordStats = new StatsAccumulator(); + this.runtimeStats = new StatsAccumulator(); + this.statusCounts = new CountAccumulator(); + } + + /* + * Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it. + */ + public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats, + CountAccumulator statusCounts) { + this.total = total; + this.forecastedJobs = total > 0 ? 1 : 0; + this.memoryStats = Objects.requireNonNull(memoryStats); + this.recordStats = Objects.requireNonNull(recordStats); + this.runtimeStats = Objects.requireNonNull(runtimeStats); + this.statusCounts = Objects.requireNonNull(statusCounts); + } + + public ForecastStats(StreamInput in) throws IOException { + this.total = in.readLong(); + this.forecastedJobs = in.readLong(); + this.memoryStats = new StatsAccumulator(in); + this.recordStats = new StatsAccumulator(in); + this.runtimeStats = new StatsAccumulator(in); + this.statusCounts = new CountAccumulator(in); + } + + public ForecastStats merge(ForecastStats other) { + if (other == null) { + return this; + } + total += other.total; + forecastedJobs += other.forecastedJobs; + memoryStats.merge(other.memoryStats); + recordStats.merge(other.recordStats); + runtimeStats.merge(other.runtimeStats); + statusCounts.merge(other.statusCounts); + + return this; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + doXContentBody(builder, params); + return builder.endObject(); + } + + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.TOTAL, total); + builder.field(Fields.FORECASTED_JOBS, forecastedJobs); + + if (total > 0) { + builder.field(Fields.MEMORY, memoryStats.asMap()); + builder.field(Fields.RECORDS, recordStats.asMap()); + builder.field(Fields.RUNTIME, runtimeStats.asMap()); + builder.field(Fields.STATUSES, statusCounts.asMap()); + } + + return builder; + } + + public Map asMap() { + Map map = new HashMap<>(); + map.put(Fields.TOTAL, total); + map.put(Fields.FORECASTED_JOBS, forecastedJobs); + + if (total > 0) { + map.put(Fields.MEMORY, memoryStats.asMap()); + map.put(Fields.RECORDS, recordStats.asMap()); + map.put(Fields.RUNTIME, runtimeStats.asMap()); + map.put(Fields.STATUSES, statusCounts.asMap()); + } + + return map; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(total); + out.writeLong(forecastedJobs); + memoryStats.writeTo(out); + recordStats.writeTo(out); + runtimeStats.writeTo(out); + statusCounts.writeTo(out); + } + + @Override + public int hashCode() { + return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + ForecastStats other = (ForecastStats) obj; + return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs) + && Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats) + && Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java new file mode 100644 index 0000000000000..fe987db48ce17 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Helper class to collect min, max, avg and total statistics for a quantity + */ +public class StatsAccumulator implements Writeable { + + public static class Fields { + public static final String MIN = "min"; + public static final String MAX = "max"; + public static final String AVG = "avg"; + public static final String TOTAL = "total"; + } + + private long count; + private double total; + private Double min; + private Double max; + + public StatsAccumulator() { + } + + public StatsAccumulator(StreamInput in) throws IOException { + count = in.readLong(); + total = in.readDouble(); + min = in.readOptionalDouble(); + max = in.readOptionalDouble(); + } + + private StatsAccumulator(long count, double total, double min, double max) { + this.count = count; + this.total = total; + this.min = min; + this.max = max; + } + + public void add(double value) { + count++; + total += value; + min = min == null ? value : (value < min ? value : min); + max = max == null ? value : (value > max ? value : max); + } + + public double getMin() { + return min == null ? 0.0 : min; + } + + public double getMax() { + return max == null ? 0.0 : max; + } + + public double getAvg() { + return count == 0.0 ? 0.0 : total/count; + } + + public double getTotal() { + return total; + } + + public void merge(StatsAccumulator other) { + count += other.count; + total += other.total; + + // note: not using Math.min/max as some internal prefetch optimization causes an NPE + min = min == null ? other.min : (other.min == null ? min : other.min < min ? other.min : min); + max = max == null ? other.max : (other.max == null ? max : other.max > max ? other.max : max); + } + + public Map asMap() { + Map map = new HashMap<>(); + map.put(Fields.MIN, getMin()); + map.put(Fields.MAX, getMax()); + map.put(Fields.AVG, getAvg()); + map.put(Fields.TOTAL, getTotal()); + return map; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(count); + out.writeDouble(total); + out.writeOptionalDouble(min); + out.writeOptionalDouble(max); + } + + public static StatsAccumulator fromStatsAggregation(Stats statsAggregation) { + return new StatsAccumulator(statsAggregation.getCount(), statsAggregation.getSum(), statsAggregation.getMin(), + statsAggregation.getMax()); + } + + @Override + public int hashCode() { + return Objects.hash(count, total, min, max); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + StatsAccumulator other = (StatsAccumulator) obj; + return Objects.equals(count, other.count) && Objects.equals(total, other.total) && Objects.equals(min, other.min) + && Objects.equals(max, other.max); + } +} + diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java index ff979a8570aba..86a5b990728f8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java @@ -17,6 +17,8 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; import java.net.InetAddress; import java.util.ArrayList; @@ -42,6 +44,12 @@ protected Response createTestInstance() { if (randomBoolean()) { sizeStats = new ModelSizeStats.Builder("foo").build(); } + + ForecastStats forecastStats = null; + if (randomBoolean()) { + forecastStats = new ForecastStatsTests().createTestInstance(); + } + JobState jobState = randomFrom(EnumSet.allOf(JobState.class)); DiscoveryNode node = null; @@ -56,7 +64,8 @@ protected Response createTestInstance() { if (randomBoolean()) { openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test"); } - Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node, explanation, openTime); + Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation, + openTime); jobStatsList.add(jobStats); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java new file mode 100644 index 0000000000000..4e18a70a3a0a2 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms.Bucket; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CountAccumulatorTests extends AbstractWireSerializingTestCase { + + public void testEmpty() { + CountAccumulator accumulator = new CountAccumulator(); + assertEquals(Collections.emptyMap(), accumulator.asMap()); + } + + public void testAdd() { + CountAccumulator accumulator = new CountAccumulator(); + accumulator.add("a", 22L); + accumulator.add("a", 10L); + accumulator.add("a", 15L); + accumulator.add("a", -12L); + accumulator.add("a", 0L); + + accumulator.add("b", 13L); + accumulator.add("b", 1L); + accumulator.add("b", 40000L); + accumulator.add("b", -2L); + accumulator.add("b", 333L); + + assertEquals(35L, accumulator.asMap().get("a").longValue()); + assertEquals(40345L, accumulator.asMap().get("b").longValue()); + assertEquals(2, accumulator.asMap().size()); + } + + public void testMerge() { + CountAccumulator accumulator = new CountAccumulator(); + accumulator.add("a", 13L); + accumulator.add("b", 42L); + + CountAccumulator accumulator2 = new CountAccumulator(); + accumulator2.add("a", 12L); + accumulator2.add("c", -1L); + + accumulator.merge(accumulator2); + + assertEquals(25L, accumulator.asMap().get("a").longValue()); + assertEquals(42L, accumulator.asMap().get("b").longValue()); + assertEquals(-1L, accumulator.asMap().get("c").longValue()); + assertEquals(3, accumulator.asMap().size()); + } + + public void testFromTermsAggregation() { + StringTerms termsAggregation = mock(StringTerms.class); + + Bucket bucket1 = mock(Bucket.class); + when(bucket1.getKeyAsString()).thenReturn("a"); + when(bucket1.getDocCount()).thenReturn(10L); + + Bucket bucket2 = mock(Bucket.class); + when(bucket2.getKeyAsString()).thenReturn("b"); + when(bucket2.getDocCount()).thenReturn(33L); + + List buckets = Arrays.asList(bucket1, bucket2); + when(termsAggregation.getBuckets()).thenReturn(buckets); + + CountAccumulator accumulator = CountAccumulator.fromTermsAggregation(termsAggregation); + + assertEquals(10L, accumulator.asMap().get("a").longValue()); + assertEquals(33L, accumulator.asMap().get("b").longValue()); + assertEquals(2, accumulator.asMap().size()); + } + + @Override + public CountAccumulator createTestInstance() { + CountAccumulator accumulator = new CountAccumulator(); + for (int i = 0; i < randomInt(10); ++i) { + accumulator.add(randomAlphaOfLengthBetween(1, 20), randomLongBetween(1L, 100L)); + } + + return accumulator; + } + + @Override + protected Reader instanceReader() { + return CountAccumulator::new; + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java new file mode 100644 index 0000000000000..f7f5d16c5e578 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java @@ -0,0 +1,254 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats.Fields; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class ForecastStatsTests extends AbstractWireSerializingTestCase { + + public void testEmpty() throws IOException { + ForecastStats forecastStats = new ForecastStats(); + + XContentBuilder builder = JsonXContent.contentBuilder(); + forecastStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + + XContentParser parser = createParser(builder); + Map properties = parser.map(); + assertTrue(properties.containsKey(Fields.TOTAL)); + assertTrue(properties.containsKey(Fields.FORECASTED_JOBS)); + assertFalse(properties.containsKey(Fields.MEMORY)); + assertFalse(properties.containsKey(Fields.RECORDS)); + assertFalse(properties.containsKey(Fields.RUNTIME)); + assertFalse(properties.containsKey(Fields.STATUSES)); + } + + public void testMerge() { + StatsAccumulator memoryStats = new StatsAccumulator(); + memoryStats.add(1000); + memoryStats.add(45000); + memoryStats.add(2300); + + StatsAccumulator recordStats = new StatsAccumulator(); + recordStats.add(10); + recordStats.add(0); + recordStats.add(20); + + StatsAccumulator runtimeStats = new StatsAccumulator(); + runtimeStats.add(0); + runtimeStats.add(0); + runtimeStats.add(10); + + CountAccumulator statusStats = new CountAccumulator(); + statusStats.add("finished", 2L); + statusStats.add("failed", 5L); + + ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); + + StatsAccumulator memoryStats2 = new StatsAccumulator(); + memoryStats2.add(10); + memoryStats2.add(30); + + StatsAccumulator recordStats2 = new StatsAccumulator(); + recordStats2.add(10); + recordStats2.add(0); + + StatsAccumulator runtimeStats2 = new StatsAccumulator(); + runtimeStats2.add(96); + runtimeStats2.add(0); + + CountAccumulator statusStats2 = new CountAccumulator(); + statusStats2.add("finished", 2L); + statusStats2.add("scheduled", 1L); + + ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); + + forecastStats.merge(forecastStats2); + + Map mergedStats = forecastStats.asMap(); + + assertEquals(2L, mergedStats.get(Fields.FORECASTED_JOBS)); + assertEquals(5L, mergedStats.get(Fields.TOTAL)); + + @SuppressWarnings("unchecked") + Map mergedMemoryStats = (Map) mergedStats.get(Fields.MEMORY); + + assertTrue(mergedMemoryStats != null); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(9668.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0)); + + @SuppressWarnings("unchecked") + Map mergedRecordStats = (Map) mergedStats.get(Fields.RECORDS); + + assertTrue(mergedRecordStats != null); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(8.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(20.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedRuntimeStats = (Map) mergedStats.get(Fields.RUNTIME); + + assertTrue(mergedRuntimeStats != null); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(21.2)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedCountStats = (Map) mergedStats.get(Fields.STATUSES); + + assertTrue(mergedCountStats != null); + assertEquals(3, mergedCountStats.size()); + assertEquals(4, mergedCountStats.get("finished").longValue()); + assertEquals(5, mergedCountStats.get("failed").longValue()); + assertEquals(1, mergedCountStats.get("scheduled").longValue()); + } + + public void testChainedMerge() { + StatsAccumulator memoryStats = new StatsAccumulator(); + memoryStats.add(1000); + memoryStats.add(45000); + memoryStats.add(2300); + StatsAccumulator recordStats = new StatsAccumulator(); + recordStats.add(10); + recordStats.add(0); + recordStats.add(20); + StatsAccumulator runtimeStats = new StatsAccumulator(); + runtimeStats.add(0); + runtimeStats.add(0); + runtimeStats.add(10); + CountAccumulator statusStats = new CountAccumulator(); + statusStats.add("finished", 2L); + statusStats.add("failed", 5L); + ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); + + StatsAccumulator memoryStats2 = new StatsAccumulator(); + memoryStats2.add(10); + memoryStats2.add(30); + StatsAccumulator recordStats2 = new StatsAccumulator(); + recordStats2.add(10); + recordStats2.add(0); + StatsAccumulator runtimeStats2 = new StatsAccumulator(); + runtimeStats2.add(96); + runtimeStats2.add(0); + CountAccumulator statusStats2 = new CountAccumulator(); + statusStats2.add("finished", 2L); + statusStats2.add("scheduled", 1L); + ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); + + StatsAccumulator memoryStats3 = new StatsAccumulator(); + memoryStats3.add(500); + StatsAccumulator recordStats3 = new StatsAccumulator(); + recordStats3.add(50); + StatsAccumulator runtimeStats3 = new StatsAccumulator(); + runtimeStats3.add(32); + CountAccumulator statusStats3 = new CountAccumulator(); + statusStats3.add("finished", 1L); + ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, statusStats3); + + ForecastStats forecastStats4 = new ForecastStats(); + + // merge 4 into 3 + forecastStats3.merge(forecastStats4); + + // merge 3 into 2 + forecastStats2.merge(forecastStats3); + + // merger 2 into 1 + forecastStats.merge(forecastStats2); + + Map mergedStats = forecastStats.asMap(); + + assertEquals(3L, mergedStats.get(Fields.FORECASTED_JOBS)); + assertEquals(6L, mergedStats.get(Fields.TOTAL)); + + @SuppressWarnings("unchecked") + Map mergedMemoryStats = (Map) mergedStats.get(Fields.MEMORY); + + assertTrue(mergedMemoryStats != null); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(8140.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0)); + + @SuppressWarnings("unchecked") + Map mergedRecordStats = (Map) mergedStats.get(Fields.RECORDS); + + assertTrue(mergedRecordStats != null); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(15.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(50.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedRuntimeStats = (Map) mergedStats.get(Fields.RUNTIME); + + assertTrue(mergedRuntimeStats != null); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(23.0)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedCountStats = (Map) mergedStats.get(Fields.STATUSES); + + assertTrue(mergedCountStats != null); + assertEquals(3, mergedCountStats.size()); + assertEquals(5, mergedCountStats.get("finished").longValue()); + assertEquals(5, mergedCountStats.get("failed").longValue()); + assertEquals(1, mergedCountStats.get("scheduled").longValue()); + } + + public void testUniqueCountOfJobs() { + ForecastStats forecastStats = createForecastStats(5, 10); + ForecastStats forecastStats2 = createForecastStats(2, 8); + ForecastStats forecastStats3 = createForecastStats(0, 0); + ForecastStats forecastStats4 = createForecastStats(0, 0); + ForecastStats forecastStats5 = createForecastStats(1, 12); + + forecastStats.merge(forecastStats2); + forecastStats.merge(forecastStats3); + forecastStats.merge(forecastStats4); + forecastStats.merge(forecastStats5); + + assertEquals(3L, forecastStats.asMap().get(Fields.FORECASTED_JOBS)); + } + + @Override + public ForecastStats createTestInstance() { + return createForecastStats(1, 22); + } + + @Override + protected Reader instanceReader() { + return ForecastStats::new; + } + + public ForecastStats createForecastStats(long minTotal, long maxTotal) { + ForecastStats forecastStats = new ForecastStats(randomLongBetween(minTotal, maxTotal), createStatsAccumulator(), + createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator()); + + return forecastStats; + } + + private StatsAccumulator createStatsAccumulator() { + return new StatsAccumulatorTests().createTestInstance(); + } + + private CountAccumulator createCountAccumulator() { + return new CountAccumulatorTests().createTestInstance(); + + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java new file mode 100644 index 0000000000000..bd2df0823ae17 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StatsAccumulatorTests extends AbstractWireSerializingTestCase { + + public void testGivenNoValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + assertThat(accumulator.getMin(), equalTo(0.0)); + assertThat(accumulator.getMax(), equalTo(0.0)); + assertThat(accumulator.getTotal(), equalTo(0.0)); + assertThat(accumulator.getAvg(), equalTo(0.0)); + } + + public void testGivenPositiveValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + + for (int i = 1; i <= 10; i++) { + accumulator.add(i); + } + + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(55.0)); + assertThat(accumulator.getAvg(), equalTo(5.5)); + } + + public void testGivenNegativeValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + + for (int i = 1; i <= 10; i++) { + accumulator.add(-1 * i); + } + + assertThat(accumulator.getMin(), equalTo(-10.0)); + assertThat(accumulator.getMax(), equalTo(-1.0)); + assertThat(accumulator.getTotal(), equalTo(-55.0)); + assertThat(accumulator.getAvg(), equalTo(-5.5)); + } + + public void testAsMap() { + StatsAccumulator accumulator = new StatsAccumulator(); + accumulator.add(5.0); + accumulator.add(10.0); + + Map expectedMap = new HashMap<>(); + expectedMap.put("min", 5.0); + expectedMap.put("max", 10.0); + expectedMap.put("avg", 7.5); + expectedMap.put("total", 15.0); + assertThat(accumulator.asMap(), equalTo(expectedMap)); + } + + public void testMerge() { + StatsAccumulator accumulator = new StatsAccumulator(); + accumulator.add(5.0); + accumulator.add(10.0); + + assertThat(accumulator.getMin(), equalTo(5.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(15.0)); + assertThat(accumulator.getAvg(), equalTo(7.5)); + + StatsAccumulator accumulator2 = new StatsAccumulator(); + accumulator2.add(1.0); + accumulator2.add(3.0); + accumulator2.add(7.0); + + assertThat(accumulator2.getMin(), equalTo(1.0)); + assertThat(accumulator2.getMax(), equalTo(7.0)); + assertThat(accumulator2.getTotal(), equalTo(11.0)); + assertThat(accumulator2.getAvg(), equalTo(11.0 / 3.0)); + + accumulator.merge(accumulator2); + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(26.0)); + assertThat(accumulator.getAvg(), equalTo(5.2)); + + // same as accumulator + StatsAccumulator accumulator3 = new StatsAccumulator(); + accumulator3.add(5.0); + accumulator3.add(10.0); + + // merging the other way should yield the same results + accumulator2.merge(accumulator3); + assertThat(accumulator2.getMin(), equalTo(1.0)); + assertThat(accumulator2.getMax(), equalTo(10.0)); + assertThat(accumulator2.getTotal(), equalTo(26.0)); + assertThat(accumulator2.getAvg(), equalTo(5.2)); + } + + public void testMergeMixedEmpty() { + StatsAccumulator accumulator = new StatsAccumulator(); + + StatsAccumulator accumulator2 = new StatsAccumulator(); + accumulator2.add(1.0); + accumulator2.add(3.0); + accumulator.merge(accumulator2); + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(3.0)); + assertThat(accumulator.getTotal(), equalTo(4.0)); + + StatsAccumulator accumulator3 = new StatsAccumulator(); + accumulator.merge(accumulator3); + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(3.0)); + assertThat(accumulator.getTotal(), equalTo(4.0)); + + StatsAccumulator accumulator4 = new StatsAccumulator(); + accumulator3.merge(accumulator4); + + assertThat(accumulator3.getMin(), equalTo(0.0)); + assertThat(accumulator3.getMax(), equalTo(0.0)); + assertThat(accumulator3.getTotal(), equalTo(0.0)); + } + + public void testFromStatsAggregation() { + Stats stats = mock(Stats.class); + when(stats.getMax()).thenReturn(25.0); + when(stats.getMin()).thenReturn(2.5); + when(stats.getCount()).thenReturn(4L); + when(stats.getSum()).thenReturn(48.0); + when(stats.getAvg()).thenReturn(12.0); + + StatsAccumulator accumulator = StatsAccumulator.fromStatsAggregation(stats); + assertThat(accumulator.getMin(), equalTo(2.5)); + assertThat(accumulator.getMax(), equalTo(25.0)); + assertThat(accumulator.getTotal(), equalTo(48.0)); + assertThat(accumulator.getAvg(), equalTo(12.0)); + } + + @Override + public StatsAccumulator createTestInstance() { + StatsAccumulator accumulator = new StatsAccumulator(); + for (int i = 0; i < randomInt(10); ++i) { + accumulator.add(randomDoubleBetween(0.0, 1000.0, true)); + } + + return accumulator; + } + + @Override + protected Reader instanceReader() { + return StatsAccumulator::new; + } +} \ No newline at end of file diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 05af1ffee17a4..14ac43fae101c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -33,7 +33,8 @@ import org.elasticsearch.xpack.ml.job.process.NativeController; import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; -import org.elasticsearch.xpack.ml.utils.StatsAccumulator; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; import java.io.IOException; import java.util.Arrays; @@ -192,10 +193,12 @@ public void execute(ActionListener listener) { private void addJobsUsage(GetJobsStatsAction.Response response) { StatsAccumulator allJobsDetectorsStats = new StatsAccumulator(); StatsAccumulator allJobsModelSizeStats = new StatsAccumulator(); + ForecastStats allJobsForecastStats = new ForecastStats(); Map jobCountByState = new HashMap<>(); Map detectorStatsByState = new HashMap<>(); Map modelSizeStatsByState = new HashMap<>(); + Map forecastStatsByState = new HashMap<>(); Map jobs = mlMetadata.getJobs(); List jobsStats = response.getResponse().results(); @@ -206,6 +209,7 @@ private void addJobsUsage(GetJobsStatsAction.Response response) { double modelSize = modelSizeStats == null ? 0.0 : jobStats.getModelSizeStats().getModelBytes(); + allJobsForecastStats.merge(jobStats.getForecastStats()); allJobsDetectorsStats.add(detectorsCount); allJobsModelSizeStats.add(modelSize); @@ -215,24 +219,28 @@ private void addJobsUsage(GetJobsStatsAction.Response response) { js -> new StatsAccumulator()).add(detectorsCount); modelSizeStatsByState.computeIfAbsent(jobState, js -> new StatsAccumulator()).add(modelSize); + forecastStatsByState.merge(jobState, jobStats.getForecastStats(), (f1, f2) -> f1.merge(f2)); } jobsUsage.put(MachineLearningFeatureSetUsage.ALL, createJobUsageEntry(jobs.size(), allJobsDetectorsStats, - allJobsModelSizeStats)); + allJobsModelSizeStats, allJobsForecastStats)); for (JobState jobState : jobCountByState.keySet()) { jobsUsage.put(jobState.name().toLowerCase(Locale.ROOT), createJobUsageEntry( jobCountByState.get(jobState).get(), detectorStatsByState.get(jobState), - modelSizeStatsByState.get(jobState))); + modelSizeStatsByState.get(jobState), + forecastStatsByState.get(jobState))); } } private Map createJobUsageEntry(long count, StatsAccumulator detectorStats, - StatsAccumulator modelSizeStats) { + StatsAccumulator modelSizeStats, + ForecastStats forecastStats) { Map usage = new HashMap<>(); usage.put(MachineLearningFeatureSetUsage.COUNT, count); usage.put(MachineLearningFeatureSetUsage.DETECTORS, detectorStats.asMap()); usage.put(MachineLearningFeatureSetUsage.MODEL_SIZE, modelSizeStats.asMap()); + usage.put(MachineLearningFeatureSetUsage.FORECASTS, forecastStats.asMap()); return usage; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index 1182953dfc31e..31f918dfc2571 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -106,9 +107,12 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo JobState jobState = MlMetadata.getJobState(jobId, tasks); String assignmentExplanation = pTask.getAssignment().getExplanation(); TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task)); - GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(), - stats.get().v2(), jobState, node, assignmentExplanation, openTime); - listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); + gatherForecastStats(jobId, forecastStats -> { + GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(), + stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime); + listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); + }, listener::onFailure); + } else { listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD)); } @@ -131,25 +135,31 @@ void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request for (int i = 0; i < jobIds.size(); i++) { int slot = i; String jobId = jobIds.get(i); - gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { - JobState jobState = MlMetadata.getJobState(jobId, tasks); - PersistentTasksCustomMetaData.PersistentTask pTask = MlMetadata.getJobTask(jobId, tasks); - String assignmentExplanation = null; - if (pTask != null) { - assignmentExplanation = pTask.getAssignment().getExplanation(); - } - jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null, - assignmentExplanation, null)); - if (counter.decrementAndGet() == 0) { - List results = response.getResponse().results(); - results.addAll(jobStats.asList()); - listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), - new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); - } + gatherForecastStats(jobId, forecastStats -> { + gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { + JobState jobState = MlMetadata.getJobState(jobId, tasks); + PersistentTasksCustomMetaData.PersistentTask pTask = MlMetadata.getJobTask(jobId, tasks); + String assignmentExplanation = null; + if (pTask != null) { + assignmentExplanation = pTask.getAssignment().getExplanation(); + } + jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState, + null, assignmentExplanation, null)); + if (counter.decrementAndGet() == 0) { + List results = response.getResponse().results(); + results.addAll(jobStats.asList()); + listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), + new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); + } + }, listener::onFailure); }, listener::onFailure); } } + void gatherForecastStats(String jobId, Consumer handler, Consumer errorHandler) { + jobProvider.getForecastStats(jobId, handler, errorHandler); + } + void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer handler, Consumer errorHandler) { jobProvider.dataCounts(jobId, dataCounts -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 578ddd1efc78a..7513cb5a5bbc0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -63,6 +63,9 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -93,6 +96,9 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.ml.stats.CountAccumulator; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils; import org.elasticsearch.xpack.core.security.support.Exceptions; @@ -1112,6 +1118,53 @@ public void getForecastRequestStats(String jobId, String forecastId, Consumer handler.accept(result.result), errorHandler, () -> null); } + public void getForecastStats(String jobId, Consumer handler, Consumer errorHandler) { + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + + QueryBuilder termQuery = new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE); + QueryBuilder jobQuery = new TermsQueryBuilder(Job.ID.getPreferredName(), jobId); + QueryBuilder finalQuery = new BoolQueryBuilder().filter(termQuery).filter(jobQuery); + + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(finalQuery); + sourceBuilder.aggregation( + AggregationBuilders.stats(ForecastStats.Fields.MEMORY).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RECORDS) + .field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); + sourceBuilder.aggregation( + AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); + sourceBuilder.aggregation( + AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName())); + sourceBuilder.size(0); + + searchRequest.source(sourceBuilder); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap(searchResponse -> { + long totalHits = searchResponse.getHits().getTotalHits(); + Aggregations aggregations = searchResponse.getAggregations(); + if (totalHits == 0 || aggregations == null) { + handler.accept(new ForecastStats()); + return; + } + Map aggregationsAsMap = aggregations.asMap(); + StatsAccumulator memoryStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.MEMORY)); + StatsAccumulator recordStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RECORDS)); + StatsAccumulator runtimeStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RUNTIME)); + CountAccumulator statusCount = CountAccumulator + .fromTermsAggregation((StringTerms) aggregationsAsMap.get(ForecastStats.Fields.STATUSES)); + + ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, statusCount); + handler.accept(forecastStats); + }, errorHandler), client::search); + + } + public void updateCalendar(String calendarId, Set jobIdsToAdd, Set jobIdsToRemove, Consumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java deleted file mode 100644 index 1f1df147d80a1..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.utils; - -import java.util.HashMap; -import java.util.Map; - -/** - * Helper class to collect min, max, avg and total statistics for a quantity - */ -public class StatsAccumulator { - - private static final String MIN = "min"; - private static final String MAX = "max"; - private static final String AVG = "avg"; - private static final String TOTAL = "total"; - - private long count; - private double total; - private Double min; - private Double max; - - public void add(double value) { - count++; - total += value; - min = min == null ? value : (value < min ? value : min); - max = max == null ? value : (value > max ? value : max); - } - - public double getMin() { - return min == null ? 0.0 : min; - } - - public double getMax() { - return max == null ? 0.0 : max; - } - - public double getAvg() { - return count == 0.0 ? 0.0 : total/count; - } - - public double getTotal() { - return total; - } - - public Map asMap() { - Map map = new HashMap<>(); - map.put(MIN, getMin()); - map.put(MAX, getMax()); - map.put(AVG, getAvg()); - map.put(TOTAL, getTotal()); - return map; - } -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index eba2054054c0d..5893a863fe38f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -39,6 +39,8 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.junit.Before; @@ -138,11 +140,11 @@ public void testUsage() throws Exception { settings.put("xpack.ml.enabled", true); Job opened1 = buildJob("opened1", Arrays.asList(buildMinDetector("foo"))); - GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L); + GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L, 3L); Job opened2 = buildJob("opened2", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"))); - GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L); + GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L, 8L); Job closed1 = buildJob("closed1", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"), buildMinDetector("foobar"))); - GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L); + GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L, 0); givenJobs(Arrays.asList(opened1, opened2, closed1), Arrays.asList(opened1JobStats, opened2JobStats, closed1JobStats)); @@ -210,6 +212,15 @@ public void testUsage() throws Exception { assertThat(source.getValue("datafeeds._all.count"), equalTo(3)); assertThat(source.getValue("datafeeds.started.count"), equalTo(2)); assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1)); + + assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11)); + assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2)); + + assertThat(source.getValue("jobs.closed.forecasts.total"), equalTo(0)); + assertThat(source.getValue("jobs.closed.forecasts.forecasted_jobs"), equalTo(0)); + + assertThat(source.getValue("jobs.opened.forecasts.total"), equalTo(11)); + assertThat(source.getValue("jobs.opened.forecasts.forecasted_jobs"), equalTo(2)); } } @@ -301,12 +312,16 @@ private static Job buildJob(String jobId, List detectors) { .build(new Date(randomNonNegativeLong())); } - private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes) { + private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes, + long numberOfForecasts) { ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId); modelSizeStats.setModelBytes(modelBytes); GetJobsStatsAction.Response.JobStats jobStats = mock(GetJobsStatsAction.Response.JobStats.class); + ForecastStats forecastStats = buildForecastStats(numberOfForecasts); + when(jobStats.getJobId()).thenReturn(jobId); when(jobStats.getModelSizeStats()).thenReturn(modelSizeStats.build()); + when(jobStats.getForecastStats()).thenReturn(forecastStats); when(jobStats.getState()).thenReturn(state); return jobStats; } @@ -316,4 +331,8 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats when(stats.getDatafeedState()).thenReturn(state); return stats; } + + private static ForecastStats buildForecastStats(long numberOfForecasts) { + return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index 40bc82c6048c7..2e00ad71251db 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -37,7 +37,7 @@ public void testDetermineJobIds() { result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Collections.singletonList("id1"), Collections.singletonList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null))); + new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, @@ -49,7 +49,7 @@ public void testDetermineJobIds() { result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), - Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, + Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.CLOSED, null, null, null)) ); assertEquals(2, result.size()); @@ -58,17 +58,16 @@ public void testDetermineJobIds() { result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.OPENED, null, null, null) + new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), + new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null) )); assertEquals(1, result.size()); assertEquals("id2", result.get(0)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), - Arrays.asList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.OPENED, null, null, null))); + result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList( + new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), + new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null), + new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); // No jobs running, but job 4 is being deleted diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java deleted file mode 100644 index ae9b6a7360c13..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.utils; - -import org.elasticsearch.test.ESTestCase; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; - -public class StatsAccumulatorTests extends ESTestCase { - - public void testGivenNoValues() { - StatsAccumulator accumulator = new StatsAccumulator(); - assertThat(accumulator.getMin(), equalTo(0.0)); - assertThat(accumulator.getMax(), equalTo(0.0)); - assertThat(accumulator.getTotal(), equalTo(0.0)); - assertThat(accumulator.getAvg(), equalTo(0.0)); - } - - public void testGivenPositiveValues() { - StatsAccumulator accumulator = new StatsAccumulator(); - - for (int i = 1; i <= 10; i++) { - accumulator.add(i); - } - - assertThat(accumulator.getMin(), equalTo(1.0)); - assertThat(accumulator.getMax(), equalTo(10.0)); - assertThat(accumulator.getTotal(), equalTo(55.0)); - assertThat(accumulator.getAvg(), equalTo(5.5)); - } - - public void testGivenNegativeValues() { - StatsAccumulator accumulator = new StatsAccumulator(); - - for (int i = 1; i <= 10; i++) { - accumulator.add(-1 * i); - } - - assertThat(accumulator.getMin(), equalTo(-10.0)); - assertThat(accumulator.getMax(), equalTo(-1.0)); - assertThat(accumulator.getTotal(), equalTo(-55.0)); - assertThat(accumulator.getAvg(), equalTo(-5.5)); - } - - public void testAsMap() { - StatsAccumulator accumulator = new StatsAccumulator(); - accumulator.add(5.0); - accumulator.add(10.0); - - Map expectedMap = new HashMap<>(); - expectedMap.put("min", 5.0); - expectedMap.put("max", 10.0); - expectedMap.put("avg", 7.5); - expectedMap.put("total", 15.0); - assertThat(accumulator.asMap(), equalTo(expectedMap)); - } -} \ No newline at end of file diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java index 88f34c4577c1c..9d37073a426cc 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase; @@ -100,7 +101,9 @@ public void testToXContent() throws IOException { .build(); final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7); - final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, JobState.OPENED, discoveryNode, "_explanation", time); + final ForecastStats forecastStats = new ForecastStats(); + final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, + "_explanation", time); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L); final JobStatsMonitoringDoc document = new JobStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, jobStats); @@ -152,6 +155,9 @@ public void testToXContent() throws IOException { + "\"log_time\":1483315322002," + "\"timestamp\":1483228861001" + "}," + + "\"forecasts_stats\":{" + + "\"total\":0,\"forecasted_jobs\":0" + + "}," + "\"state\":\"opened\"," + "\"node\":{" + "\"id\":\"_node_id\","