Skip to content

Commit

Permalink
Add Finch.stream_while/5
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach committed Aug 21, 2023
1 parent d83efa7 commit 4e8885d
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 68 deletions.
76 changes: 72 additions & 4 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ defmodule Finch do
@type stream(acc) ::
({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc -> acc)

@typedoc """
The stream function given to `stream_while/5`.
"""
@type stream_while(acc) ::
({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc ->
{:cont, acc} | {:halt, acc})

@doc """
Start an instance of Finch.
Expand Down Expand Up @@ -268,6 +275,8 @@ defmodule Finch do
accumulator. The function must return a potentially updated
accumulator.
See also `stream_while/5`.
## Stream commands
* `{:status, status}` - the status of the http response
Expand Down Expand Up @@ -302,12 +311,71 @@ defmodule Finch do
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
fun =
fn entry, acc ->
{:cont, fun.(entry, acc)}
end

stream_while(req, name, acc, fun, opts)
end

@doc """
Streams an HTTP request until it finishes or `fun` returns `{:halt, acc}`.
A function of arity 2 is expected as argument. The first argument
is a tuple, as listed below, and the second argument is the
accumulator.
The function must return:
* `{:cont, acc}` to continue streaming
* `{:halt, acc}` to halt streaming
See also `stream/5`.
## Stream commands
* `{:status, status}` - the status of the http response
* `{:headers, headers}` - the headers of the http response
* `{:data, data}` - a streaming section of the http body
## Options
Shares options with `request/3`.
## Examples
path = "/tmp/big-file.zip"
file = File.open!(path, [:write, :exclusive])
url = "https://domain.com/url/big-file.zip"
request = Finch.build(:get, url)
Finch.stream_while(request, MyFinch, nil, fn
{:status, status}, acc ->
IO.inspect(status)
{:cont, acc}
{:headers, headers}, acc ->
IO.inspect(headers)
{:cont, acc}
{:data, data}, acc ->
IO.binwrite(file, data)
{:cont, acc}
end)
File.close(file)
"""
@spec stream_while(Request.t(), name(), acc, stream_while(acc), request_opts()) ::
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream_while(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
request_span req, name do
__stream__(req, name, acc, fun, opts)
end
end

defp __stream__(%Request{} = req, name, acc, fun, opts) when is_function(fun, 2) do
defp __stream__(%Request{} = req, name, acc, fun, opts) do
{pool, pool_mod} = get_pool(req, name)
pool_mod.request(pool, req, acc, fun, opts)
end
Expand All @@ -334,9 +402,9 @@ defmodule Finch do
acc = {nil, [], []}

fun = fn
{:status, value}, {_, headers, body} -> {value, headers, body}
{:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
{:data, value}, {status, headers, body} -> {status, headers, [value | body]}
{:status, value}, {_, headers, body} -> {:cont, {value, headers, body}}
{:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}}
{:data, value}, {status, headers, body} -> {:cont, {status, headers, [value | body]}}
end

with {:ok, {status, headers, body}} <- __stream__(req, name, acc, fun, opts) do
Expand Down
90 changes: 60 additions & 30 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,40 +214,70 @@ defmodule Finch.Conn do
defp receive_response([entry | entries], acc, fun, mint, ref, timeout, status, headers) do
case entry do
{:status, ^ref, value} ->
receive_response(
entries,
fun.({:status, value}, acc),
fun,
mint,
ref,
timeout,
value,
headers
)
case fun.({:status, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
value,
headers
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:headers, ^ref, value} ->
receive_response(
entries,
fun.({:headers, value}, acc),
fun,
mint,
ref,
timeout,
status,
headers ++ value
)
case fun.({:headers, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
status,
headers ++ value
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:data, ^ref, value} ->
receive_response(
entries,
fun.({:data, value}, acc),
fun,
mint,
ref,
timeout,
status,
headers
)
case fun.({:data, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
status,
headers
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:error, ^ref, error} ->
{:error, mint, error, {status, headers}}
Expand Down
2 changes: 1 addition & 1 deletion lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ defmodule Finch.HTTP1.Pool do
end

send(owner, {request_ref, response})
{owner, monitor, request_ref}
{:cont, {owner, monitor, request_ref}}
end

defp process_down?(monitor) do
Expand Down
75 changes: 54 additions & 21 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,31 +82,64 @@ defmodule Finch.HTTP2.Pool do
defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout) do
receive do
{^request_ref, {:status, value}} ->
response_waiting_loop(
fun.({:status, value}, acc),
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)
case fun.({:status, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)

{:halt, acc} ->
cancel_async_request(request_ref)
Process.demonitor(monitor_ref)
{:ok, acc}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{^request_ref, {:headers, value}} ->
response_waiting_loop(
fun.({:headers, value}, acc),
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)
case fun.({:headers, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)

{:halt, acc} ->
cancel_async_request(request_ref)
Process.demonitor(monitor_ref)
{:ok, acc}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{^request_ref, {:data, value}} ->
response_waiting_loop(
fun.({:data, value}, acc),
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)
case fun.({:data, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)

{:halt, acc} ->
cancel_async_request(request_ref)
Process.demonitor(monitor_ref)
{:ok, acc}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{^request_ref, :done} ->
Process.demonitor(monitor_ref)
Expand Down
5 changes: 3 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"castore": {:hex, :castore, "0.1.14", "3f6d7c7c1574c402fef29559d3f1a7389ba3524bc6a090a5e9e6abc3af65dcca", [:mix], [], "hexpm", "b34af542eadb727e6c8b37fdf73e18b2e02eb483a4ea0b52fd500bc23f052b7b"},
"castore": {:hex, :castore, "1.0.3", "7130ba6d24c8424014194676d608cb989f62ef8039efd50ff4b3f33286d06db8", [:mix], [], "hexpm", "680ab01ef5d15b161ed6a95449fac5c6b8f60055677a8e79acf01b27baa4390b"},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
Expand All @@ -11,12 +11,13 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.28.4", "001a0ea6beac2f810f1abc3dbf4b123e9593eaa5f00dd13ded024eae7c523298", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bf85d003dd34911d89c8ddb8bda1a958af3471a274a4c2150a9c01c78ac3f8ed"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"},
"mint": {:hex, :mint, "1.4.0", "cd7d2451b201fc8e4a8fd86257fb3878d9e3752899eb67b0c5b25b180bde1212", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "10a99e144b815cbf8522dccbc8199d15802440fc7a64d67b6853adb6fa170217"},
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"nimble_options": {:hex, :nimble_options, "1.0.0", "0aa403a873456a0720b7f1be0ff6ad0248a9d6efadd0a7e673adb1e0480c01da", [:mix], [], "hexpm", "20985bc07f7fa5b5a995937ca5258b4c8ef55e715274139ee687802e1b249267"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
Expand Down
8 changes: 4 additions & 4 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ defmodule Finch.HTTP2.PoolTest do

assert_receive {^ref, {:status, 200}}
Finch.HTTP2.Pool.cancel_async_request(ref)
refute_receive {^ref, {:data, _}}
refute_receive _
end

test "canceled if calling process exits normally", %{test: finch_name} do
Expand Down Expand Up @@ -485,9 +485,9 @@ defmodule Finch.HTTP2.PoolTest do
acc = {nil, [], ""}

fun = fn
{:status, value}, {_, headers, body} -> {value, headers, body}
{:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
{:data, value}, {status, headers, body} -> {status, headers, body <> value}
{:status, value}, {_, headers, body} -> {:cont, {value, headers, body}}
{:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}}
{:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}}
end

Pool.request(pool, req, acc, fun, opts)
Expand Down
Loading

0 comments on commit 4e8885d

Please sign in to comment.