Skip to content

Commit

Permalink
Introduce :request_timeout option
Browse files Browse the repository at this point in the history
Intended to timeout a request with a chunked response after the elapsed
time has surpassed the given value. See #243.
  • Loading branch information
jswanner committed Sep 24, 2023
1 parent 08085ca commit a8b4b22
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 27 deletions.
5 changes: 4 additions & 1 deletion lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,12 @@ defmodule Finch do
* `:pool_timeout` - This timeout is applied when we check out a connection from the pool.
Default value is `5_000`.
* `:receive_timeout` - The maximum time to wait for a response before returning an error.
* `:receive_timeout` - The maximum time to wait for each chunk to be received before returning an error.
Default value is `15_000`.
* `:request_timeout` - The maximum time to wait for a complete response before returning an error.
Default value is `:infinity`.
"""
@spec request(Request.t(), name(), request_opts()) ::
{:ok, Response.t()}
Expand Down
54 changes: 35 additions & 19 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ defmodule Finch.Conn do
end
end

def request(%{mint: nil} = conn, _, _, _, _, _), do: {:error, conn, "Could not connect"}
def request(%{mint: nil} = conn, _, _, _, _, _, _), do: {:error, conn, "Could not connect"}

def request(conn, req, acc, fun, receive_timeout, idle_time) do
def request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do
full_path = Finch.Request.request_path(req)

metadata = %{request: req}
Expand All @@ -113,11 +113,12 @@ defmodule Finch.Conn do
stream_or_body(req.body)
) do
{:ok, mint, ref} ->
case maybe_stream_request_body(mint, ref, req.body, receive_timeout) do
case maybe_stream_request_body(mint, ref, req.body) do
{:ok, mint} ->
Telemetry.stop(:send, start_time, metadata, extra_measurements)
start_time = Telemetry.start(:recv, metadata, extra_measurements)
resp_metadata = %{status: nil, headers: [], trailers: []}
timeouts = %{receive_timeout: receive_timeout, request_timeout: request_timeout}

response =
receive_response(
Expand All @@ -126,7 +127,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
receive_timeout,
timeouts,
:headers,
resp_metadata
)
Expand Down Expand Up @@ -164,13 +165,13 @@ defmodule Finch.Conn do
{:error, %{conn | mint: mint}, error}
end

defp maybe_stream_request_body(mint, ref, {:stream, stream}, _timeout) do
defp maybe_stream_request_body(mint, ref, {:stream, stream}) do
with {:ok, mint} <- stream_request_body(mint, ref, stream) do
MintHTTP1.stream_request_body(mint, ref, :eof)
end
end

defp maybe_stream_request_body(mint, _, _, _), do: {:ok, mint}
defp maybe_stream_request_body(mint, _, _), do: {:ok, mint}

defp stream_request_body(mint, ref, stream) do
Enum.reduce_while(stream, {:ok, mint}, fn
Expand Down Expand Up @@ -206,7 +207,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
timeout,
timeouts,
fields,
resp_metadata
)
Expand All @@ -217,7 +218,7 @@ defmodule Finch.Conn do
_fun,
mint,
ref,
_timeout,
_timeouts,
_fields,
resp_metadata
) do
Expand All @@ -230,29 +231,44 @@ defmodule Finch.Conn do
_fun,
mint,
_ref,
timeout,
timeouts,
_fields,
resp_metadata
)
when timeout < 0 do
when timeouts.request_timeout < 0 do
{:ok, mint} = Mint.HTTP1.close(mint)
{:error, mint, %Mint.TransportError{reason: :timeout}, resp_metadata}
end

defp receive_response([], acc, fun, mint, ref, timeout, fields, resp_metadata) do
defp receive_response(
[],
acc,
fun,
mint,
ref,
timeouts,
fields,
resp_metadata
) do
start_time = System.monotonic_time(:millisecond)

case MintHTTP1.recv(mint, 0, timeout) do
case MintHTTP1.recv(mint, 0, timeouts.receive_timeout) do
{:ok, mint, entries} ->
elapsed_time = System.monotonic_time(:millisecond) - start_time
timeout = timeout - elapsed_time
timeouts =
if is_integer(timeouts.request_timeout) do
elapsed_time = System.monotonic_time(:millisecond) - start_time
update_in(timeouts.request_timeout, &(&1 - elapsed_time))
else
timeouts
end

receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
timeouts,
fields,
resp_metadata
)
Expand All @@ -268,7 +284,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
timeout,
timeouts,
fields,
resp_metadata
) do
Expand All @@ -282,7 +298,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
timeout,
timeouts,
fields,
%{resp_metadata | status: value}
)
Expand All @@ -306,7 +322,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
timeout,
timeouts,
fields,
resp_metadata
)
Expand All @@ -328,7 +344,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
timeout,
timeouts,
:trailers,
resp_metadata
)
Expand Down
4 changes: 3 additions & 1 deletion lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Finch.HTTP1.Pool do
def request(pool, req, acc, fun, opts) do
pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)
receive_timeout = Keyword.get(opts, :receive_timeout, 15_000)
request_timeout = Keyword.get(opts, :request_timeout, :infinity)

metadata = %{request: req, pool: pool}

Expand All @@ -42,7 +43,8 @@ defmodule Finch.HTTP1.Pool do
Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time})

with {:ok, conn} <- Conn.connect(conn),
{:ok, conn, acc} <- Conn.request(conn, req, acc, fun, receive_timeout, idle_time) do
{:ok, conn, acc} <-
Conn.request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do
{{:ok, acc}, transfer_if_open(conn, state, from)}
else
{:error, conn, error} ->
Expand Down
18 changes: 12 additions & 6 deletions test/finch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -516,26 +516,32 @@ defmodule FinchTest do
finch_name: finch_name
} do
start_supervised!({Finch, name: finch_name})

timeout = 600

Bypass.expect(bypass, fn conn ->
Process.flag(:trap_exit, true)
conn = Plug.Conn.send_chunked(conn, 200)

Enum.reduce(1..5, conn, fn _, conn ->
Enum.reduce_while(1..5, conn, fn _, conn ->
Process.sleep(timeout - 100)
{_, conn} = Plug.Conn.chunk(conn, "chunk-data")
conn

receive do
{:EXIT, _, _} -> {:halt, conn}
after
0 ->
{_, conn} = Plug.Conn.chunk(conn, "chunk-data")
{:cont, conn}
end
end)
end)

assert {:error, %{reason: :timeout}} =
Finch.build(:get, endpoint(bypass))
|> Finch.request(finch_name, receive_timeout: timeout)
|> Finch.request(finch_name, request_timeout: timeout)

assert {:ok, %Response{}} =
Finch.build(:get, endpoint(bypass))
|> Finch.request(finch_name, receive_timeout: timeout * 10)
|> Finch.request(finch_name, request_timeout: timeout * 10)
end

test "returns error when requesting bad address", %{finch_name: finch_name} do
Expand Down

0 comments on commit a8b4b22

Please sign in to comment.