Skip to content

Commit

Permalink
Ephemeral server fixes (#878)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Feb 18, 2025
1 parent 865cd49 commit 6e60111
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
72 changes: 64 additions & 8 deletions core/src/ephemeral_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ pub enum EphemeralExe {
version: EphemeralExeVersion,
/// Destination directory or the user temp directory if none set.
dest_dir: Option<String>,
/// How long to cache the download for. None means forever.
ttl: Option<Duration>,
},
}

Expand Down Expand Up @@ -309,7 +311,11 @@ impl EphemeralExe {
}
Ok(path)
}
EphemeralExe::CachedDownload { version, dest_dir } => {
EphemeralExe::CachedDownload {
version,
dest_dir,
ttl,
} => {
let dest_dir = dest_dir
.as_ref()
.map(PathBuf::from)
Expand All @@ -334,8 +340,7 @@ impl EphemeralExe {
dest.display()
);

// If it already exists, skip
if dest.exists() {
if dest.exists() && remove_file_past_ttl(ttl, &dest)? {
return Ok(dest);
}

Expand Down Expand Up @@ -379,6 +384,7 @@ impl EphemeralExe {
&info.archive_url,
Path::new(&info.file_to_extract),
&dest,
false,
)
.await?
{
Expand Down Expand Up @@ -433,7 +439,7 @@ fn get_free_port(bind_ip: &str) -> io::Result<u16> {
let (socket, _addr) = listen.accept()?;

// Explicitly drop the socket to close the connection from the listening side first
std::mem::drop(socket);
drop(socket);
}

Ok(addr.port())
Expand All @@ -447,6 +453,7 @@ async fn lazy_download_exe(
uri: &str,
file_to_extract: &Path,
dest: &Path,
already_tried_cleaning_old: bool,
) -> anyhow::Result<bool> {
// If it already exists, do not extract
if dest.exists() {
Expand Down Expand Up @@ -474,7 +481,16 @@ async fn lazy_download_exe(
// This match only gets Ok if the file was downloaded and extracted to the
// temporary path
match file {
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {
// If the download lock file exists but is old, delete it and try again, since it may
// have been left by an abandoned process.
if !already_tried_cleaning_old
&& temp_dest.metadata()?.modified()?.elapsed()?.as_secs() > 90
{
std::fs::remove_file(temp_dest)?;
return Box::pin(lazy_download_exe(client, uri, file_to_extract, dest, true)).await;
}

// Since it already exists, we'll try once a second for 20 seconds
// to wait for it to be done, then return false so the caller can
// try again.
Expand Down Expand Up @@ -530,7 +546,7 @@ async fn download_and_extract(
// We have to map the error type to an io error
let stream = resp
.bytes_stream()
.map(|item| item.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
.map(|item| item.map_err(|err| io::Error::new(io::ErrorKind::Other, err)));

// Since our tar/zip impls use sync IO, we have to create a bridge and run
// in a blocking closure.
Expand Down Expand Up @@ -574,12 +590,36 @@ async fn download_and_extract(
.await?
}

/// Remove the file if it's older than the TTL. Returns true if the current file can be re-used,
/// returns false if it was removed or should otherwise be re-downloaded.
fn remove_file_past_ttl(ttl: &Option<Duration>, dest: &PathBuf) -> Result<bool, anyhow::Error> {
match ttl {
None => return Ok(true),
Some(ttl) => {
if let Ok(mtime) = dest.metadata().and_then(|d| d.modified()) {
if mtime.elapsed().unwrap_or_default().lt(ttl) {
return Ok(true);
} else {
// Remove so we can re-download
std::fs::remove_file(dest)?;
}
}
// If we couldn't read the mtime something weird is probably up, so
// re-download
}
}
Ok(false)
}

#[cfg(test)]
mod tests {
use super::get_free_port;
use super::{get_free_port, remove_file_past_ttl};
use std::{
collections::HashSet,
env::temp_dir,
fs::File,
net::{TcpListener, TcpStream},
time::{Duration, SystemTime},
};

#[test]
Expand Down Expand Up @@ -609,6 +649,22 @@ mod tests {
}
}

#[tokio::test]
async fn respects_file_ttl() {
let rand_fname = format!("{}", rand::random::<u64>());
let temp_dir = temp_dir();

let dest_file_path = temp_dir.join(format!("core-test-{}", &rand_fname));
let dest_file = File::create(&dest_file_path).unwrap();
let set_time_to = SystemTime::now() - Duration::from_secs(100);
dest_file.set_modified(set_time_to).unwrap();

remove_file_past_ttl(&Some(Duration::from_secs(60)), &dest_file_path).unwrap();

// file should be gone
assert!(!dest_file_path.exists());
}

fn try_listen_and_dial_on(host: &str, port: u16) -> std::io::Result<()> {
let listener = TcpListener::bind((host, port))?;
let _stream = TcpStream::connect((host, port))?;
Expand All @@ -617,7 +673,7 @@ mod tests {
let (socket, _addr) = listener.accept()?;

// Explicitly drop the socket to close the connection from the listening side first
std::mem::drop(socket);
drop(socket);

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ pub fn default_cached_download() -> EphemeralExe {
sdk_version: "0.1.0".to_string(),
},
dest_dir: None,
// 15 days
ttl: Some(Duration::from_secs(60 * 60 * 24 * 15)),
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/integ_tests/ephemeral_server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ fn fixed_cached_download(version: &str) -> EphemeralExe {
EphemeralExe::CachedDownload {
version: EphemeralExeVersion::Fixed(version.to_string()),
dest_dir: None,
ttl: None,
}
}

Expand Down

0 comments on commit 6e60111

Please sign in to comment.