Skip to content

Commit

Permalink
Add support for Brioche.download() for locked downloads (#102)
Browse files Browse the repository at this point in the history
* Move download code to new `brioche_core::download` module

* Implement `Brioche.download()` static downloads

* Fix lockfiles changing with empty `downloads` object
  • Loading branch information
kylewlacy authored Jul 21, 2024
1 parent 53837d7 commit 7451f58
Show file tree
Hide file tree
Showing 9 changed files with 478 additions and 103 deletions.
75 changes: 1 addition & 74 deletions crates/brioche-core/src/bake/download.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,11 @@
use anyhow::Context as _;
use futures::TryStreamExt as _;
use tokio_util::compat::FuturesAsyncReadCompatExt as _;

use crate::{
recipe::{Directory, DownloadRecipe, File},
Brioche,
};

#[tracing::instrument(skip(brioche, download), fields(url = %download.url))]
pub async fn bake_download(brioche: &Brioche, download: DownloadRecipe) -> anyhow::Result<File> {
// Acquire a permit to save the blob
let save_blob_permit = crate::blob::get_save_blob_permit().await?;

// Acquire a permit to download
tracing::debug!("acquiring download semaphore permit");
let _permit = brioche.download_semaphore.acquire().await?;
tracing::debug!("acquired download semaphore permit");

tracing::debug!(url = %download.url, "starting download");

let job_id = brioche.reporter.add_job(crate::reporter::NewJob::Download {
url: download.url.clone(),
});

let response = brioche
.download_client
.get(download.url.clone())
.send()
.await?;
let response = response.error_for_status()?;

let content_length = response.content_length().or_else(|| {
let content_length = response.headers().get(reqwest::header::CONTENT_LENGTH)?;
let content_length = content_length.to_str().ok()?.parse().ok()?;
if content_length == 0 {
None
} else {
Some(content_length)
}
});

let mut download_stream = response
.bytes_stream()
.map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
.into_async_read()
.compat();
let download_stream = std::pin::pin!(download_stream);

let save_blob_options = crate::blob::SaveBlobOptions::new()
.expected_hash(Some(download.hash))
.on_progress(|bytes_read| {
if let Some(content_length) = content_length {
let progress_percent = (bytes_read as f64 / content_length as f64) * 100.0;
let progress_percent = progress_percent.round().min(99.0) as u8;
brioche.reporter.update_job(
job_id,
crate::reporter::UpdateJob::Download {
progress_percent: Some(progress_percent),
},
);
}

Ok(())
});

let blob_hash = crate::blob::save_blob_from_reader(
brioche,
save_blob_permit,
download_stream,
save_blob_options,
)
.await
.context("failed to save blob")?;

brioche.reporter.update_job(
job_id,
crate::reporter::UpdateJob::Download {
progress_percent: Some(100),
},
);
let blob_hash = crate::download::download(brioche, &download.url, Some(download.hash)).await?;

Ok(File {
content_blob: blob_hash,
Expand Down
81 changes: 81 additions & 0 deletions crates/brioche-core/src/download.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use anyhow::Context as _;
use futures::TryStreamExt as _;
use tokio_util::compat::FuturesAsyncReadCompatExt as _;

use crate::Brioche;

#[tracing::instrument(skip(brioche, expected_hash))]
pub async fn download(
brioche: &Brioche,
url: &url::Url,
expected_hash: Option<crate::Hash>,
) -> anyhow::Result<crate::blob::BlobHash> {
// Acquire a permit to save the blob
let save_blob_permit = crate::blob::get_save_blob_permit().await?;

// Acquire a permit to download
tracing::debug!("acquiring download semaphore permit");
let _permit = brioche.download_semaphore.acquire().await?;
tracing::debug!("acquired download semaphore permit");

tracing::debug!(%url, "starting download");

let job_id = brioche
.reporter
.add_job(crate::reporter::NewJob::Download { url: url.clone() });

let response = brioche.download_client.get(url.clone()).send().await?;
let response = response.error_for_status()?;

let content_length = response.content_length().or_else(|| {
let content_length = response.headers().get(reqwest::header::CONTENT_LENGTH)?;
let content_length = content_length.to_str().ok()?.parse().ok()?;
if content_length == 0 {
None
} else {
Some(content_length)
}
});

let mut download_stream = response
.bytes_stream()
.map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
.into_async_read()
.compat();
let download_stream = std::pin::pin!(download_stream);

let save_blob_options = crate::blob::SaveBlobOptions::new()
.expected_hash(expected_hash)
.on_progress(|bytes_read| {
if let Some(content_length) = content_length {
let progress_percent = (bytes_read as f64 / content_length as f64) * 100.0;
let progress_percent = progress_percent.round().min(99.0) as u8;
brioche.reporter.update_job(
job_id,
crate::reporter::UpdateJob::Download {
progress_percent: Some(progress_percent),
},
);
}

Ok(())
});

let blob_hash = crate::blob::save_blob_from_reader(
brioche,
save_blob_permit,
download_stream,
save_blob_options,
)
.await
.context("failed to save blob")?;

brioche.reporter.update_job(
job_id,
crate::reporter::UpdateJob::Download {
progress_percent: Some(100),
},
);

Ok(blob_hash)
}
5 changes: 5 additions & 0 deletions crates/brioche-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::{

pub mod bake;
pub mod blob;
pub mod download;
pub mod encoding;
pub mod fs_utils;
pub mod input;
Expand Down Expand Up @@ -312,6 +313,10 @@ pub enum Hasher {
}

impl Hasher {
pub fn new_sha256() -> Self {
Self::Sha256(sha2::Sha256::new())
}

pub fn for_hash(hash: &Hash) -> Self {
match hash {
Hash::Sha256 { .. } => Self::Sha256(sha2::Sha256::new()),
Expand Down
Loading

0 comments on commit 7451f58

Please sign in to comment.