Skip to content

Commit

Permalink
Merge pull request #4 from Fraser999/error-log
Browse files Browse the repository at this point in the history
Set active_multi_frame when sending a multi-frame message
  • Loading branch information
marc-casperlabs authored Feb 28, 2024
2 parents dfc2a9d + e5b6242 commit 0fd196a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
6 changes: 6 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ where
let msg = self.juliet.create_request(channel, payload)?;
let id = msg.header().id();
self.request_map.insert(io_id, (channel, id));
if msg.is_multi_frame(self.juliet.max_frame_size()) {
self.active_multi_frame[channel.get() as usize] = Some(msg.header());
}
self.ready_queue.push_back(msg.frames());

drop(permit);
Expand All @@ -751,6 +754,9 @@ where
payload,
} => {
if let Some(msg) = self.juliet.create_response(channel, id, payload)? {
if msg.is_multi_frame(self.juliet.max_frame_size()) {
self.active_multi_frame[channel.get() as usize] = Some(msg.header());
}
self.ready_queue.push_back(msg.frames())
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2384,9 +2384,9 @@ mod tests {
outcome,
CompletedRead::ReceivedResponse {
channel,
/// The ID of the request received.
// The ID of the request received.
id,
/// The response payload.
// The response payload.
payload: None,
}
);
Expand Down
29 changes: 14 additions & 15 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1565,12 +1565,13 @@ mod tests {
});

// Preload alice's queue with requests.
let data: Vec<u8> = iter::repeat(0xFF).take(PAYLOAD_SIZE).collect();
let payload = Bytes::from(data);
let mut payloads = vec![];

let mut guards = Vec::new();

for idx in 0..NUM_REQUESTS {
for idx in 0..NUM_REQUESTS as usize {
let payload = Bytes::from_iter((idx..PAYLOAD_SIZE + (idx * 2)).map(|val| val as u8));
payloads.push(payload.clone());
let guard = alice
.client
.create_request(ChannelId::new(0))
Expand All @@ -1582,26 +1583,24 @@ mod tests {
}

let bob_join_handle = tokio::spawn(async move {
while let Some(incoming_request) = bob
.server
.next_request()
.await
.expect("bob should never error")
{
for expected_payload in payloads {
let incoming_request = bob
.server
.next_request()
.await
.expect("bob should never error")
.expect("bob should never get None");
eprintln!("bob received: {}", incoming_request);
assert_eq!(incoming_request.payload, Some(expected_payload));
incoming_request.respond(None);
}

eprintln!("bob quit quietly");
});

// Both background tasks are running, wait for requests to finish.
for (idx, guard) in guards.into_iter().enumerate() {
guard
.wait_for_response()
.await
.expect("failed to receive response");
eprintln!("guard {} done", idx);
let resp = guard.wait_for_response().await;
eprintln!("guard {idx}: {resp:?}");
}

// Join both server tasks to ensure there were no panics.
Expand Down

0 comments on commit 0fd196a

Please sign in to comment.