Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Return statistics about forecasts as part of the jobsstats and usage API #31647

Merged
merged 11 commits into from
Jul 4, 2018
31 changes: 31 additions & 0 deletions x-pack/docs/en/rest-api/ml/jobcounts.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ progress of a job.
(object) An object that provides information about the size and contents of the model.
See <<ml-modelsizestats,model size stats objects>>

`forecasts_stats`::
(object) An object that provides statistical information about forecasts
of this job. See <<ml-forecastsstats, forecasts stats objects>>

`node`::
(object) For open jobs only, contains information about the node where the
job runs. See <<ml-stats-node,node object>>.
Expand Down Expand Up @@ -177,6 +181,33 @@ NOTE: The `over` field values are counted separately for each detector and parti
`timestamp`::
(date) The timestamp of the `model_size_stats` according to the timestamp of the data.

[float]
[[ml-forecastsstats]]
==== Forecasts Stats Objects

The `forecasts_stats` object shows statistics about forecasts. It has the following properties:

`total`::
(long) The number of forecasts currently available for this model.

`forecasted_jobs`::
(long) Number of jobs that have at least one forecast.

`memory_bytes`::
(object) Statistics about the memory usage: min, max, avg and total.

`records`::
(object) Statistics about the number of forecast records: min, max, avg and total.

`processing_time_ms`::
(object) Statistics about the forecast runtime in milliseconds: min, max, avg and total.

`status`::
(object) Counts per forecast status, for example: {"finished" : 2}.

NOTE: `memory_bytes`, `records`, `processing_time_ms`, `status` require at least 1 forecast, otherwise
these fields are ommitted.

[float]
[[ml-stats-node]]
==== Node Objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
public static final String DATAFEEDS_FIELD = "datafeeds";
public static final String COUNT = "count";
public static final String DETECTORS = "detectors";
public static final String FORECASTS = "forecasts";
public static final String MODEL_SIZE = "model_size";

private final Map<String, Object> jobsUsage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
Expand All @@ -46,6 +47,7 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {

private static final String DATA_COUNTS = "data_counts";
private static final String MODEL_SIZE_STATS = "model_size_stats";
private static final String FORECASTS_STATS = "forecasts_stats";
private static final String STATE = "state";
private static final String NODE = "node";

Expand Down Expand Up @@ -154,18 +156,22 @@ public static class JobStats implements ToXContentObject, Writeable {
@Nullable
private ModelSizeStats modelSizeStats;
@Nullable
private ForecastStats forecastStats;
@Nullable
private TimeValue openTime;
private JobState state;
@Nullable
private DiscoveryNode node;
@Nullable
private String assignmentExplanation;

public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
@Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation, @Nullable TimeValue opentime) {
this.jobId = Objects.requireNonNull(jobId);
this.dataCounts = Objects.requireNonNull(dataCounts);
this.modelSizeStats = modelSizeStats;
this.forecastStats = forecastStats;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add this to the JobStats(StreamInput), writeTo(StreamOutput) with BWC checks and hashCode & equals

this.state = Objects.requireNonNull(state);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
Expand All @@ -180,6 +186,9 @@ public JobStats(StreamInput in) throws IOException {
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
openTime = in.readOptionalTimeValue();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
forecastStats = in.readOptionalWriteable(ForecastStats::new);
}
}

public String getJobId() {
Expand All @@ -193,6 +202,10 @@ public DataCounts getDataCounts() {
public ModelSizeStats getModelSizeStats() {
return modelSizeStats;
}

public ForecastStats getForecastStats() {
return forecastStats;
}

public JobState getState() {
return state;
Expand Down Expand Up @@ -226,6 +239,10 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc
if (modelSizeStats != null) {
builder.field(MODEL_SIZE_STATS, modelSizeStats);
}
if (forecastStats != null) {
builder.field(FORECASTS_STATS, forecastStats);
}

builder.field(STATE, state.toString());
if (node != null) {
builder.startObject(NODE);
Expand Down Expand Up @@ -259,11 +276,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
out.writeOptionalTimeValue(openTime);
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeOptionalWriteable(forecastStats);
}
}

@Override
public int hashCode() {
return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime);
return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime);
}

@Override
Expand All @@ -278,6 +298,7 @@ public boolean equals(Object obj) {
return Objects.equals(jobId, other.jobId)
&& Objects.equals(this.dataCounts, other.dataCounts)
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
&& Objects.equals(this.forecastStats, other.forecastStats)
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ml.stats;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* An accumulator for simple counts where statistical measures
* are not of interest.
*/
public class CountAccumulator implements Writeable {

private Map<String, Long> counts;

public CountAccumulator() {
this.counts = new HashMap<String, Long>();
}

private CountAccumulator(Map<String, Long> counts) {
this.counts = counts;
}

public CountAccumulator(StreamInput in) throws IOException {
this.counts = in.readMap(StreamInput::readString, StreamInput::readLong);
}

public void merge(CountAccumulator other) {
counts = Stream.of(counts, other.counts).flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (x, y) -> x + y));
}

public void add(String key, Long count) {
counts.put(key, counts.getOrDefault(key, 0L) + count);
}

public Map<String, Long> asMap() {
return counts;
}

public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) {
return new CountAccumulator(termsAggregation.getBuckets().stream()
.collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount())));
}

public void writeTo(StreamOutput out) throws IOException {
out.writeMap(counts, StreamOutput::writeString, StreamOutput::writeLong);
}

@Override
public int hashCode() {
return Objects.hash(counts);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

CountAccumulator other = (CountAccumulator) obj;
return Objects.equals(counts, other.counts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ml.stats;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* A class to hold statistics about forecasts.
*/
public class ForecastStats implements ToXContentObject, Writeable {

public static class Fields {
public static final String TOTAL = "total";
public static final String FORECASTED_JOBS = "forecasted_jobs";
public static final String MEMORY = "memory_bytes";
public static final String RUNTIME = "processing_time_ms";
public static final String RECORDS = "records";
public static final String STATUSES = "status";
}

private long total;
private long forecastedJobs;
private StatsAccumulator memoryStats;
private StatsAccumulator recordStats;
private StatsAccumulator runtimeStats;
private CountAccumulator statusCounts;

public ForecastStats() {
this.total = 0;
this.forecastedJobs = 0;
this.memoryStats = new StatsAccumulator();
this.recordStats = new StatsAccumulator();
this.runtimeStats = new StatsAccumulator();
this.statusCounts = new CountAccumulator();
}

/*
* Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it.
*/
public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats,
CountAccumulator statusCounts) {
this.total = total;
this.forecastedJobs = total > 0 ? 1 : 0;
this.memoryStats = Objects.requireNonNull(memoryStats);
this.recordStats = Objects.requireNonNull(recordStats);
this.runtimeStats = Objects.requireNonNull(runtimeStats);
this.statusCounts = Objects.requireNonNull(statusCounts);
}

public ForecastStats(StreamInput in) throws IOException {
this.total = in.readLong();
this.forecastedJobs = in.readLong();
this.memoryStats = new StatsAccumulator(in);
this.recordStats = new StatsAccumulator(in);
this.runtimeStats = new StatsAccumulator(in);
this.statusCounts = new CountAccumulator(in);
}

public ForecastStats merge(ForecastStats other) {
if (other == null) {
return this;
}
total += other.total;
forecastedJobs += other.forecastedJobs;
memoryStats.merge(other.memoryStats);
recordStats.merge(other.recordStats);
runtimeStats.merge(other.runtimeStats);
statusCounts.merge(other.statusCounts);

return this;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
doXContentBody(builder, params);
return builder.endObject();
}

public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.TOTAL, total);
builder.field(Fields.FORECASTED_JOBS, forecastedJobs);

if (total > 0) {
builder.field(Fields.MEMORY, memoryStats.asMap());
builder.field(Fields.RECORDS, recordStats.asMap());
builder.field(Fields.RUNTIME, runtimeStats.asMap());
builder.field(Fields.STATUSES, statusCounts.asMap());
}

return builder;
}

public Map<String, Object> asMap() {
Map<String, Object> map = new HashMap<>();
map.put(Fields.TOTAL, total);
map.put(Fields.FORECASTED_JOBS, forecastedJobs);

if (total > 0) {
map.put(Fields.MEMORY, memoryStats.asMap());
map.put(Fields.RECORDS, recordStats.asMap());
map.put(Fields.RUNTIME, runtimeStats.asMap());
map.put(Fields.STATUSES, statusCounts.asMap());
}

return map;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(total);
out.writeLong(forecastedJobs);
memoryStats.writeTo(out);
recordStats.writeTo(out);
runtimeStats.writeTo(out);
statusCounts.writeTo(out);
}

@Override
public int hashCode() {
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

ForecastStats other = (ForecastStats) obj;
return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs)
&& Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats)
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts);
}
}
Loading