Skip to content

Commit

Permalink
Use receive/after to implement request_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
jswanner committed Sep 10, 2023
1 parent d3c406b commit 0ca2a91
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 96 deletions.
5 changes: 4 additions & 1 deletion lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,12 @@ defmodule Finch do
* `:pool_timeout` - This timeout is applied when we check out a connection from the pool.
Default value is `5_000`.
* `:receive_timeout` - The maximum time to wait for a response before returning an error.
* `:receive_timeout` - The maximum time to wait for each chunk to be received before returning an error.
Default value is `15_000`.
* `:request_timeout` - The maximum time to wait for a complete response before returning an error.
Default value is `:infinity`.
"""
@spec request(Request.t(), name(), request_opts()) ::
{:ok, Response.t()}
Expand Down
105 changes: 13 additions & 92 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ defmodule Finch.Conn do
end
end

def request(%{mint: nil} = conn, _, _, _, _, _, _), do: {:error, conn, "Could not connect"}
def request(%{mint: nil} = conn, _, _, _, _, _), do: {:error, conn, "Could not connect"}

def request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do
def request(conn, req, acc, fun, receive_timeout, idle_time) do
full_path = Finch.Request.request_path(req)

metadata = %{request: req}
Expand All @@ -118,8 +118,7 @@ defmodule Finch.Conn do
Telemetry.stop(:send, start_time, metadata, extra_measurements)
start_time = Telemetry.start(:recv, metadata, extra_measurements)

response =
receive_response([], acc, fun, mint, ref, receive_timeout, request_timeout)
response = receive_response([], acc, fun, mint, ref, receive_timeout)

handle_response(response, conn, metadata, start_time, extra_measurements)

Expand Down Expand Up @@ -190,98 +189,23 @@ defmodule Finch.Conn do
end
end

defp receive_response(
entries,
acc,
fun,
mint,
ref,
receive_timeout,
request_timeout,
status \\ nil,
headers \\ []
)

defp receive_response(
[{:done, ref} | _],
acc,
_fun,
mint,
ref,
_receive_timeout,
_request_timeout,
status,
headers
) do
{:ok, mint, acc, {status, headers}}
end
defp receive_response(entries, acc, fun, mint, ref, timeout, status \\ nil, headers \\ [])

defp receive_response(
_,
_acc,
_fun,
mint,
_ref,
_receive_timeout,
request_timeout,
status,
headers
)
when request_timeout < 0 do
{:error, mint, %Mint.TransportError{reason: :timeout}, {status, headers}}
defp receive_response([{:done, ref} | _], acc, _fun, mint, ref, _timeout, status, headers) do
{:ok, mint, acc, {status, headers}}
end

defp receive_response(
[],
acc,
fun,
mint,
ref,
receive_timeout,
request_timeout,
status,
headers
) do
start_time = System.monotonic_time(:millisecond)

case MintHTTP1.recv(mint, 0, receive_timeout) do
defp receive_response([], acc, fun, mint, ref, timeout, status, headers) do
case MintHTTP1.recv(mint, 0, timeout) do
{:ok, mint, entries} ->
request_timeout =
if is_integer(request_timeout) do
elapsed_time = System.monotonic_time(:millisecond) - start_time
request_timeout - elapsed_time
else
request_timeout
end

receive_response(
entries,
acc,
fun,
mint,
ref,
receive_timeout,
request_timeout,
status,
headers
)
receive_response(entries, acc, fun, mint, ref, timeout, status, headers)

{:error, mint, error, _responses} ->
{:error, mint, error, {status, headers}}
end
end

defp receive_response(
[entry | entries],
acc,
fun,
mint,
ref,
receive_timeout,
request_timeout,
status,
headers
) do
defp receive_response([entry | entries], acc, fun, mint, ref, timeout, status, headers) do
case entry do
{:status, ^ref, value} ->
case fun.({:status, value}, acc) do
Expand All @@ -292,8 +216,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
receive_timeout,
request_timeout,
timeout,
value,
headers
)
Expand All @@ -315,8 +238,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
receive_timeout,
request_timeout,
timeout,
status,
headers ++ value
)
Expand All @@ -338,8 +260,7 @@ defmodule Finch.Conn do
fun,
mint,
ref,
receive_timeout,
request_timeout,
timeout,
status,
headers
)
Expand Down
27 changes: 24 additions & 3 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Finch.HTTP1.Pool do
@behaviour Finch.Pool

alias Finch.Conn
alias Finch.Error
alias Finch.Telemetry

def child_spec(opts) do
Expand All @@ -27,9 +28,29 @@ defmodule Finch.HTTP1.Pool do

@impl Finch.Pool
def request(pool, req, acc, fun, opts) do
case Keyword.get(opts, :request_timeout, :infinity) do
timeout when is_integer(timeout) and timeout > 0 ->
ref = make_ref()
owner = self()

spawn_link(fn ->
send(owner, {ref, __request__(pool, req, acc, fun, opts)})
end)

receive do
{^ref, response} -> response
after
timeout -> {:error, Error.exception(:timeout)}
end

_ ->
__request__(pool, req, acc, fun, opts)
end
end

def __request__(pool, req, acc, fun, opts) do
pool_timeout = Keyword.get(opts, :pool_timeout, 5_000)
receive_timeout = Keyword.get(opts, :receive_timeout, 15_000)
request_timeout = Keyword.get(opts, :request_timeout, :infinity)

metadata = %{request: req, pool: pool}

Expand All @@ -44,7 +65,7 @@ defmodule Finch.HTTP1.Pool do

with {:ok, conn} <- Conn.connect(conn),
{:ok, conn, acc} <-
Conn.request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do
Conn.request(conn, req, acc, fun, receive_timeout, idle_time) do
{{:ok, acc}, transfer_if_open(conn, state, from)}
else
{:error, conn, error} ->
Expand Down Expand Up @@ -85,7 +106,7 @@ defmodule Finch.HTTP1.Pool do
monitor = Process.monitor(owner)
request_ref = {__MODULE__, self()}

case request(pool, req, {owner, monitor, request_ref}, &send_async_response/2, opts) do
case __request__(pool, req, {owner, monitor, request_ref}, &send_async_response/2, opts) do
{:ok, _} -> send(owner, {request_ref, :done})
{:error, error} -> send(owner, {request_ref, {:error, error}})
end
Expand Down

0 comments on commit 0ca2a91

Please sign in to comment.