From 4e8885d63016c4812dd193c49e1a501920e04e3b Mon Sep 17 00:00:00 2001 From: Wojtek Mach Date: Mon, 21 Aug 2023 14:55:10 +0200 Subject: [PATCH] Add `Finch.stream_while/5` --- lib/finch.ex | 76 +++++++++++- lib/finch/http1/conn.ex | 90 +++++++++----- lib/finch/http1/pool.ex | 2 +- lib/finch/http2/pool.ex | 75 ++++++++---- mix.lock | 5 +- test/finch/http2/pool_test.exs | 8 +- test/finch_test.exs | 195 ++++++++++++++++++++++++++++++ test/support/mock_http2_server.ex | 11 +- 8 files changed, 394 insertions(+), 68 deletions(-) diff --git a/lib/finch.ex b/lib/finch.ex index f5fd7cd2..6ae79884 100644 --- a/lib/finch.ex +++ b/lib/finch.ex @@ -102,6 +102,13 @@ defmodule Finch do @type stream(acc) :: ({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc -> acc) + @typedoc """ + The stream function given to `stream_while/5`. + """ + @type stream_while(acc) :: + ({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc -> + {:cont, acc} | {:halt, acc}) + @doc """ Start an instance of Finch. @@ -268,6 +275,8 @@ defmodule Finch do accumulator. The function must return a potentially updated accumulator. + See also `stream_while/5`. + ## Stream commands * `{:status, status}` - the status of the http response @@ -302,12 +311,71 @@ defmodule Finch do {:ok, acc} | {:error, Exception.t()} when acc: term() def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do + fun = + fn entry, acc -> + {:cont, fun.(entry, acc)} + end + + stream_while(req, name, acc, fun, opts) + end + + @doc """ + Streams an HTTP request until it finishes or `fun` returns `{:halt, acc}`. + + A function of arity 2 is expected as argument. The first argument + is a tuple, as listed below, and the second argument is the + accumulator. + + The function must return: + + * `{:cont, acc}` to continue streaming + * `{:halt, acc}` to halt streaming + + See also `stream/5`. + + ## Stream commands + + * `{:status, status}` - the status of the http response + * `{:headers, headers}` - the headers of the http response + * `{:data, data}` - a streaming section of the http body + + ## Options + + Shares options with `request/3`. + + ## Examples + + path = "/tmp/big-file.zip" + file = File.open!(path, [:write, :exclusive]) + url = "https://domain.com/url/big-file.zip" + request = Finch.build(:get, url) + + Finch.stream_while(request, MyFinch, nil, fn + {:status, status}, acc -> + IO.inspect(status) + {:cont, acc} + + {:headers, headers}, acc -> + IO.inspect(headers) + {:cont, acc} + + {:data, data}, acc -> + IO.binwrite(file, data) + {:cont, acc} + end) + + File.close(file) + """ + @spec stream_while(Request.t(), name(), acc, stream_while(acc), request_opts()) :: + {:ok, acc} | {:error, Exception.t()} + when acc: term() + def stream_while(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do request_span req, name do __stream__(req, name, acc, fun, opts) end end - defp __stream__(%Request{} = req, name, acc, fun, opts) when is_function(fun, 2) do + defp __stream__(%Request{} = req, name, acc, fun, opts) do {pool, pool_mod} = get_pool(req, name) pool_mod.request(pool, req, acc, fun, opts) end @@ -334,9 +402,9 @@ defmodule Finch do acc = {nil, [], []} fun = fn - {:status, value}, {_, headers, body} -> {value, headers, body} - {:headers, value}, {status, headers, body} -> {status, headers ++ value, body} - {:data, value}, {status, headers, body} -> {status, headers, [value | body]} + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}} + {:data, value}, {status, headers, body} -> {:cont, {status, headers, [value | body]}} end with {:ok, {status, headers, body}} <- __stream__(req, name, acc, fun, opts) do diff --git a/lib/finch/http1/conn.ex b/lib/finch/http1/conn.ex index 903f27cb..f00a22c4 100644 --- a/lib/finch/http1/conn.ex +++ b/lib/finch/http1/conn.ex @@ -214,40 +214,70 @@ defmodule Finch.Conn do defp receive_response([entry | entries], acc, fun, mint, ref, timeout, status, headers) do case entry do {:status, ^ref, value} -> - receive_response( - entries, - fun.({:status, value}, acc), - fun, - mint, - ref, - timeout, - value, - headers - ) + case fun.({:status, value}, acc) do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeout, + value, + headers + ) + + {:halt, acc} -> + {:ok, mint} = Mint.HTTP1.close(mint) + {:ok, mint, acc, {status, headers}} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end {:headers, ^ref, value} -> - receive_response( - entries, - fun.({:headers, value}, acc), - fun, - mint, - ref, - timeout, - status, - headers ++ value - ) + case fun.({:headers, value}, acc) do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeout, + status, + headers ++ value + ) + + {:halt, acc} -> + {:ok, mint} = Mint.HTTP1.close(mint) + {:ok, mint, acc, {status, headers}} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end {:data, ^ref, value} -> - receive_response( - entries, - fun.({:data, value}, acc), - fun, - mint, - ref, - timeout, - status, - headers - ) + case fun.({:data, value}, acc) do + {:cont, acc} -> + receive_response( + entries, + acc, + fun, + mint, + ref, + timeout, + status, + headers + ) + + {:halt, acc} -> + {:ok, mint} = Mint.HTTP1.close(mint) + {:ok, mint, acc, {status, headers}} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end {:error, ^ref, error} -> {:error, mint, error, {status, headers}} diff --git a/lib/finch/http1/pool.ex b/lib/finch/http1/pool.ex index b89609a3..0c97c8aa 100644 --- a/lib/finch/http1/pool.ex +++ b/lib/finch/http1/pool.ex @@ -98,7 +98,7 @@ defmodule Finch.HTTP1.Pool do end send(owner, {request_ref, response}) - {owner, monitor, request_ref} + {:cont, {owner, monitor, request_ref}} end defp process_down?(monitor) do diff --git a/lib/finch/http2/pool.ex b/lib/finch/http2/pool.ex index 10e1709a..da6390af 100644 --- a/lib/finch/http2/pool.ex +++ b/lib/finch/http2/pool.ex @@ -82,31 +82,64 @@ defmodule Finch.HTTP2.Pool do defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout) do receive do {^request_ref, {:status, value}} -> - response_waiting_loop( - fun.({:status, value}, acc), - fun, - request_ref, - monitor_ref, - fail_safe_timeout - ) + case fun.({:status, value}, acc) do + {:cont, acc} -> + response_waiting_loop( + acc, + fun, + request_ref, + monitor_ref, + fail_safe_timeout + ) + + {:halt, acc} -> + cancel_async_request(request_ref) + Process.demonitor(monitor_ref) + {:ok, acc} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end {^request_ref, {:headers, value}} -> - response_waiting_loop( - fun.({:headers, value}, acc), - fun, - request_ref, - monitor_ref, - fail_safe_timeout - ) + case fun.({:headers, value}, acc) do + {:cont, acc} -> + response_waiting_loop( + acc, + fun, + request_ref, + monitor_ref, + fail_safe_timeout + ) + + {:halt, acc} -> + cancel_async_request(request_ref) + Process.demonitor(monitor_ref) + {:ok, acc} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end {^request_ref, {:data, value}} -> - response_waiting_loop( - fun.({:data, value}, acc), - fun, - request_ref, - monitor_ref, - fail_safe_timeout - ) + case fun.({:data, value}, acc) do + {:cont, acc} -> + response_waiting_loop( + acc, + fun, + request_ref, + monitor_ref, + fail_safe_timeout + ) + + {:halt, acc} -> + cancel_async_request(request_ref) + Process.demonitor(monitor_ref) + {:ok, acc} + + other -> + raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}" + end {^request_ref, :done} -> Process.demonitor(monitor_ref) diff --git a/mix.lock b/mix.lock index ecda5e8f..a65e4c21 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,7 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, - "castore": {:hex, :castore, "0.1.14", "3f6d7c7c1574c402fef29559d3f1a7389ba3524bc6a090a5e9e6abc3af65dcca", [:mix], [], "hexpm", "b34af542eadb727e6c8b37fdf73e18b2e02eb483a4ea0b52fd500bc23f052b7b"}, + "castore": {:hex, :castore, "1.0.3", "7130ba6d24c8424014194676d608cb989f62ef8039efd50ff4b3f33286d06db8", [:mix], [], "hexpm", "680ab01ef5d15b161ed6a95449fac5c6b8f60055677a8e79acf01b27baa4390b"}, "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, @@ -11,12 +11,13 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.28.4", "001a0ea6beac2f810f1abc3dbf4b123e9593eaa5f00dd13ded024eae7c523298", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bf85d003dd34911d89c8ddb8bda1a958af3471a274a4c2150a9c01c78ac3f8ed"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, - "mint": {:hex, :mint, "1.4.0", "cd7d2451b201fc8e4a8fd86257fb3878d9e3752899eb67b0c5b25b180bde1212", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "10a99e144b815cbf8522dccbc8199d15802440fc7a64d67b6853adb6fa170217"}, + "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"}, "nimble_options": {:hex, :nimble_options, "1.0.0", "0aa403a873456a0720b7f1be0ff6ad0248a9d6efadd0a7e673adb1e0480c01da", [:mix], [], "hexpm", "20985bc07f7fa5b5a995937ca5258b4c8ef55e715274139ee687802e1b249267"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, diff --git a/test/finch/http2/pool_test.exs b/test/finch/http2/pool_test.exs index 8fa56555..e6212dc3 100644 --- a/test/finch/http2/pool_test.exs +++ b/test/finch/http2/pool_test.exs @@ -307,7 +307,7 @@ defmodule Finch.HTTP2.PoolTest do assert_receive {^ref, {:status, 200}} Finch.HTTP2.Pool.cancel_async_request(ref) - refute_receive {^ref, {:data, _}} + refute_receive _ end test "canceled if calling process exits normally", %{test: finch_name} do @@ -485,9 +485,9 @@ defmodule Finch.HTTP2.PoolTest do acc = {nil, [], ""} fun = fn - {:status, value}, {_, headers, body} -> {value, headers, body} - {:headers, value}, {status, headers, body} -> {status, headers ++ value, body} - {:data, value}, {status, headers, body} -> {status, headers, body <> value} + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}} + {:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}} end Pool.request(pool, req, acc, fun, opts) diff --git a/test/finch_test.exs b/test/finch_test.exs index cf02b19f..cf7f67ca 100644 --- a/test/finch_test.exs +++ b/test/finch_test.exs @@ -724,6 +724,201 @@ defmodule FinchTest do end end + describe "stream_while/5" do + test "successful get request with HTTP/1", %{bypass: bypass, finch_name: finch_name} do + start_supervised!({Finch, name: finch_name}) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + acc = {nil, [], ""} + + fun = fn + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}} + {:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}} + end + + assert {:ok, {200, [_ | _], "OK"}} = + Finch.build(:get, endpoint(bypass)) + |> Finch.stream_while(finch_name, acc, fun) + end + + defmodule InfiniteStream do + def init(options), do: options + + def call(conn, []) do + conn = Plug.Conn.send_chunked(conn, 200) + + Enum.reduce(Stream.cycle(["chunk"]), conn, fn chunk, conn -> + {:ok, conn} = Plug.Conn.chunk(conn, chunk) + conn + end) + end + end + + test "function halts on HTTP/1", %{test: test, finch_name: finch_name} do + start_supervised!({Finch, name: finch_name}) + + # Start custom server, as opposed to Bypass, because the latter would complain when + # it outlives test process. + start_supervised!({Plug.Cowboy, scheme: :http, plug: InfiniteStream, port: 0, ref: test}) + url = "http://localhost:#{:ranch.get_port(test)}" + + acc = {nil, [], ""} + + fun = fn + {:status, value}, {_, headers, body} -> {:halt, {value, headers, body}} + end + + assert {:ok, {200, [], ""}} = + Finch.build(:get, url) + |> Finch.stream_while(finch_name, acc, fun) + + fun = fn + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:halt, {status, headers ++ value, body}} + end + + assert {:ok, {200, [_ | _], ""}} = + Finch.build(:get, url) + |> Finch.stream_while(finch_name, acc, fun) + + fun = fn + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}} + {:data, value}, {status, headers, body} -> {:halt, {status, headers, body <> value}} + end + + assert {:ok, {200, [_ | _], "chunk"}} = + Finch.build(:get, url) + |> Finch.stream_while(finch_name, acc, fun) + end + + test "invalid return value on HTTP/1", %{bypass: bypass, finch_name: finch_name} do + start_supervised!({Finch, name: finch_name}) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + acc = {nil, [], ""} + + fun = fn + {:status, _value}, _acc -> :bad + end + + assert_raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: :bad", fn -> + Finch.build(:get, endpoint(bypass)) + |> Finch.stream_while(finch_name, acc, fun) + end + end + + test "successful get request with HTTP/2", %{bypass: bypass, finch_name: finch_name} do + start_supervised!( + {Finch, + name: finch_name, + pools: %{ + default: [ + protocol: :http2 + ] + }} + ) + + Bypass.expect_once(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + acc = {nil, [], ""} + + fun = fn + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}} + {:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}} + end + + assert {:ok, {200, [_ | _], "OK"}} = + Finch.build(:get, endpoint(bypass)) + |> Finch.stream_while(finch_name, acc, fun) + end + + test "function halts on HTTP/2", %{test: test, finch_name: finch_name} do + start_supervised!( + {Finch, + name: finch_name, + pools: %{ + default: [ + protocol: :http2 + ] + }} + ) + + # Start custom server, as opposed to Bypass, because the latter would complain when + # it outlives test process. + start_supervised!({Plug.Cowboy, scheme: :http, plug: InfiniteStream, port: 0, ref: test}) + url = "http://localhost:#{:ranch.get_port(test)}" + + acc = {nil, [], ""} + + fun = fn + {:status, value}, {_, headers, body} -> {:halt, {value, headers, body}} + end + + assert {:ok, {200, [], ""}} = + Finch.build(:get, url) + |> Finch.stream_while(finch_name, acc, fun) + + fun = fn + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:halt, {status, headers ++ value, body}} + end + + assert {:ok, {200, [_ | _], ""}} = + Finch.build(:get, url) + |> Finch.stream_while(finch_name, acc, fun) + + fun = fn + {:status, value}, {_, headers, body} -> {:cont, {value, headers, body}} + {:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}} + {:data, value}, {status, headers, body} -> {:halt, {status, headers, body <> value}} + end + + assert {:ok, {200, [_ | _], "chunk"}} = + Finch.build(:get, url) + |> Finch.stream_while(finch_name, acc, fun) + + Process.sleep(5000) + end + + test "invalid return value on HTTP/2", %{bypass: bypass, finch_name: finch_name} do + start_supervised!( + {Finch, + name: finch_name, + pools: %{ + default: [ + protocol: :http2 + ] + }} + ) + + Bypass.stub(bypass, "GET", "/", fn conn -> + Plug.Conn.send_resp(conn, 200, "OK") + end) + + acc = {nil, [], ""} + + fun = fn + {:status, _value}, _acc -> :bad + end + + assert_raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: :bad", fn -> + Finch.build(:get, endpoint(bypass)) + |> Finch.stream_while(finch_name, acc, fun) + end + end + end + describe "async_request/3 with HTTP/1" do test "sends response messages to calling process", %{bypass: bypass, finch_name: finch_name} do start_supervised!({Finch, name: finch_name}) diff --git a/test/support/mock_http2_server.ex b/test/support/mock_http2_server.ex index 0577f246..3ed2575b 100644 --- a/test/support/mock_http2_server.ex +++ b/test/support/mock_http2_server.ex @@ -1,8 +1,7 @@ defmodule Finch.MockHTTP2Server do @moduledoc false import ExUnit.Assertions - - alias Mint.{HTTP2.Frame, HTTP2.HPACK} + alias Mint.HTTP2.Frame defstruct [:socket, :encode_table, :decode_table] @@ -35,8 +34,8 @@ defmodule Finch.MockHTTP2Server do server = %__MODULE__{ socket: server_socket, - encode_table: HPACK.new(4096), - decode_table: HPACK.new(4096) + encode_table: HPAX.new(4096), + decode_table: HPAX.new(4096) } {result, server} @@ -84,14 +83,14 @@ defmodule Finch.MockHTTP2Server do @spec encode_headers(%__MODULE__{}, Mint.Types.headers()) :: {%__MODULE__{}, hbf :: binary()} def encode_headers(%__MODULE__{} = server, headers) when is_list(headers) do headers = for {name, value} <- headers, do: {:store_name, name, value} - {hbf, encode_table} = HPACK.encode(headers, server.encode_table) + {hbf, encode_table} = HPAX.encode(headers, server.encode_table) server = put_in(server.encode_table, encode_table) {server, IO.iodata_to_binary(hbf)} end @spec decode_headers(%__MODULE__{}, binary()) :: {%__MODULE__{}, Mint.Types.headers()} def decode_headers(%__MODULE__{} = server, hbf) when is_binary(hbf) do - assert {:ok, headers, decode_table} = HPACK.decode(hbf, server.decode_table) + assert {:ok, headers, decode_table} = HPAX.decode(hbf, server.decode_table) server = put_in(server.decode_table, decode_table) {server, headers} end