Skip to content

Commit

Permalink
Merge pull request #1123 from rust-lang/cache-rework
Browse files Browse the repository at this point in the history
  • Loading branch information
shepmaster authored Nov 24, 2024
2 parents 73f90a9 + f116c28 commit 97331e5
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 342 deletions.
51 changes: 33 additions & 18 deletions compiler/base/orchestrator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,7 @@ impl Container {
already_cancelled = true;

let msg = CoordinatorMessage::Kill;
trace!("processing {msg:?}");
trace!(msg_name = msg.as_ref(), "processing");
to_worker_tx.send(msg).await.context(KillSnafu)?;
},

Expand All @@ -1847,12 +1847,12 @@ impl Container {
}
};

trace!("processing {msg:?}");
trace!(msg_name = msg.as_ref(), "processing");
to_worker_tx.send(msg).await.context(StdinSnafu)?;
},

Some(container_msg) = from_worker_rx.recv() => {
trace!("processing {container_msg:?}");
trace!(msg_name = container_msg.as_ref(), "processing");

match container_msg {
WorkerMessage::ExecuteCommand(resp) => {
Expand Down Expand Up @@ -2358,48 +2358,63 @@ impl Commander {
gc_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
select! {
command = command_rx.recv() => {
let Some((ack_tx, command)) = command else { break };
enum Event {
Command(Option<(oneshot::Sender<()>, DemultiplexCommand)>),

FromWorker(Option<Multiplexed<WorkerMessage>>),

// Find any channels where the receivers have been
// dropped and clear out the sending halves.
Gc,
}
use Event::*;

let event = select! {
command = command_rx.recv() => Command(command),

msg = from_worker_rx.recv() => FromWorker(msg),

_ = gc_interval.tick() => Gc,
};

match event {
Command(None) => break,
Command(Some((ack_tx, command))) => {
match command {
DemultiplexCommand::Listen(job_id, waiter) => {
trace!("adding listener for {job_id:?}");
trace!(job_id, "adding listener (many)");
let old = waiting.insert(job_id, waiter);
ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id });
}

DemultiplexCommand::ListenOnce(job_id, waiter) => {
trace!("adding listener for {job_id:?}");
trace!(job_id, "adding listener (once)");
let old = waiting_once.insert(job_id, waiter);
ensure!(old.is_none(), DuplicateDemultiplexerClientSnafu { job_id });
}
}

ack_tx.send(()).ok(/* Don't care about it */);
},

msg = from_worker_rx.recv() => {
let Some(Multiplexed(job_id, msg)) = msg else { break };
}

FromWorker(None) => break,
FromWorker(Some(Multiplexed(job_id, msg))) => {
if let Some(waiter) = waiting_once.remove(&job_id) {
trace!("notifying listener for {job_id:?}");
trace!(job_id, "notifying listener (once)");
waiter.send(msg).ok(/* Don't care about it */);
continue;
}

if let Some(waiter) = waiting.get(&job_id) {
trace!("notifying listener for {job_id:?}");
trace!(job_id, "notifying listener (many)");
waiter.send(msg).await.ok(/* Don't care about it */);
continue;
}

warn!("no listener for {job_id:?}");
warn!(job_id, "no listener to notify");
}

// Find any channels where the receivers have been
// dropped and clear out the sending halves.
_ = gc_interval.tick() => {
Gc => {
waiting = mem::take(&mut waiting)
.into_iter()
.filter(|(_job_id, tx)| !tx.is_closed())
Expand Down
2 changes: 1 addition & 1 deletion compiler/base/orchestrator/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ macro_rules! impl_narrow_to_broad {
#[derive(Debug, Serialize, Deserialize)]
pub struct Multiplexed<T>(pub JobId, pub T);

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, strum_macros::AsRefStr)]
pub enum CoordinatorMessage {
WriteFile(WriteFileRequest),
DeleteFile(DeleteFileRequest),
Expand Down
1 change: 1 addition & 0 deletions tests/spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

capture_js_log = ENV.fetch('CAPTURE_JS_LOG', 'false').casecmp?('true')
Selenium::WebDriver.logger.level = :debug if capture_js_log
Selenium::WebDriver.logger.ignore(:clear_local_storage, :clear_session_storage)

browser_options = ::Selenium::WebDriver::Firefox::Options.new
browser_options.add_argument('-headless') if ENV.fetch('HEADLESS', 'true').casecmp?('true')
Expand Down
6 changes: 0 additions & 6 deletions ui/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ pub(crate) enum Endpoint {
MacroExpansion,
MetaCrates,
MetaVersions,
MetaVersionStable,
MetaVersionBeta,
MetaVersionNightly,
MetaVersionRustfmt,
MetaVersionClippy,
MetaVersionMiri,
Evaluate,
}

Expand Down
Loading

0 comments on commit 97331e5

Please sign in to comment.