Make certain ML node settings dynamic (#33565) (#33961)
* Make certain ML node settings dynamic (#33565)

* Changing to pull in updating settings and pass to constructor

* adding note about only newly opened jobs getting updated value
benwtrent authored and kcm committed Oct 30, 2018
1 parent 7320b92 commit 3443a2a
Showing 9 changed files with 165 additions and 44 deletions.
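For context on what the new dynamic keys enable, here is a hedged, standalone sketch (not part of this commit) of how an operator could raise the new xpack.ml.max_anomaly_records limit at runtime from Java through the standard cluster update settings API. The class and method names are illustrative; the client is assumed to be an already connected Elasticsearch Client.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class UpdateMlLimitSketch {

    // Illustrative only: raises the dynamic anomaly-record limit cluster-wide.
    // Jobs opened after this call pick up the new value; jobs already running keep the old one.
    static void raiseMaxAnomalyRecords(Client client, int newLimit) {
        client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(Settings.builder()
                        .put("xpack.ml.max_anomaly_records", newLimit)
                        .build())
                .get();
    }
}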
@@ -292,8 +292,11 @@ public List<Setting<?>> getSettings() {
MAX_MACHINE_MEMORY_PERCENT,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING,
DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
@@ -379,7 +382,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// This will only happen when path.home is not set, which is disallowed in production
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
}
autodetectProcessFactory = new NativeAutodetectProcessFactory(environment, settings, nativeController, client);
autodetectProcessFactory = new NativeAutodetectProcessFactory(
environment,
settings,
nativeController,
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
} catch (IOException e) {
// This also should not happen in production, as the MachineLearningFeatureSet should have
@@ -397,7 +405,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
normalizerFactory, xContentRegistry, auditor, clusterService);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -42,15 +43,28 @@ public class DataCountsReporter extends AbstractComponent {
* The max percentage of date parse errors allowed before
* an exception is thrown.
*/
@Deprecated
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING = Setting.intSetting("max.percent.date.errors", 25,
Property.NodeScope);

Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING = Setting.intSetting(
"xpack.ml.max_percent_date_errors",
ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
0,
Property.Dynamic,
Property.NodeScope);
/**
* The max percentage of out of order records allowed before
* an exception is thrown.
*/
@Deprecated
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope);
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope, Property.Deprecated);
public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING = Setting.intSetting(
"xpack.ml.max_percent_out_of_order_errors",
ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
0,
Property.Dynamic,
Property.NodeScope);

private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L);

@@ -66,14 +80,15 @@ public class DataCountsReporter extends AbstractComponent {
private long logEvery = 1;
private long logCount = 0;

private final int acceptablePercentDateParseErrors;
private final int acceptablePercentOutOfOrderErrors;
private volatile int acceptablePercentDateParseErrors;
private volatile int acceptablePercentOutOfOrderErrors;

private Function<Long, Boolean> reportingBoundaryFunction;

private DataStreamDiagnostics diagnostics;

public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) {
public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister,
ClusterService clusterService) {

super(settings);

@@ -84,9 +99,12 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData
incrementalRecordStats = new DataCounts(job.getId());
diagnostics = new DataStreamDiagnostics(job, counts);

acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);

acceptablePercentDateParseErrors = MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, this::setAcceptablePercentDateParseErrors);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, this::setAcceptablePercentOutOfOrderErrors);
reportingBoundaryFunction = this::reportEvery10000Records;
}

@@ -352,4 +370,17 @@ private void retrieveDiagnosticsIntermediateResults() {

diagnostics.resetCounts();
}

private void setAcceptablePercentDateParseErrors(int acceptablePercentDateParseErrors) {
logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(),
this.acceptablePercentDateParseErrors, acceptablePercentDateParseErrors);
this.acceptablePercentDateParseErrors = acceptablePercentDateParseErrors;
}

private void setAcceptablePercentOutOfOrderErrors(int acceptablePercentOutOfOrderErrors) {
logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(),
this.acceptablePercentOutOfOrderErrors, acceptablePercentOutOfOrderErrors);
this.acceptablePercentOutOfOrderErrors = acceptablePercentOutOfOrderErrors;
}

}
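As a standalone illustration of the update path wired up above, the following minimal sketch (not part of the commit; it assumes ClusterSettings behaves as in the test further down) registers a consumer the same way DataCountsReporter's constructor does and then applies new settings, which is roughly what a dynamic cluster settings update delivers to each node.

import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;

public class DynamicErrorPercentSketch {

    public static void main(String[] args) {
        Set<Setting<?>> settingsSet = new HashSet<>();
        settingsSet.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);

        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, settingsSet);

        // Same registration style as DataCountsReporter's constructor.
        clusterSettings.addSettingsUpdateConsumer(
                DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING,
                v -> System.out.println("acceptablePercentDateParseErrors is now " + v));

        // Simulates the dynamic update reaching the node; the consumer above fires with 10.
        clusterSettings.applySettings(
                Settings.builder().put("xpack.ml.max_percent_date_errors", 10).build());
    }
}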
@@ -83,8 +83,16 @@ public class AutodetectBuilder {
/**
* The maximum number of anomaly records that will be written each bucket
*/
@Deprecated
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS,
Setting.Property.NodeScope);
Setting.Property.NodeScope, Setting.Property.Deprecated);
// Though this setting is dynamic, it is only set when a new job is opened, so jobs that are already running will not get the updated value.
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting(
"xpack.ml.max_anomaly_records",
MAX_ANOMALY_RECORDS_SETTING,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/**
* Config setting storing the flag that disables model persistence
@@ -244,9 +252,8 @@ List<String> buildAutodetectCommand() {
return command;
}


static String maxAnomalyRecordsArg(Settings settings) {
return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING.get(settings);
return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings);
}

private static String getTimeFieldOrDefault(Job job) {
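The declaration pattern above pairs each deprecated node setting with a new dynamic setting that uses it as a fallback. The self-contained sketch below (defaults and values are illustrative, not from the commit) shows the effect: a node that still configures the old key seeds the new setting until the dynamic value is set explicitly.

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

public class FallbackSettingSketch {

    public static void main(String[] args) {
        // Deprecated legacy key, node scope only.
        Setting<Integer> legacy = Setting.intSetting("max.anomaly.records", 500,
                Setting.Property.NodeScope, Setting.Property.Deprecated);

        // New dynamic key that falls back to the legacy one (minimum value 1).
        Setting<Integer> dynamic = Setting.intSetting("xpack.ml.max_anomaly_records", legacy, 1,
                Setting.Property.NodeScope, Setting.Property.Dynamic);

        // An elasticsearch.yml that still uses the old key...
        Settings nodeSettings = Settings.builder().put("max.anomaly.records", 200).build();

        // ...is honoured by the new setting until it is overridden dynamically.
        System.out.println(dynamic.get(nodeSettings)); // prints 200
    }
}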
@@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
@@ -130,12 +131,13 @@ public class AutodetectProcessManager extends AbstractComponent {
private final NamedXContentRegistry xContentRegistry;

private final Auditor auditor;
private final ClusterService clusterService;

public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
NamedXContentRegistry xContentRegistry, Auditor auditor) {
NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) {
super(settings);
this.environment = environment;
this.client = client;
@@ -150,6 +152,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie
this.jobDataCountsPersister = jobDataCountsPersister;
this.auditor = auditor;
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
this.clusterService = clusterService;
}

public void onNodeStartup() {
@@ -493,8 +496,11 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
jobDataCountsPersister);
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings,
job,
autodetectParams.dataCounts(),
jobDataCountsPersister,
clusterService);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
@@ -7,6 +7,7 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -40,12 +41,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
private final Environment env;
private final Settings settings;
private final NativeController nativeController;
private final ClusterService clusterService;

public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client) {
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
ClusterService clusterService) {
this.env = Objects.requireNonNull(env);
this.settings = Objects.requireNonNull(settings);
this.nativeController = Objects.requireNonNull(nativeController);
this.client = client;
this.clusterService = clusterService;
}

@Override
@@ -85,8 +89,15 @@ public AutodetectProcess createAutodetectProcess(Job job,
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
List<Path> filesToDelete) {
try {

Settings updatedSettings = Settings.builder()
.put(settings)
.put(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(),
clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC))
.build();

AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
settings, nativeController, processPipes)
updatedSettings, nativeController, processPipes)
.referencedFilters(autodetectParams.filters())
.scheduledEvents(autodetectParams.scheduledEvents());

@@ -95,7 +106,6 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
if (autodetectParams.quantiles() != null) {
autodetectBuilder.quantiles(autodetectParams.quantiles());
}

autodetectBuilder.build();
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
} catch (IOException e) {
@@ -104,5 +114,6 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
throw ExceptionsHelper.serverError(msg, e);
}
}

}
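The factory above copies the current dynamic value into the Settings handed to AutodetectBuilder, so each native process is launched with a snapshot of the limit taken at job-open time. Below is a minimal standalone sketch of that idea; the setting key is the one the commit introduces, while the class, default, and values are illustrative assumptions.

import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

public class SnapshotAtLaunchSketch {

    static final Setting<Integer> MAX_ANOMALY_RECORDS = Setting.intSetting(
            "xpack.ml.max_anomaly_records", 500, 1,
            Setting.Property.NodeScope, Setting.Property.Dynamic);

    public static void main(String[] args) {
        Settings nodeSettings = Settings.EMPTY;
        Set<Setting<?>> settingsSet = new HashSet<>();
        settingsSet.add(MAX_ANOMALY_RECORDS);
        ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, settingsSet);

        // An operator raises the limit dynamically...
        clusterSettings.applySettings(
                Settings.builder().put(MAX_ANOMALY_RECORDS.getKey(), 1000).build());

        // ...and the next job that opens launches its process with the snapshotted value,
        // mirroring the updatedSettings overlay in NativeAutodetectProcessFactory.
        Settings updatedSettings = Settings.builder()
                .put(nodeSettings)
                .put(MAX_ANOMALY_RECORDS.getKey(), clusterSettings.get(MAX_ANOMALY_RECORDS))
                .build();
        System.out.println("--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS.get(updatedSettings)); // 1000
    }
}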

@@ -5,18 +5,44 @@
*/
package org.elasticsearch.xpack.ml.job.process;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

import static org.elasticsearch.mock.orig.Mockito.when;
import static org.mockito.Mockito.mock;

public class CountingInputStreamTests extends ESTestCase {

private ClusterService clusterService;

@Before
public void setUpMocks() {
Settings settings = Settings.builder().put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 10)
.put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), 10)
.build();
Set<Setting<?>> setOfSettings = new HashSet<>();
setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);
setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING);
ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings);

clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
}

public void testRead_OneByteAtATime() throws IOException {

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

final String TEXT = "123";
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
@@ -30,7 +56,7 @@ public void testRead_OneByteAtATime() throws IOException {
public void testRead_WithBuffer() throws IOException {
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));

@@ -44,7 +70,7 @@ public void testRead_WithBuffer() throws IOException {
public void testRead_WithTinyBuffer() throws IOException {
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));

@@ -57,7 +83,7 @@ public void testRead_WithTinyBuffer() throws IOException {

public void testRead_WithResets() throws IOException {

DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);

final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
