Skip to content

Commit 438a58e

Browse files
authored
Add cache and runtime metadata to Process workunits (#12469)
To improve our ability to debug cache hits/misses, and to determine what the upper bound is on "time saved", this change:
* Adds a `source` field to `ProcessResultMetadata` and to `Process` workunits.
* Records the full serialized definition of the `Process` on the workunit.
* Adds metrics for the CPU time of actually running a `Process` (locally or remotely).
1 parent 5754fcf commit 438a58e

File tree

15 files changed

+162
-68
lines changed

15 files changed

+162
-68
lines changed

src/rust/engine/Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/rust/engine/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ sharded_lmdb = { path = "sharded_lmdb" }
134134
smallvec = "0.6"
135135
stdio = { path = "stdio" }
136136
store = { path = "fs/store" }
137+
serde_json = "1.0"
137138
task_executor = { path = "task_executor" }
138139
tempfile = "3"
139140
testutil_mock = { package = "mock", path = "testutil/mock" }

src/rust/engine/fs/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ lazy_static = "1"
1616
log = "0.4"
1717
parking_lot = "0.11"
1818
rlimit = "0.3"
19+
serde = "1.0.104"
1920
task_executor = { path = "../task_executor" }
2021

2122
[dev-dependencies]

src/rust/engine/fs/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use async_trait::async_trait;
5151
use bytes::Bytes;
5252
use futures::future::{self, TryFutureExt};
5353
use lazy_static::lazy_static;
54+
use serde::Serialize;
5455

5556
lazy_static! {
5657
static ref EMPTY_IGNORE: Arc<GitignoreStyleExcludes> = Arc::new(GitignoreStyleExcludes {
@@ -74,7 +75,7 @@ pub fn default_cache_path() -> PathBuf {
7475
cache_path.join("pants")
7576
}
7677

77-
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
78+
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash, Serialize)]
7879
pub struct RelativePath(PathBuf);
7980

8081
impl RelativePath {

src/rust/engine/process_execution/src/cache.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use workunit_store::{
1717

1818
use crate::{
1919
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
20-
ProcessCacheScope, ProcessMetadata,
20+
ProcessCacheScope, ProcessMetadata, ProcessResultSource,
2121
};
2222

2323
#[allow(dead_code)]
@@ -182,6 +182,7 @@ impl CommandRunner {
182182
action_result,
183183
platform,
184184
true,
185+
ProcessResultSource::HitLocally,
185186
)
186187
.await?
187188
} else {

src/rust/engine/process_execution/src/lib.rs

+42-20
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl TryFrom<String> for Platform {
142142
}
143143
}
144144

145-
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
145+
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize)]
146146
pub enum ProcessCacheScope {
147147
// Cached in all locations, regardless of success or failure.
148148
Always,
@@ -173,10 +173,14 @@ impl TryFrom<String> for ProcessCacheScope {
173173
}
174174
}
175175

176+
fn serialize_level<S: serde::Serializer>(level: &log::Level, s: S) -> Result<S::Ok, S::Error> {
177+
s.serialize_str(&level.to_string())
178+
}
179+
176180
///
177181
/// A process to be executed.
178182
///
179-
#[derive(Derivative, Clone, Debug, Eq)]
183+
#[derive(Derivative, Clone, Debug, Eq, Serialize)]
180184
#[derivative(PartialEq, Hash)]
181185
pub struct Process {
182186
///
@@ -216,6 +220,8 @@ pub struct Process {
216220
#[derivative(PartialEq = "ignore", Hash = "ignore")]
217221
pub description: String,
218222

223+
// NB: We serialize with a function to avoid adding a serde dep to the logging crate.
224+
#[serde(serialize_with = "serialize_level")]
219225
pub level: log::Level,
220226

221227
///
@@ -391,17 +397,40 @@ pub struct FallibleProcessResultWithPlatform {
391397

392398
/// Metadata for a ProcessResult corresponding to the REAPI `ExecutedActionMetadata` proto. This
393399
/// conversion is lossy, but the interesting parts are preserved.
394-
#[derive(Clone, Debug, Default, Eq, PartialEq)]
400+
#[derive(Clone, Debug, Eq, PartialEq)]
395401
pub struct ProcessResultMetadata {
396402
/// The time from starting to completion, including preparing the chroot and cleanup.
397403
/// Corresponds to `worker_start_timestamp` and `worker_completed_timestamp` from
398404
/// `ExecutedActionMetadata`.
405+
///
406+
/// NB: This is optional because the REAPI does not guarantee that it is returned.
399407
pub total_elapsed: Option<Duration>,
408+
/// The source of the result.
409+
pub source: ProcessResultSource,
400410
}
401411

402412
impl ProcessResultMetadata {
403-
pub fn new(total_elapsed: Option<Duration>) -> Self {
404-
ProcessResultMetadata { total_elapsed }
413+
pub fn new(total_elapsed: Option<Duration>, source: ProcessResultSource) -> Self {
414+
ProcessResultMetadata {
415+
total_elapsed,
416+
source,
417+
}
418+
}
419+
420+
pub fn new_from_metadata(metadata: ExecutedActionMetadata, source: ProcessResultSource) -> Self {
421+
let total_elapsed = match (
422+
metadata.worker_start_timestamp,
423+
metadata.worker_completed_timestamp,
424+
) {
425+
(Some(started), Some(completed)) => TimeSpan::from_start_and_end(&started, &completed, "")
426+
.map(|span| span.duration)
427+
.ok(),
428+
_ => None,
429+
};
430+
Self {
431+
total_elapsed,
432+
source,
433+
}
405434
}
406435

407436
/// How much faster a cache hit was than running the process again.
@@ -429,21 +458,6 @@ impl ProcessResultMetadata {
429458
}
430459
}
431460

432-
impl From<ExecutedActionMetadata> for ProcessResultMetadata {
433-
fn from(metadata: ExecutedActionMetadata) -> Self {
434-
let total_elapsed = match (
435-
metadata.worker_start_timestamp,
436-
metadata.worker_completed_timestamp,
437-
) {
438-
(Some(started), Some(completed)) => TimeSpan::from_start_and_end(&started, &completed, "")
439-
.map(|span| span.duration)
440-
.ok(),
441-
_ => None,
442-
};
443-
Self { total_elapsed }
444-
}
445-
}
446-
447461
impl From<ProcessResultMetadata> for ExecutedActionMetadata {
448462
fn from(metadata: ProcessResultMetadata) -> ExecutedActionMetadata {
449463
let (total_start, total_end) = match metadata.total_elapsed {
@@ -470,6 +484,14 @@ impl From<ProcessResultMetadata> for ExecutedActionMetadata {
470484
}
471485
}
472486

487+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
488+
pub enum ProcessResultSource {
489+
RanLocally,
490+
RanRemotely,
491+
HitLocally,
492+
HitRemotely,
493+
}
494+
473495
#[derive(Clone)]
474496
pub struct Context {
475497
workunit_store: WorkunitStore,

src/rust/engine/process_execution/src/local.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use workunit_store::{in_workunit, Metric, RunningWorkunit, WorkunitMetadata};
3333

3434
use crate::{
3535
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
36-
ProcessResultMetadata,
36+
ProcessResultMetadata, ProcessResultSource,
3737
};
3838

3939
pub const USER_EXECUTABLE_MODE: u32 = 0o100755;
@@ -590,7 +590,8 @@ pub trait CapturedWorkdir {
590590
}
591591

592592
let elapsed = start_time.elapsed();
593-
let result_metadata = ProcessResultMetadata::new(Some(elapsed.into()));
593+
let result_metadata =
594+
ProcessResultMetadata::new(Some(elapsed.into()), ProcessResultSource::RanLocally);
594595

595596
match child_results_result {
596597
Ok(child_results) => {

src/rust/engine/process_execution/src/named_caches.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::collections::BTreeMap;
22
use std::path::PathBuf;
33

4-
use fs::default_cache_path;
4+
use serde::Serialize;
55

66
use crate::RelativePath;
7+
use fs::default_cache_path;
78

8-
#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
9+
#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Serialize)]
910
pub struct CacheName(String);
1011

1112
impl CacheName {
@@ -24,7 +25,7 @@ impl CacheName {
2425
}
2526
}
2627

27-
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
28+
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize)]
2829
pub struct CacheDest(String);
2930

3031
impl CacheDest {

src/rust/engine/process_execution/src/remote.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use workunit_store::{
4343

4444
use crate::{
4545
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
46-
ProcessCacheScope, ProcessMetadata, ProcessResultMetadata,
46+
ProcessCacheScope, ProcessMetadata, ProcessResultMetadata, ProcessResultSource,
4747
};
4848

4949
// Environment variable which is exclusively used for cache key invalidation.
@@ -491,6 +491,11 @@ impl CommandRunner {
491491
action_result,
492492
self.platform,
493493
false,
494+
if execute_response.cached_result {
495+
ProcessResultSource::HitRemotely
496+
} else {
497+
ProcessResultSource::RanRemotely
498+
},
494499
)
495500
.await
496501
.map_err(ExecutionError::Fatal);
@@ -1065,7 +1070,7 @@ pub async fn populate_fallible_execution_result_for_timeout(
10651070
exit_code: -libc::SIGTERM,
10661071
output_directory: hashing::EMPTY_DIGEST,
10671072
platform,
1068-
metadata: ProcessResultMetadata::default(),
1073+
metadata: ProcessResultMetadata::new(Some(elapsed.into()), ProcessResultSource::RanRemotely),
10691074
})
10701075
}
10711076

@@ -1081,6 +1086,7 @@ pub fn populate_fallible_execution_result(
10811086
action_result: &remexec::ActionResult,
10821087
platform: Platform,
10831088
treat_tree_digest_as_final_directory_hack: bool,
1089+
source: ProcessResultSource,
10841090
) -> BoxFuture<Result<FallibleProcessResultWithPlatform, String>> {
10851091
future::try_join3(
10861092
extract_stdout(&store, action_result),
@@ -1102,7 +1108,9 @@ pub fn populate_fallible_execution_result(
11021108
metadata: action_result
11031109
.execution_metadata
11041110
.clone()
1105-
.map_or(ProcessResultMetadata::default(), |metadata| metadata.into()),
1111+
.map_or(ProcessResultMetadata::new(None, source), |metadata| {
1112+
ProcessResultMetadata::new_from_metadata(metadata, source)
1113+
}),
11061114
})
11071115
},
11081116
)
@@ -1372,9 +1380,14 @@ pub async fn check_action_cache(
13721380
match action_result_response {
13731381
Ok(action_result) => {
13741382
let action_result = action_result.into_inner();
1375-
let response =
1376-
populate_fallible_execution_result(store.clone(), &action_result, platform, false)
1377-
.await?;
1383+
let response = populate_fallible_execution_result(
1384+
store.clone(),
1385+
&action_result,
1386+
platform,
1387+
false,
1388+
ProcessResultSource::HitRemotely,
1389+
)
1390+
.await?;
13781391
// TODO: This should move inside the retry_call above, both in order to be retried, and
13791392
// to ensure that we increment a miss if we fail to eagerly fetch.
13801393
if eager_fetch {

src/rust/engine/process_execution/src/remote_cache_tests.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::remote::{ensure_action_stored_locally, make_execute_request};
2121
use crate::{
2222
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform,
2323
MultiPlatformProcess, Platform, Process, ProcessMetadata, ProcessResultMetadata,
24-
RemoteCacheWarningsBehavior,
24+
ProcessResultSource, RemoteCacheWarningsBehavior,
2525
};
2626

2727
/// A mock of the local runner used for better hermeticity of the tests.
@@ -45,7 +45,7 @@ impl MockLocalCommandRunner {
4545
exit_code,
4646
output_directory: EMPTY_DIGEST,
4747
platform: Platform::current().unwrap(),
48-
metadata: ProcessResultMetadata::default(),
48+
metadata: ProcessResultMetadata::new(None, ProcessResultSource::RanLocally),
4949
}),
5050
call_counter,
5151
delay: Duration::from_millis(delay_ms),
@@ -612,7 +612,7 @@ async fn make_action_result_basic() {
612612
output_directory: directory_digest,
613613
exit_code: 102,
614614
platform: Platform::Linux_x86_64,
615-
metadata: ProcessResultMetadata::default(),
615+
metadata: ProcessResultMetadata::new(None, ProcessResultSource::RanLocally),
616616
};
617617

618618
let (action_result, digests) = runner

src/rust/engine/process_execution/src/tests.rs

+24-9
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::collections::hash_map::DefaultHasher;
55
use std::hash::{Hash, Hasher};
66
use std::time::Duration;
77

8-
use crate::{Process, ProcessResultMetadata};
8+
use crate::{Process, ProcessResultMetadata, ProcessResultSource};
99
use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
1010
use prost_types::Timestamp;
1111
use remexec::ExecutedActionMetadata;
@@ -59,10 +59,14 @@ fn process_result_metadata_to_and_from_executed_action_metadata() {
5959
..ExecutedActionMetadata::default()
6060
};
6161

62-
let converted_process_result: ProcessResultMetadata = action_metadata.into();
62+
let converted_process_result: ProcessResultMetadata =
63+
ProcessResultMetadata::new_from_metadata(action_metadata, ProcessResultSource::RanLocally);
6364
assert_eq!(
6465
converted_process_result,
65-
ProcessResultMetadata::new(Some(concrete_time::Duration::new(20, 30)))
66+
ProcessResultMetadata::new(
67+
Some(concrete_time::Duration::new(20, 30)),
68+
ProcessResultSource::RanLocally
69+
)
6670
);
6771

6872
// The conversion from `ExecutedActionMetadata` to `ProcessResultMetadata` is lossy.
@@ -83,26 +87,37 @@ fn process_result_metadata_to_and_from_executed_action_metadata() {
8387
);
8488

8589
// The relevant metadata may be missing from either type.
86-
let action_metadata_missing: ProcessResultMetadata = ExecutedActionMetadata::default().into();
87-
assert_eq!(action_metadata_missing, ProcessResultMetadata::default());
88-
let process_result_missing: ExecutedActionMetadata = ProcessResultMetadata::default().into();
90+
let empty = ProcessResultMetadata::new(None, ProcessResultSource::RanLocally);
91+
let action_metadata_missing: ProcessResultMetadata = ProcessResultMetadata::new_from_metadata(
92+
ExecutedActionMetadata::default(),
93+
ProcessResultSource::RanLocally,
94+
);
95+
assert_eq!(action_metadata_missing, empty);
96+
let process_result_missing: ExecutedActionMetadata = empty.into();
8997
assert_eq!(process_result_missing, ExecutedActionMetadata::default());
9098
}
9199

92100
#[test]
93101
fn process_result_metadata_time_saved_from_cache() {
94-
let metadata = ProcessResultMetadata::new(Some(concrete_time::Duration::new(5, 150)));
102+
let metadata = ProcessResultMetadata::new(
103+
Some(concrete_time::Duration::new(5, 150)),
104+
ProcessResultSource::RanLocally,
105+
);
95106
let time_saved = metadata.time_saved_from_cache(Duration::new(1, 100));
96107
assert_eq!(time_saved, Some(Duration::new(4, 50)));
97108

98109
// If the cache lookup took more time than the process, we return 0.
99-
let metadata = ProcessResultMetadata::new(Some(concrete_time::Duration::new(1, 0)));
110+
let metadata = ProcessResultMetadata::new(
111+
Some(concrete_time::Duration::new(1, 0)),
112+
ProcessResultSource::RanLocally,
113+
);
100114
let time_saved = metadata.time_saved_from_cache(Duration::new(5, 0));
101115
assert_eq!(time_saved, Some(Duration::new(0, 0)));
102116

103117
// If the original process time wasn't recorded, we can't compute the time saved.
104118
assert_eq!(
105-
ProcessResultMetadata::default().time_saved_from_cache(Duration::new(1, 100)),
119+
ProcessResultMetadata::new(None, ProcessResultSource::RanLocally)
120+
.time_saved_from_cache(Duration::new(1, 100)),
106121
None
107122
);
108123
}

0 commit comments

Comments
 (0)