Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Finch.stream_while/5 #239

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ defmodule Finch do
@type stream(acc) ::
({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc -> acc)

@typedoc """
The stream function given to `stream_while/5`.
"""
@type stream_while(acc) ::
({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc ->
{:cont, acc} | {:halt, acc})

@doc """
Start an instance of Finch.

Expand Down Expand Up @@ -268,6 +275,8 @@ defmodule Finch do
accumulator. The function must return a potentially updated
accumulator.

See also `stream_while/5`.

## Stream commands

* `{:status, status}` - the status of the http response
Expand Down Expand Up @@ -302,12 +311,71 @@ defmodule Finch do
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
fun =
fn entry, acc ->
{:cont, fun.(entry, acc)}
end

stream_while(req, name, acc, fun, opts)
end

@doc """
Streams an HTTP request until it finishes or `fun` returns `{:halt, acc}`.

A function of arity 2 is expected as argument. The first argument
is a tuple, as listed below, and the second argument is the
accumulator.

The function must return:

* `{:cont, acc}` to continue streaming
* `{:halt, acc}` to halt streaming

See also `stream/5`.

## Stream commands

* `{:status, status}` - the status of the http response
* `{:headers, headers}` - the headers of the http response
* `{:data, data}` - a streaming section of the http body

## Options

Shares options with `request/3`.

## Examples

path = "/tmp/big-file.zip"
file = File.open!(path, [:write, :exclusive])
url = "https://domain.com/url/big-file.zip"
request = Finch.build(:get, url)

Finch.stream_while(request, MyFinch, nil, fn
{:status, status}, acc ->
IO.inspect(status)
{:cont, acc}

{:headers, headers}, acc ->
IO.inspect(headers)
{:cont, acc}

{:data, data}, acc ->
IO.binwrite(file, data)
{:cont, acc}
end)

File.close(file)
"""
@spec stream_while(Request.t(), name(), acc, stream_while(acc), request_opts()) ::
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream_while(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
request_span req, name do
__stream__(req, name, acc, fun, opts)
end
end

defp __stream__(%Request{} = req, name, acc, fun, opts) when is_function(fun, 2) do
defp __stream__(%Request{} = req, name, acc, fun, opts) do
{pool, pool_mod} = get_pool(req, name)
pool_mod.request(pool, req, acc, fun, opts)
end
Expand All @@ -334,9 +402,9 @@ defmodule Finch do
acc = {nil, [], []}

fun = fn
{:status, value}, {_, headers, body} -> {value, headers, body}
{:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
{:data, value}, {status, headers, body} -> {status, headers, [value | body]}
{:status, value}, {_, headers, body} -> {:cont, {value, headers, body}}
{:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}}
{:data, value}, {status, headers, body} -> {:cont, {status, headers, [value | body]}}
end

with {:ok, {status, headers, body}} <- __stream__(req, name, acc, fun, opts) do
Expand Down
90 changes: 60 additions & 30 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,40 +214,70 @@ defmodule Finch.Conn do
defp receive_response([entry | entries], acc, fun, mint, ref, timeout, status, headers) do
case entry do
{:status, ^ref, value} ->
receive_response(
entries,
fun.({:status, value}, acc),
fun,
mint,
ref,
timeout,
value,
headers
)
case fun.({:status, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
value,
headers
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:headers, ^ref, value} ->
receive_response(
entries,
fun.({:headers, value}, acc),
fun,
mint,
ref,
timeout,
status,
headers ++ value
)
case fun.({:headers, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
status,
headers ++ value
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:data, ^ref, value} ->
receive_response(
entries,
fun.({:data, value}, acc),
fun,
mint,
ref,
timeout,
status,
headers
)
case fun.({:data, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
status,
headers
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:error, ^ref, error} ->
{:error, mint, error, {status, headers}}
Expand Down
2 changes: 1 addition & 1 deletion lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ defmodule Finch.HTTP1.Pool do
end

send(owner, {request_ref, response})
{owner, monitor, request_ref}
{:cont, {owner, monitor, request_ref}}
end

defp process_down?(monitor) do
Expand Down
75 changes: 54 additions & 21 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,31 +82,64 @@ defmodule Finch.HTTP2.Pool do
defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout) do
receive do
{^request_ref, {:status, value}} ->
response_waiting_loop(
fun.({:status, value}, acc),
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)
case fun.({:status, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)

{:halt, acc} ->
cancel_async_request(request_ref)
Process.demonitor(monitor_ref)
{:ok, acc}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{^request_ref, {:headers, value}} ->
response_waiting_loop(
fun.({:headers, value}, acc),
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)
case fun.({:headers, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)

{:halt, acc} ->
cancel_async_request(request_ref)
Process.demonitor(monitor_ref)
{:ok, acc}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{^request_ref, {:data, value}} ->
response_waiting_loop(
fun.({:data, value}, acc),
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)
case fun.({:data, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
)

{:halt, acc} ->
cancel_async_request(request_ref)
Process.demonitor(monitor_ref)
{:ok, acc}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{^request_ref, :done} ->
Process.demonitor(monitor_ref)
Expand Down
5 changes: 3 additions & 2 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"castore": {:hex, :castore, "0.1.14", "3f6d7c7c1574c402fef29559d3f1a7389ba3524bc6a090a5e9e6abc3af65dcca", [:mix], [], "hexpm", "b34af542eadb727e6c8b37fdf73e18b2e02eb483a4ea0b52fd500bc23f052b7b"},
"castore": {:hex, :castore, "1.0.3", "7130ba6d24c8424014194676d608cb989f62ef8039efd50ff4b3f33286d06db8", [:mix], [], "hexpm", "680ab01ef5d15b161ed6a95449fac5c6b8f60055677a8e79acf01b27baa4390b"},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
Expand All @@ -11,12 +11,13 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.28.4", "001a0ea6beac2f810f1abc3dbf4b123e9593eaa5f00dd13ded024eae7c523298", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bf85d003dd34911d89c8ddb8bda1a958af3471a274a4c2150a9c01c78ac3f8ed"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"},
"mint": {:hex, :mint, "1.4.0", "cd7d2451b201fc8e4a8fd86257fb3878d9e3752899eb67b0c5b25b180bde1212", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "10a99e144b815cbf8522dccbc8199d15802440fc7a64d67b6853adb6fa170217"},
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"nimble_options": {:hex, :nimble_options, "1.0.0", "0aa403a873456a0720b7f1be0ff6ad0248a9d6efadd0a7e673adb1e0480c01da", [:mix], [], "hexpm", "20985bc07f7fa5b5a995937ca5258b4c8ef55e715274139ee687802e1b249267"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
Expand Down
8 changes: 4 additions & 4 deletions test/finch/http2/pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ defmodule Finch.HTTP2.PoolTest do

assert_receive {^ref, {:status, 200}}
Finch.HTTP2.Pool.cancel_async_request(ref)
refute_receive {^ref, {:data, _}}
refute_receive _
end

test "canceled if calling process exits normally", %{test: finch_name} do
Expand Down Expand Up @@ -485,9 +485,9 @@ defmodule Finch.HTTP2.PoolTest do
acc = {nil, [], ""}

fun = fn
{:status, value}, {_, headers, body} -> {value, headers, body}
{:headers, value}, {status, headers, body} -> {status, headers ++ value, body}
{:data, value}, {status, headers, body} -> {status, headers, body <> value}
{:status, value}, {_, headers, body} -> {:cont, {value, headers, body}}
{:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}}
{:data, value}, {status, headers, body} -> {:cont, {status, headers, body <> value}}
end

Pool.request(pool, req, acc, fun, opts)
Expand Down
Loading
Loading