Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi-http: make the buffer and budget capacity of the OutgoingBody writer configurable #9670

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::io::{self, Read};
use test_programs::wasi::http::types::{Method, Scheme};

fn main() {
// TODO: ensure more than 700 bytes is allowed without error
const LEN: usize = 700;
// Make sure the final body is larger than 1024*1024, but we cannot allocate
// so much memory directly in the wasm program, so we use the `repeat`
// method to increase the body size.
const LEN: usize = 1024;
const REPEAT: usize = 1025;
let mut buffer = [0; LEN];
let addr = std::env::var("HTTP_SERVER").unwrap();
io::repeat(0b001).read_exact(&mut buffer).unwrap();
Expand All @@ -13,7 +16,7 @@ fn main() {
Scheme::Http,
&addr,
"/post",
Some(&buffer),
Some(&buffer.repeat(REPEAT)),
None,
None,
None,
Expand All @@ -26,5 +29,5 @@ fn main() {
assert_eq!(res.status, 200);
let method = res.header("x-wasmtime-test-method").unwrap();
assert_eq!(std::str::from_utf8(method).unwrap(), "POST");
assert_eq!(res.body.len(), LEN);
assert_eq!(res.body.len(), LEN * REPEAT);
}
29 changes: 14 additions & 15 deletions crates/test-programs/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ pub fn request(
.body()
.map_err(|_| anyhow!("outgoing request write failed"))?;

let options = http_types::RequestOptions::new();
options
.set_connect_timeout(connect_timeout)
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
options
.set_first_byte_timeout(first_by_timeout)
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
options
.set_between_bytes_timeout(between_bytes_timeout)
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
let options = Some(options);

let future_response = outgoing_handler::handle(request, options)?;

if let Some(mut buf) = body {
let request_body = outgoing_body
.write()
Expand Down Expand Up @@ -110,21 +124,6 @@ pub fn request(
Err(_) => anyhow::bail!("output stream error"),
};
}

let options = http_types::RequestOptions::new();
options
.set_connect_timeout(connect_timeout)
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
options
.set_first_byte_timeout(first_by_timeout)
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
options
.set_between_bytes_timeout(between_bytes_timeout)
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
let options = Some(options);

let future_response = outgoing_handler::handle(request, options)?;

http_types::OutgoingBody::finish(outgoing_body, None)?;

let incoming_response = match future_response.get() {
Expand Down
16 changes: 11 additions & 5 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,14 @@ pub struct HostOutgoingBody {

impl HostOutgoingBody {
/// Create a new `HostOutgoingBody`
pub fn new(context: StreamContext, size: Option<u64>) -> (Self, HyperOutgoingBody) {
pub fn new(
context: StreamContext,
size: Option<u64>,
buffer_chunks: usize,
chunk_size: usize,
) -> (Self, HyperOutgoingBody) {
assert!(buffer_chunks >= 1);

let written = size.map(WrittenState::new);

use tokio::sync::oneshot::error::RecvError;
Expand Down Expand Up @@ -469,17 +476,16 @@ impl HostOutgoingBody {
}
}

let (body_sender, body_receiver) = mpsc::channel(2);
// always add 1 buffer here because one empty slot is required
let (body_sender, body_receiver) = mpsc::channel(buffer_chunks + 1);
let (finish_sender, finish_receiver) = oneshot::channel();
let body_impl = BodyImpl {
body_receiver,
finish_receiver: Some(finish_receiver),
}
.boxed();

// TODO: this capacity constant is arbitrary, and should be configurable
let output_stream =
BodyWriteStream::new(context, 1024 * 1024, body_sender, written.clone());
let output_stream = BodyWriteStream::new(context, chunk_size, body_sender, written.clone());

(
Self {
Expand Down
37 changes: 37 additions & 0 deletions crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ pub trait WasiHttpView: Send {
fn is_forbidden_header(&mut self, _name: &HeaderName) -> bool {
false
}

/// Number of distinct write calls to the outgoing body's output-stream
/// that the implementation will buffer.
/// Default: 1.
fn outgoing_body_buffer_chunks(&mut self) -> usize {
1
}

/// Maximum size allowed in a write call to the outgoing body's output-stream.
/// Default: 1024 * 1024.
fn outgoing_body_chunk_size(&mut self) -> usize {
1024 * 1024
}
}

impl<T: ?Sized + WasiHttpView> WasiHttpView for &mut T {
Expand Down Expand Up @@ -156,6 +169,14 @@ impl<T: ?Sized + WasiHttpView> WasiHttpView for &mut T {
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
T::is_forbidden_header(self, name)
}

fn outgoing_body_buffer_chunks(&mut self) -> usize {
T::outgoing_body_buffer_chunks(self)
}

fn outgoing_body_chunk_size(&mut self) -> usize {
T::outgoing_body_chunk_size(self)
}
}

impl<T: ?Sized + WasiHttpView> WasiHttpView for Box<T> {
Expand Down Expand Up @@ -187,6 +208,14 @@ impl<T: ?Sized + WasiHttpView> WasiHttpView for Box<T> {
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
T::is_forbidden_header(self, name)
}

fn outgoing_body_buffer_chunks(&mut self) -> usize {
T::outgoing_body_buffer_chunks(self)
}

fn outgoing_body_chunk_size(&mut self) -> usize {
T::outgoing_body_chunk_size(self)
}
}

/// A concrete structure that all generated `Host` traits are implemented for.
Expand Down Expand Up @@ -233,6 +262,14 @@ impl<T: WasiHttpView> WasiHttpView for WasiHttpImpl<T> {
fn is_forbidden_header(&mut self, name: &HeaderName) -> bool {
self.0.is_forbidden_header(name)
}

fn outgoing_body_buffer_chunks(&mut self) -> usize {
self.0.outgoing_body_buffer_chunks()
}

fn outgoing_body_chunk_size(&mut self) -> usize {
self.0.outgoing_body_chunk_size()
}
}

/// Returns `true` when the header is forbidden according to this [`WasiHttpView`] implementation.
Expand Down
10 changes: 8 additions & 2 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ where
&mut self,
request: Resource<HostOutgoingRequest>,
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
let buffer_chunks = self.outgoing_body_buffer_chunks();
let chunk_size = self.outgoing_body_chunk_size();
let req = self
.table()
.get_mut(&request)
Expand All @@ -405,7 +407,8 @@ where
Err(e) => return Ok(Err(e)),
};

let (host_body, hyper_body) = HostOutgoingBody::new(StreamContext::Request, size);
let (host_body, hyper_body) =
HostOutgoingBody::new(StreamContext::Request, size, buffer_chunks, chunk_size);

req.body = Some(hyper_body);

Expand Down Expand Up @@ -751,6 +754,8 @@ where
&mut self,
id: Resource<HostOutgoingResponse>,
) -> wasmtime::Result<Result<Resource<HostOutgoingBody>, ()>> {
let buffer_chunks = self.outgoing_body_buffer_chunks();
let chunk_size = self.outgoing_body_chunk_size();
let resp = self.table().get_mut(&id)?;

if resp.body.is_some() {
Expand All @@ -762,7 +767,8 @@ where
Err(e) => return Ok(Err(e)),
};

let (host, body) = HostOutgoingBody::new(StreamContext::Response, size);
let (host, body) =
HostOutgoingBody::new(StreamContext::Response, size, buffer_chunks, chunk_size);

resp.body.replace(body);

Expand Down