From a8b4b22429fabf65a69a30921b333beea3eb5e39 Mon Sep 17 00:00:00 2001 From: Jacob Swanner Date: Fri, 1 Sep 2023 13:56:06 -0700 Subject: [PATCH] Introduce :request_timeout option Intended to timeout a request with a chunked response after the elapsed time has surpassed the given value. See #243. --- lib/finch.ex | 5 +++- lib/finch/http1/conn.ex | 54 ++++++++++++++++++++++++++--------------- lib/finch/http1/pool.ex | 4 ++- test/finch_test.exs | 18 +++++++++----- 4 files changed, 54 insertions(+), 27 deletions(-) diff --git a/lib/finch.ex b/lib/finch.ex index 41a16ef1..24a33c0c 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -399,9 +399,12 @@ defmodule Finch do * `:pool_timeout` - This timeout is applied when we check out a connection from the pool. Default value is `5_000`. - * `:receive_timeout` - The maximum time to wait for a response before returning an error. + * `:receive_timeout` - The maximum time to wait for each chunk to be received before returning an error. Default value is `15_000`. + * `:request_timeout` - The maximum time to wait for a complete response before returning an error. + Default value is `:infinity`. + """ @spec request(Request.t(), name(), request_opts()) :: {:ok, Response.t()} diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index ed528d38..d71b996c 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -93,9 +93,9 @@ defmodule Finch.Conn do end end - def request(%{mint: nil} = conn, _, _, _, _, _), do: {:error, conn, "Could not connect"} + def request(%{mint: nil} = conn, _, _, _, _, _, _), do: {:error, conn, "Could not connect"} - def request(conn, req, acc, fun, receive_timeout, idle_time) do + def request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do full_path = Finch.Request.request_path(req) metadata = %{request: req} @@ -113,11 +113,12 @@ defmodule Finch.Conn do stream_or_body(req.body) ) do {:ok, mint, ref} -> - case maybe_stream_request_body(mint, ref, req.body, receive_timeout) do + case maybe_stream_request_body(mint, ref, req.body) do {:ok, mint} -> Telemetry.stop(:send, start_time, metadata, extra_measurements) start_time = Telemetry.start(:recv, metadata, extra_measurements) resp_metadata = %{status: nil, headers: [], trailers: []} + timeouts = %{receive_timeout: receive_timeout, request_timeout: request_timeout} response = receive_response( @@ -126,7 +127,7 @@ defmodule Finch.Conn do fun, mint, ref, - receive_timeout, + timeouts, :headers, resp_metadata ) @@ -164,13 +165,13 @@ defmodule Finch.Conn do {:error, %{conn | mint: mint}, error} end - defp maybe_stream_request_body(mint, ref, {:stream, stream}, _timeout) do + defp maybe_stream_request_body(mint, ref, {:stream, stream}) do with {:ok, mint} <- stream_request_body(mint, ref, stream) do MintHTTP1.stream_request_body(mint, ref, :eof) end end - defp maybe_stream_request_body(mint, _, _, _), do: {:ok, mint} + defp maybe_stream_request_body(mint, _, _), do: {:ok, mint} defp stream_request_body(mint, ref, stream) do Enum.reduce_while(stream, {:ok, mint}, fn @@ -206,7 +207,7 @@ defmodule Finch.Conn do fun, mint, ref, - timeout, + timeouts, fields, resp_metadata ) @@ -217,7 +218,7 @@ defmodule Finch.Conn do _fun, mint, ref, - _timeout, + _timeouts, _fields, resp_metadata ) do @@ -230,21 +231,36 @@ defmodule Finch.Conn do _fun, mint, _ref, - timeout, + timeouts, _fields, resp_metadata ) - when timeout < 0 do + when timeouts.request_timeout < 0 do + {:ok, mint} = Mint.HTTP1.close(mint) {:error, mint, %Mint.TransportError{reason: :timeout}, resp_metadata} end - defp receive_response([], acc, fun, mint, ref, timeout, fields, resp_metadata) do + defp receive_response( + [], + acc, + fun, + mint, + ref, + timeouts, + fields, + resp_metadata + ) do start_time = System.monotonic_time(:millisecond) - case MintHTTP1.recv(mint, 0, timeout) do + case MintHTTP1.recv(mint, 0, timeouts.receive_timeout) do {:ok, mint, entries} -> - elapsed_time = System.monotonic_time(:millisecond) - start_time - timeout = timeout - elapsed_time + timeouts = + if is_integer(timeouts.request_timeout) do + elapsed_time = System.monotonic_time(:millisecond) - start_time + update_in(timeouts.request_timeout, &(&1 - elapsed_time)) + else + timeouts + end receive_response( entries, @@ -252,7 +268,7 @@ defmodule Finch.Conn do fun, mint, ref, - timeout, + timeouts, fields, resp_metadata ) @@ -268,7 +284,7 @@ defmodule Finch.Conn do fun, mint, ref, - timeout, + timeouts, fields, resp_metadata ) do @@ -282,7 +298,7 @@ defmodule Finch.Conn do fun, mint, ref, - timeout, + timeouts, fields, %{resp_metadata | status: value} ) @@ -306,7 +322,7 @@ defmodule Finch.Conn do fun, mint, ref, - timeout, + timeouts, fields, resp_metadata ) @@ -328,7 +344,7 @@ defmodule Finch.Conn do fun, mint, ref, - timeout, + timeouts, :trailers, resp_metadata ) diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index 0c97c8aa..f9a23816 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -29,6 +29,7 @@ defmodule Finch.HTTP1.Pool do def request(pool, req, acc, fun, opts) do pool_timeout = Keyword.get(opts, :pool_timeout, 5_000) receive_timeout = Keyword.get(opts, :receive_timeout, 15_000) + request_timeout = Keyword.get(opts, :request_timeout, :infinity) metadata = %{request: req, pool: pool} @@ -42,7 +43,8 @@ defmodule Finch.HTTP1.Pool do Telemetry.stop(:queue, start_time, metadata, %{idle_time: idle_time}) with {:ok, conn} <- Conn.connect(conn), - {:ok, conn, acc} <- Conn.request(conn, req, acc, fun, receive_timeout, idle_time) do + {:ok, conn, acc} <- + Conn.request(conn, req, acc, fun, receive_timeout, request_timeout, idle_time) do {{:ok, acc}, transfer_if_open(conn, state, from)} else {:error, conn, error} -> diff --git a/test/finch_test.exs b/test/finch_test.exs index 44c073b0..8c14d2dd 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -516,26 +516,32 @@ defmodule FinchTest do finch_name: finch_name } do start_supervised!({Finch, name: finch_name}) - timeout = 600 Bypass.expect(bypass, fn conn -> + Process.flag(:trap_exit, true) conn = Plug.Conn.send_chunked(conn, 200) - Enum.reduce(1..5, conn, fn _, conn -> + Enum.reduce_while(1..5, conn, fn _, conn -> Process.sleep(timeout - 100) - {_, conn} = Plug.Conn.chunk(conn, "chunk-data") - conn + + receive do + {:EXIT, _, _} -> {:halt, conn} + after + 0 -> + {_, conn} = Plug.Conn.chunk(conn, "chunk-data") + {:cont, conn} + end end) end) assert {:error, %{reason: :timeout}} = Finch.build(:get, endpoint(bypass)) - |> Finch.request(finch_name, receive_timeout: timeout) + |> Finch.request(finch_name, request_timeout: timeout) assert {:ok, %Response{}} = Finch.build(:get, endpoint(bypass)) - |> Finch.request(finch_name, receive_timeout: timeout * 10) + |> Finch.request(finch_name, request_timeout: timeout * 10) end test "returns error when requesting bad address", %{finch_name: finch_name} do