From c7864cc476ee42d174c0fd670c5f81618c756e76 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Wed, 2 Oct 2024 08:57:28 +0200 Subject: [PATCH] Allow the worker to send timestamps for Runs and Steps (#2529) * Move authentication to WorkerSocket - Improve tests and test helps for channels/sockets - Bump ws-worker to 1.6.4 for local dev * cast timestamps from the worker * make credo happy * fix failing tests * Move worker authentication to socket Refactor tests Bump ws-worker * upgrade worker version * add updated_at to runs * fix failing janitor test * fix failing run lost test * update worker version * update changelog --------- Co-authored-by: Frank Midigo Co-authored-by: Midigo Frank <39288959+midigofrank@users.noreply.github.com> --- CHANGELOG.md | 3 + assets/package-lock.json | 16 +- assets/package.json | 2 +- lib/lightning/invocation/step.ex | 20 +- lib/lightning/runs.ex | 31 +- lib/lightning/runs/handlers.ex | 142 ++++++- lib/lightning/runs/run.ex | 16 +- lib/lightning_web/channels/run_channel.ex | 34 +- lib/lightning_web/channels/worker_channel.ex | 14 +- lib/lightning_web/channels/worker_socket.ex | 16 +- .../20241001053525_add_updated_at_to_runs.exs | 36 ++ test/integration/web_and_worker_test.exs | 2 +- test/lightning/failure_alert_test.exs | 4 +- test/lightning/runs_test.exs | 12 +- .../channels/run_channel_test.exs | 375 +++++++++++------- .../channels/worker_channel_test.exs | 19 +- .../channels/worker_socket_test.exs | 34 ++ .../lightning_web/live/run_live/show_test.exs | 2 +- .../live/workflow_live/editor_test.exs | 2 +- test/support/test_utils.ex | 26 ++ 20 files changed, 542 insertions(+), 264 deletions(-) create mode 100644 priv/repo/migrations/20241001053525_add_updated_at_to_runs.exs create mode 100644 test/lightning_web/channels/worker_socket_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index c7ec56714c..17a8bca91f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,9 @@ and this project adheres to ### Changed +- Use timestamps 
sent from worker when starting and completing runs + [#2434](https://github.com/OpenFn/lightning/issues/2434) + ### Fixed - User email change: Add debounce on blur to input forms to avoid validation after every keystroke diff --git a/assets/package-lock.json b/assets/package-lock.json index 38caba42d7..a92abc81a8 100644 --- a/assets/package-lock.json +++ b/assets/package-lock.json @@ -34,7 +34,7 @@ "zustand": "^4.3.7" }, "devDependencies": { - "@openfn/ws-worker": "^1.6", + "@openfn/ws-worker": "^1.7.0", "@types/marked": "^4.0.8", "@types/react": "^18.0.15", "@types/react-dom": "^18.0.6", @@ -600,9 +600,9 @@ } }, "node_modules/@openfn/engine-multi": { - "version": "1.2.5", - "resolved": "https://registry.npmjs.org/@openfn/engine-multi/-/engine-multi-1.2.5.tgz", - "integrity": "sha512-S5I0PcJKo767NC2sevedR51CHXZXwcRBt9prNrWUs8sjQiDpPcgp05eDHQl1wz05YQ2+dJ3DWfnwQbawqhUqZQ==", + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@openfn/engine-multi/-/engine-multi-1.3.0.tgz", + "integrity": "sha512-ZZI9zc/zIjt1VSRLysTqhvHubb2WQqVjtFiDht/p8umW9QIf2GcTLfCfaZYFgOsDlBxJpzoXWCu/tYcg+L40SQ==", "dev": true, "dependencies": { "@openfn/compiler": "0.3.3", @@ -652,13 +652,13 @@ } }, "node_modules/@openfn/ws-worker": { - "version": "1.6.7", - "resolved": "https://registry.npmjs.org/@openfn/ws-worker/-/ws-worker-1.6.7.tgz", - "integrity": "sha512-MQ1JDgFT+7no1S8t/iCaBjqDI31cyRwrswUQM1RFqLfGjzJRJyZ0pSONK/ZUmyd+KPeCskSwuDWyum//9RT7Tw==", + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/@openfn/ws-worker/-/ws-worker-1.7.0.tgz", + "integrity": "sha512-0vCu9pNvsVE/EwhfyW5I04cHHk7OUVbTavoZgbKEViIeIQL0F8pNes5PD1a/bBW7NdYuMLEd74ybLZjlmEeIUw==", "dev": true, "dependencies": { "@koa/router": "^12.0.0", - "@openfn/engine-multi": "1.2.5", + "@openfn/engine-multi": "1.3.0", "@openfn/lexicon": "^1.1.0", "@openfn/logger": "1.0.2", "@openfn/runtime": "1.4.2", diff --git a/assets/package.json b/assets/package.json index 4010d780a1..b7f0e6f6ff 100644 --- 
a/assets/package.json +++ b/assets/package.json @@ -36,7 +36,7 @@ "zustand": "^4.3.7" }, "devDependencies": { - "@openfn/ws-worker": "^1.6", + "@openfn/ws-worker": "^1.7.0", "@types/marked": "^4.0.8", "@types/react": "^18.0.15", "@types/react-dom": "^18.0.6", diff --git a/lib/lightning/invocation/step.ex b/lib/lightning/invocation/step.ex index a2df6b6348..e1efad83f3 100644 --- a/lib/lightning/invocation/step.ex +++ b/lib/lightning/invocation/step.ex @@ -62,18 +62,14 @@ defmodule Lightning.Invocation.Step do |> validate() end - def finished( - step, - output_dataclip_id, - # Should this be a specified type? - {exit_reason, error_type, _error_message} - ) do - change(step, %{ - finished_at: DateTime.utc_now(), - output_dataclip_id: output_dataclip_id, - exit_reason: exit_reason, - error_type: error_type - }) + def finished(step, params) do + step + |> cast(params, [ + :output_dataclip_id, + :exit_reason, + :error_type, + :finished_at + ]) |> validate_required([:finished_at, :exit_reason]) end diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 5116c6e5c3..b00e0debce 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -183,23 +183,14 @@ defmodule Lightning.Runs do |> Repo.one() end - def start_run(%Run{} = run) do - Run.start(run) - |> update_run() - |> tap(&track_run_queue_delay/1) + def start_run(%Run{} = run, params \\ %{}) do + Handlers.StartRun.call(run, params) end @spec complete_run(Run.t(), %{optional(any()) => any()}) :: {:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())} def complete_run(run, params) do - Run.complete(run, params) - |> case do - %{valid?: false} = changeset -> - {:error, changeset} - - changeset -> - changeset |> update_run() - end + Handlers.CompleteRun.call(run, params) end @spec update_run(Ecto.Changeset.t(Run.t())) :: @@ -314,7 +305,7 @@ defmodule Lightning.Runs do end) Repo.transaction(fn -> - complete_run(run, %{state: :lost, error_type: error_type}) + {:ok, _run} = complete_run(run, %{state: "lost", 
error_type: error_type}) Ecto.assoc(run, :steps) |> where([r], is_nil(r.exit_reason)) @@ -337,18 +328,4 @@ defmodule Lightning.Runs do |> order_by([{^order, :timestamp}]) |> Repo.stream() end - - defp track_run_queue_delay({:ok, run}) do - %Run{inserted_at: inserted_at, started_at: started_at} = run - - delay = DateTime.diff(started_at, inserted_at, :millisecond) - - :telemetry.execute( - [:domain, :run, :queue], - %{delay: delay}, - %{} - ) - end - - defp track_run_queue_delay({:error, _changeset}), do: nil end diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index 7bff28f182..970982288b 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -11,6 +11,117 @@ defmodule Lightning.Runs.Handlers do alias Lightning.RunStep alias Lightning.WorkOrders + defmodule StartRun do + @moduledoc """ + Schema to validate the input attributes of a started run. + """ + use Lightning.Schema + + import Lightning.ChangesetUtils + + @primary_key false + embedded_schema do + field :timestamp, Lightning.UnixDateTime + end + + def call(run, params) do + with {:ok, start_run} <- params |> new() |> apply_action(:validate) do + run + |> Run.start(to_run_params(start_run)) + |> Runs.update_run() + |> tap(&track_run_queue_delay/1) + end + end + + def new(params) do + %__MODULE__{} + |> cast(params, [:timestamp]) + |> put_new_change(:timestamp, DateTime.utc_now()) + |> validate_required([:timestamp]) + end + + defp to_run_params(%__MODULE__{} = start_run) do + %{started_at: start_run.timestamp} + end + + defp track_run_queue_delay({:ok, run}) do + %Run{inserted_at: inserted_at, started_at: started_at} = run + + delay = DateTime.diff(started_at, inserted_at, :millisecond) + + :telemetry.execute( + [:domain, :run, :queue], + %{delay: delay}, + %{} + ) + end + + defp track_run_queue_delay({:error, _changeset}), do: nil + end + + defmodule CompleteRun do + @moduledoc """ + Schema to validate the input attributes of a completed run. 
+ """ + use Lightning.Schema + + import Lightning.ChangesetUtils + + @primary_key false + embedded_schema do + field :state, :string + field :reason, :string + field :error_type, :string + field :timestamp, Lightning.UnixDateTime + end + + def call(run, params) do + with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do + run + |> Run.complete(to_run_params(complete_run)) + |> case do + %{valid?: false} = changeset -> + {:error, changeset} + + changeset -> + Runs.update_run(changeset) + end + end + end + + def new(params) do + %__MODULE__{} + |> cast(params, [:state, :reason, :error_type, :timestamp]) + |> put_new_change(:timestamp, DateTime.utc_now()) + |> then(fn changeset -> + if reason = get_change(changeset, :reason) do + put_new_change(changeset, :state, reason_to_state(reason)) + else + changeset + end + end) + |> validate_required([:state, :timestamp]) + end + + defp reason_to_state(reason) do + case reason do + "ok" -> :success + "fail" -> :failed + "crash" -> :crashed + "cancel" -> :cancelled + "kill" -> :killed + "exception" -> :exception + unknown -> unknown + end + end + + defp to_run_params(%__MODULE__{} = complete_run) do + complete_run + |> Map.take([:state, :error_type]) + |> Map.put(:finished_at, complete_run.timestamp) + end + end + defmodule StartStep do @moduledoc """ Schema to validate the input attributes of a started step. 
@@ -27,7 +138,7 @@ defmodule Lightning.Runs.Handlers do field :job_id, Ecto.UUID field :run_id, Ecto.UUID field :snapshot_id, Ecto.UUID - field :started_at, :utc_datetime_usec + field :timestamp, Lightning.UnixDateTime field :step_id, Ecto.UUID end @@ -50,17 +161,17 @@ defmodule Lightning.Runs.Handlers do :credential_id, :input_dataclip_id, :job_id, - :started_at, + :timestamp, :step_id ]) |> put_change(:run_id, run.id) |> put_change(:snapshot_id, run.snapshot_id) - |> put_new_change(:started_at, DateTime.utc_now()) + |> put_new_change(:timestamp, DateTime.utc_now()) |> validate_required([ :job_id, :run_id, :snapshot_id, - :started_at, + :timestamp, :step_id ]) |> then(&validate_job_reachable/1) @@ -81,10 +192,10 @@ defmodule Lightning.Runs.Handlers do :credential_id, :input_dataclip_id, :job_id, - :started_at, :snapshot_id ]) |> Map.put(:id, step_id) + |> Map.put(:started_at, start_step.timestamp) |> Step.new() end @@ -156,7 +267,7 @@ defmodule Lightning.Runs.Handlers do field :error_type, :string field :error_message, :string field :step_id, Ecto.UUID - field :finished_at, :utc_datetime_usec + field :timestamp, Lightning.UnixDateTime end def new(params, options) do @@ -169,9 +280,9 @@ defmodule Lightning.Runs.Handlers do :error_type, :error_message, :step_id, - :finished_at + :timestamp ]) - |> put_new_change(:finished_at, DateTime.utc_now()) + |> put_new_change(:timestamp, DateTime.utc_now()) |> then(fn changeset -> output_dataclip_id = get_change(changeset, :output_dataclip_id) output_dataclip = get_change(changeset, :output_dataclip) @@ -190,7 +301,7 @@ defmodule Lightning.Runs.Handlers do end) |> validate_required([ :run_id, - :finished_at, + :timestamp, :project_id, :reason, :step_id @@ -214,11 +325,7 @@ defmodule Lightning.Runs.Handlers do {:ok, _} <- maybe_save_dataclip(complete_step, options) do step - |> Step.finished( - complete_step.output_dataclip_id, - {complete_step.reason, complete_step.error_type, - complete_step.error_message} - ) + |> 
Step.finished(to_step_params(complete_step)) |> Repo.update() else nil -> @@ -233,6 +340,13 @@ defmodule Lightning.Runs.Handlers do end) end + defp to_step_params(%__MODULE__{} = complete_step) do + complete_step + |> Map.take([:output_dataclip_id, :error_type, :error_message]) + |> Map.put(:exit_reason, complete_step.reason) + |> Map.put(:finished_at, complete_step.timestamp) + end + defp get_step(id) do from(s in Lightning.Invocation.Step, where: s.id == ^id) |> Repo.one() diff --git a/lib/lightning/runs/run.ex b/lib/lightning/runs/run.ex index 098a05a21d..3842b7dc1f 100644 --- a/lib/lightning/runs/run.ex +++ b/lib/lightning/runs/run.ex @@ -86,7 +86,7 @@ defmodule Lightning.Run do values: [immediate: 0, normal: 1], default: :normal - timestamps type: :utc_datetime_usec, updated_at: false + timestamps(type: :utc_datetime_usec) end def for(%Trigger{} = trigger, attrs) do @@ -144,12 +144,11 @@ defmodule Lightning.Run do ) end - def start(run) do + def start(run, params) do run - |> change( - state: :started, - started_at: DateTime.utc_now() - ) + |> cast(params, [:started_at]) + |> put_change(:state, :started) + |> validate_required([:started_at]) |> validate_state_change() end @@ -159,9 +158,8 @@ defmodule Lightning.Run do run |> change() |> put_change(:state, nil) - |> cast(params, [:state, :error_type]) - |> put_change(:finished_at, DateTime.utc_now()) - |> validate_required([:state]) + |> cast(params, [:state, :error_type, :finished_at]) + |> validate_required([:state, :finished_at]) |> validate_inclusion(:state, @final_states) |> validate_state_change() end diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index 49a847e011..fcf7d6b0ce 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -20,10 +20,10 @@ defmodule LightningWeb.RunChannel do def join( "run:" <> id, %{"token" => token}, - %{assigns: %{token: worker_token}} = socket - ) do - with {:ok, _} <- 
Workers.verify_worker_token(worker_token), - {:ok, claims} <- Workers.verify_run_token(token, %{id: id}), + %{assigns: %{claims: worker_claims}} = socket + ) + when not is_nil(worker_claims) do + with {:ok, claims} <- Workers.verify_run_token(token, %{id: id}), run when is_map(run) <- Runs.get_for_worker(id) || {:error, :not_found}, project_id when is_binary(project_id) <- Runs.get_project_id_for_run(run) do @@ -58,8 +58,8 @@ defmodule LightningWeb.RunChannel do reply_with(socket, {:ok, RunWithOptions.render(run)}) end - def handle_in("run:start", _payload, socket) do - case Runs.start_run(socket.assigns.run) do + def handle_in("run:start", payload, socket) do + case Runs.start_run(socket.assigns.run, payload) do {:ok, run} -> socket |> assign(run: run) |> reply_with({:ok, nil}) @@ -69,9 +69,7 @@ defmodule LightningWeb.RunChannel do end def handle_in("run:complete", payload, socket) do - params = replace_reason_with_exit_reason(payload) - - case Runs.complete_run(socket.assigns.run, params) do + case Runs.complete_run(socket.assigns.run, payload) do {:ok, run} -> # TODO: Turn FailureAlerter into an Oban worker and process async # instead of blocking the channel. 
@@ -204,24 +202,6 @@ defmodule LightningWeb.RunChannel do end end - defp replace_reason_with_exit_reason(params) do - {reason, payload} = Map.pop(params, "reason") - - Map.put( - payload, - "state", - case reason do - "ok" -> :success - "fail" -> :failed - "crash" -> :crashed - "cancel" -> :cancelled - "kill" -> :killed - "exception" -> :exception - unknown -> unknown - end - ) - end - defp update_scrubber(nil, samples, basic_auth) do Scrubber.start_link( samples: samples, diff --git a/lib/lightning_web/channels/worker_channel.ex b/lib/lightning_web/channels/worker_channel.ex index 3d558627a4..fb68a9e122 100644 --- a/lib/lightning_web/channels/worker_channel.ex +++ b/lib/lightning_web/channels/worker_channel.ex @@ -10,14 +10,9 @@ defmodule LightningWeb.WorkerChannel do alias Lightning.Workers @impl true - def join("worker:queue", _payload, %{assigns: %{token: token}} = socket) do - case Workers.verify_worker_token(token) do - {:ok, claims} -> - {:ok, assign(socket, :claims, claims)} - - {:error, _} -> - {:error, %{reason: "unauthorized"}} - end + def join("worker:queue", _payload, %{assigns: %{claims: claims}} = socket) + when not is_nil(claims) do + {:ok, socket} end def join("worker:queue", _payload, _socket) do @@ -33,8 +28,7 @@ defmodule LightningWeb.WorkerChannel do |> Enum.map(fn run -> opts = run_options(run) - token = - Lightning.Workers.generate_run_token(run, opts) + token = Workers.generate_run_token(run, opts) %{ "id" => run.id, diff --git a/lib/lightning_web/channels/worker_socket.ex b/lib/lightning_web/channels/worker_socket.ex index ad0cc005cc..6613da8dfe 100644 --- a/lib/lightning_web/channels/worker_socket.ex +++ b/lib/lightning_web/channels/worker_socket.ex @@ -1,6 +1,7 @@ defmodule LightningWeb.WorkerSocket do use Phoenix.Socket + alias Lightning.Workers # A Socket handler # # It's possible to control the websocket connection and @@ -29,10 +30,17 @@ defmodule LightningWeb.WorkerSocket do # performing token verification on connect. 
@impl true def connect(params, socket, _connect_info) do - case params do - %{"token" => token} -> - {:ok, socket |> assign(token: token)} - + with %{"token" => token} <- params, + {:ok, claims} <- Workers.verify_worker_token(token) do + {:ok, + socket + |> assign( + claims: claims, + token: token, + worker_version: params["worker_version"], + api_version: params["api_version"] + )} + else _ -> {:error, :unauthorized} end diff --git a/priv/repo/migrations/20241001053525_add_updated_at_to_runs.exs b/priv/repo/migrations/20241001053525_add_updated_at_to_runs.exs new file mode 100644 index 0000000000..907e800649 --- /dev/null +++ b/priv/repo/migrations/20241001053525_add_updated_at_to_runs.exs @@ -0,0 +1,36 @@ +defmodule Lightning.Repo.Migrations.AddUpdatedAtToRuns do + use Ecto.Migration + + def change do + alter table("runs") do + add :updated_at, :utc_datetime_usec + end + + execute( + """ + WITH latest_step_updates AS ( + SELECT + runs.id AS run_id, + MAX(steps.updated_at) AS max_updated_at + FROM + runs + LEFT JOIN + run_steps ON runs.id = run_steps.run_id + LEFT JOIN + steps ON run_steps.step_id = steps.id + GROUP BY + runs.id + ) + UPDATE runs + SET updated_at = COALESCE(latest_step_updates.max_updated_at, runs.inserted_at) + FROM latest_step_updates + WHERE runs.id = latest_step_updates.run_id + """, + "SELECT true" + ) + + alter table("runs") do + modify :updated_at, :utc_datetime_usec, null: false, from: {:utc_datetime_usec, null: true} + end + end +end diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs index 47d324f2a2..0cfe9d9877 100644 --- a/test/integration/web_and_worker_test.exs +++ b/test/integration/web_and_worker_test.exs @@ -248,7 +248,7 @@ defmodule Lightning.WebAndWorkerTest do end) assert version_logs =~ "▸ node.js 18.17" - assert version_logs =~ "▸ worker 1.6" + assert version_logs =~ "▸ worker 1.7" assert version_logs =~ "▸ @openfn/language-http 3.1.12" expected_lines = diff --git 
a/test/lightning/failure_alert_test.exs b/test/lightning/failure_alert_test.exs index 0a2c9a7c22..3b7d38884a 100644 --- a/test/lightning/failure_alert_test.exs +++ b/test/lightning/failure_alert_test.exs @@ -257,7 +257,7 @@ defmodule Lightning.FailureAlertTest do } do Lightning.Stub.reset_time() - {:ok, bearer, _} = + {:ok, bearer, claims} = Workers.Token.generate_and_sign( %{}, Lightning.Config.worker_token_signer() @@ -272,7 +272,7 @@ defmodule Lightning.FailureAlertTest do {:ok, %{}, socket} = LightningWeb.WorkerSocket - |> socket("socket_id", %{token: bearer}) + |> socket("socket_id", %{token: bearer, claims: claims}) |> subscribe_and_join( LightningWeb.RunChannel, "run:#{run.id}", diff --git a/test/lightning/runs_test.exs b/test/lightning/runs_test.exs index c90232ba16..34b0c293d0 100644 --- a/test/lightning/runs_test.exs +++ b/test/lightning/runs_test.exs @@ -601,7 +601,7 @@ defmodule Lightning.RunsTest do |> insert() {:error, changeset} = - Runs.complete_run(run, %{state: :success}) + Runs.complete_run(run, %{state: "success"}) assert {:state, {"cannot mark run success that has not been claimed by a worker", @@ -618,7 +618,7 @@ defmodule Lightning.RunsTest do Lightning.WorkOrders.subscribe(workflow.project_id) - {:ok, run} = Runs.complete_run(run, %{state: :success}) + {:ok, run} = Runs.complete_run(run, %{state: "success"}) assert run.state == :success assert DateTime.after?(DateTime.utc_now(), run.finished_at) @@ -644,7 +644,7 @@ defmodule Lightning.RunsTest do |> Repo.update() {:error, changeset} = - Runs.complete_run(run, %{state: :lost, error_type: "Lost"}) + Runs.complete_run(run, %{state: "lost", error_type: "Lost"}) assert changeset.errors == [ state: @@ -667,7 +667,7 @@ defmodule Lightning.RunsTest do |> Repo.update() {:ok, run} = - Runs.complete_run(run, %{state: :lost, error_type: "Lost"}) + Runs.complete_run(run, %{state: "lost", error_type: "Lost"}) assert run.state == :lost end @@ -815,7 +815,9 @@ defmodule Lightning.RunsTest do insert(:run, 
work_order: work_order, starting_trigger: trigger, - dataclip: dataclip + dataclip: dataclip, + state: :started, + claimed_at: DateTime.utc_now() |> DateTime.add(-3600) ) finished_step = diff --git a/test/lightning_web/channels/run_channel_test.exs b/test/lightning_web/channels/run_channel_test.exs index 4dc96a0ebc..a8c5ea0455 100644 --- a/test/lightning_web/channels/run_channel_test.exs +++ b/test/lightning_web/channels/run_channel_test.exs @@ -40,21 +40,7 @@ defmodule LightningWeb.RunChannelTest do end describe "joining the run:* channel" do - setup do - Lightning.Stub.reset_time() - - {:ok, bearer, _} = - Workers.Token.generate_and_sign( - %{}, - Lightning.Config.worker_token_signer() - ) - - Lightning.Stub.freeze_time(DateTime.utc_now() |> DateTime.add(5, :second)) - - socket = LightningWeb.WorkerSocket |> socket("socket_id", %{token: bearer}) - - %{socket: socket} - end + setup :create_socket test "rejects joining when the token isn't valid", %{socket: socket} do assert {:error, %{reason: "unauthorized"}} = @@ -185,22 +171,13 @@ defmodule LightningWeb.RunChannelTest do } end + @tag project_retention_policy: :erase_all test "fetch:plan for project with erase_all retention setting", %{ - credential: credential + credential: credential, + socket: socket, + run: run, + workflow: workflow } do - project = insert(:project, retention_policy: :erase_all) - - workflow_context = - create_workflow(%{project: project, credential: credential}) - - %{run: run, workflow: workflow} = - create_run( - %{project: project, credential: credential} - |> Map.merge(workflow_context) - ) - - %{socket: socket} = create_socket(%{run: run}) - id = run.id ref = push(socket, "fetch:plan", %{}) @@ -252,11 +229,9 @@ defmodule LightningWeb.RunChannelTest do } end - test "fetch:plan includes options from usage limiter", %{ - credential: credential - } do - project = insert(:project, retention_policy: :erase_all) - project_id = project.id + @tag project_retention_policy: :erase_all + test 
"fetch:plan includes options from usage limiter", context do + project_id = context.project.id extra_options = [run_timeout_ms: 5000, save_dataclips: false] expected_worker_options = %{run_timeout_ms: 5000, output_dataclips: false} @@ -264,21 +239,15 @@ defmodule LightningWeb.RunChannelTest do Mox.expect( Lightning.Extensions.MockUsageLimiter, :get_run_options, - fn %{project_id: ^project_id} -> - extra_options - end + fn %{project_id: ^project_id} -> extra_options end ) - workflow_context = - create_workflow(%{project: project, credential: credential}) - - %{run: run} = - create_run( - %{project: project, credential: credential} - |> Map.merge(workflow_context) - ) - - %{socket: socket} = create_socket(%{run: run}) + %{socket: socket} = + merge_setups(context, [ + :create_run, + :create_socket, + :join_run_channel + ]) ref = push(socket, "fetch:plan", %{}) @@ -324,11 +293,7 @@ defmodule LightningWeb.RunChannelTest do @tag project_retention_policy: :erase_all test "fetch:dataclip wipes dataclip body for projects with erase_all retention policy", - context do - %{run: run, dataclip: dataclip} = create_run(context) - - %{socket: socket} = create_socket(%{run: run}) - + %{socket: socket, dataclip: dataclip} do ref = push(socket, "fetch:dataclip", %{}) assert_reply ref, :ok, {:binary, _payload} @@ -345,10 +310,7 @@ defmodule LightningWeb.RunChannelTest do @tag project_retention_policy: :retain_all test "fetch:dataclip wipes dataclip body for projects with retain_all retention policy", context do - # retain_all - %{run: run, dataclip: dataclip} = create_run(context) - - %{socket: socket} = create_socket(%{run: run}) + %{socket: socket, dataclip: dataclip} = context ref = push(socket, "fetch:dataclip", %{}) @@ -473,6 +435,8 @@ defmodule LightningWeb.RunChannelTest do end describe "marking steps as started and finished" do + setup :create_socket + setup context do user = insert(:user) @@ -517,15 +481,7 @@ defmodule LightningWeb.RunChannelTest do 
Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{ project_id: project.id }) - |> Enum.into(%{}) - ) - - Lightning.Stub.reset_time() - - {:ok, bearer, _} = - Workers.Token.generate_and_sign( - %{}, - Lightning.Config.worker_token_signer() + |> Map.new() ) run_options = @@ -533,15 +489,12 @@ defmodule LightningWeb.RunChannelTest do project_id: project.id }) - {:ok, %{}, socket} = - LightningWeb.WorkerSocket - |> socket("socket_id", %{token: bearer}) + {:ok, _, socket} = + context.socket |> subscribe_and_join( LightningWeb.RunChannel, "run:#{run.id}", - %{ - "token" => Workers.generate_run_token(run, run_options) - } + %{"token" => Workers.generate_run_token(run, run_options)} ) %{ @@ -556,6 +509,44 @@ defmodule LightningWeb.RunChannelTest do } end + @tag api_version: "1.2" + test "step:start with API v1.2", %{ + socket: socket, + run: %{dataclip_id: dataclip_id}, + workflow: workflow, + credential: %{id: credential_id}, + project: project + } do + # { id, job_id, input_dataclip_id } + step_id = Ecto.UUID.generate() + [%{id: job_id}] = workflow.jobs + + timestamp = 1_727_423_491_748_984 + + ref = + push(socket, "step:start", %{ + "step_id" => step_id, + "credential_id" => credential_id, + "job_id" => job_id, + "input_dataclip_id" => dataclip_id, + "timestamp" => to_string(timestamp) + }) + + assert_reply ref, :ok, %{step_id: ^step_id}, 1_000 + + assert project.retention_policy == :retain_all + + assert %{ + credential_id: ^credential_id, + job_id: ^job_id, + input_dataclip_id: ^dataclip_id, + started_at: started_at + } = + Repo.get!(Step, step_id) + + assert DateTime.to_unix(started_at, :microsecond) == timestamp + end + test "step:start", %{ socket: socket, run: %{dataclip_id: dataclip_id}, @@ -588,13 +579,16 @@ defmodule LightningWeb.RunChannelTest do end @tag project_retention_policy: :erase_all - test "step:start for a project with erase_all retention policy", - %{ - credential: %{id: credential_id}, - project: project, - workflow: workflow - } = 
context do - %{socket: socket, run: %{dataclip_id: dataclip_id}} = context + test "step:start providing a dataclip for a project with erase_all retention policy", + context do + %{ + socket: socket, + run: %{dataclip_id: dataclip_id}, + credential: %{id: credential_id}, + project: project, + workflow: workflow + } = context + # input dataclip is saved if provided by the worker assert project.retention_policy == :erase_all @@ -618,9 +612,16 @@ defmodule LightningWeb.RunChannelTest do } = Repo.get!(Step, step_id), "dataclip is saved if provided" + end - %{run: run} = create_run(context) - %{socket: socket} = create_socket(%{context | run: run}) + @tag project_retention_policy: :erase_all + test "step:start without a dataclip for a project with erase_all retention policy", + context do + %{ + socket: socket, + credential: %{id: credential_id}, + workflow: workflow + } = context step_id = Ecto.UUID.generate() [%{id: job_id}] = workflow.jobs @@ -641,6 +642,34 @@ defmodule LightningWeb.RunChannelTest do } = Repo.get!(Step, step_id) end + @tag api_version: "1.2" + test "step:complete succeeds with API v1.2", %{ + socket: socket, + run: run, + workflow: workflow + } do + [job] = workflow.jobs + %{id: step_id} = step = insert(:step, runs: [run], job: job) + + timestamp = 1_727_423_491_748_984 + + ref = + push(socket, "step:complete", %{ + "step_id" => step.id, + "output_dataclip_id" => Ecto.UUID.generate(), + "output_dataclip" => ~s({"foo": "bar"}), + "reason" => "normal", + "timestamp" => to_string(timestamp) + }) + + assert_reply ref, :ok, %{step_id: ^step_id} + + assert %{exit_reason: "normal", finished_at: finished_at} = + Repo.get(Step, step.id) + + assert DateTime.to_unix(finished_at, :microsecond) == timestamp + end + test "step:complete succeeds with normal reason", %{ socket: socket, run: run, @@ -741,8 +770,13 @@ defmodule LightningWeb.RunChannelTest do assert is_nil(dataclip.body), "body is wiped" assert is_struct(dataclip.wiped_at, DateTime) - %{run: run} = 
create_run(context) - %{socket: socket} = create_socket(%{run: run}) + %{socket: socket} = + context + |> merge_setups([ + :create_run, + :create_socket, + :join_run_channel + ]) [job] = workflow.jobs %{id: step_id} = insert(:step, runs: [run], job: job) @@ -791,29 +825,14 @@ defmodule LightningWeb.RunChannelTest do Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{ project_id: project.id }) - |> Enum.into(%{}) + |> Map.new() ) - Lightning.Stub.reset_time() - - {:ok, bearer, _} = - Workers.Token.generate_and_sign( - %{}, - Lightning.Config.worker_token_signer() - ) - - {:ok, %{}, socket} = - LightningWeb.WorkerSocket - |> socket("socket_id", %{token: bearer}) - |> subscribe_and_join( - LightningWeb.RunChannel, - "run:#{run.id}", - %{"token" => Workers.generate_run_token(run, run_timeout_ms: 2)} - ) - - %{socket: socket, run: run, workflow: workflow} + %{run: run, workflow: workflow} end + setup [:create_socket, :join_run_channel] + test "run:log missing message can't be blank", %{ socket: socket, run: run, @@ -915,11 +934,13 @@ defmodule LightningWeb.RunChannelTest do end end - describe "marking runs as started and finished" do + describe "run:start" do + setup [:create_user, :create_project] + setup context do run_state = Map.get(context, :run_state, :available) - project = insert(:project) + project = context.project dataclip = insert(:http_request_dataclip, project: project) %{triggers: [trigger]} = @@ -942,34 +963,18 @@ defmodule LightningWeb.RunChannelTest do Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{ project_id: project.id }) - |> Enum.into(%{}) - ) - - Lightning.Stub.reset_time() - - {:ok, bearer, _} = - Workers.Token.generate_and_sign( - %{}, - Lightning.Config.worker_token_signer() - ) - - {:ok, %{}, socket} = - LightningWeb.WorkerSocket - |> socket("socket_id", %{token: bearer}) - |> subscribe_and_join( - LightningWeb.RunChannel, - "run:#{run.id}", - %{"token" => Workers.generate_run_token(run, run_timeout_ms: 2)} + |> 
Map.new() ) %{ - socket: socket, run: run, workflow: workflow, work_order: work_order } end + setup [:create_socket, :join_run_channel] + @tag run_state: :claimed test "run:start", %{ socket: socket, @@ -984,6 +989,67 @@ defmodule LightningWeb.RunChannelTest do assert %{state: :running} = Lightning.Repo.reload!(work_order) end + @tag run_state: :claimed, api_version: "1.2" + test "run:start with API v1.2", %{ + socket: socket, + run: run, + work_order: work_order + } do + timestamp = 1_727_423_491_748_984 + + ref = push(socket, "run:start", %{"timestamp" => to_string(timestamp)}) + + assert_reply ref, :ok, nil + + assert %{state: :started, started_at: started_at} = + Lightning.Repo.reload!(run) + + assert DateTime.to_unix(started_at, :microsecond) == timestamp + assert %{state: :running} = Lightning.Repo.reload!(work_order) + end + end + + describe "run:complete" do + setup [:create_user, :create_project] + + setup context do + run_state = Map.get(context, :run_state, :available) + + project = context.project + dataclip = insert(:http_request_dataclip, project: project) + + %{triggers: [trigger]} = + workflow = insert(:simple_workflow, project: project) + + work_order = + insert(:workorder, + workflow: workflow, + trigger: trigger, + dataclip: dataclip + ) + + run = + insert(:run, + work_order: work_order, + starting_trigger: trigger, + dataclip: dataclip, + state: run_state, + options: + Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{ + project_id: project.id + }) + |> Map.new() + ) + + %{ + run: run, + workflow: workflow, + work_order: work_order + } + end + + setup [:create_socket, :join_run_channel] + @tag run_state: :claimed test "run:complete when claimed", %{socket: socket} do ref = @@ -1009,6 +1075,29 @@ defmodule LightningWeb.RunChannelTest do } end + @tag run_state: :started, api_version: "1.2" + test "run:complete with API v1.2", %{ + socket: socket, + run: run + } do + timestamp = 1_727_423_491_748_984 + + ref = + push(socket, 
"run:complete", %{ + "timestamp" => to_string(timestamp), + "reason" => "success", + "error_type" => nil, + "error_message" => nil + }) + + assert_reply ref, :ok, nil + + assert %{state: :success, finished_at: finished_at} = + Lightning.Repo.reload!(run) + + assert DateTime.to_unix(finished_at, :microsecond) == timestamp + end + @tag run_state: :started test "run:complete when started", %{ socket: socket, @@ -1106,10 +1195,17 @@ defmodule LightningWeb.RunChannelTest do end defp create_socket_and_run(context) do - [&create_project/1, &create_workflow/1, &create_run/1, &create_socket/1] - |> Enum.reduce(context, fn f, context -> - Map.merge(context, f.(context)) - end) + merge_setups(context, [ + :create_project, + :create_workflow, + :create_run, + :create_socket, + :join_run_channel + ]) + end + + defp create_user(_context) do + %{user: insert(:user)} end defp create_project(%{user: user} = context) do @@ -1191,7 +1287,7 @@ defmodule LightningWeb.RunChannelTest do Lightning.Extensions.MockUsageLimiter.get_run_options(%Context{ project_id: project.id }) - |> Enum.into(%{}) + |> Map.new() ) %{ @@ -1202,18 +1298,29 @@ defmodule LightningWeb.RunChannelTest do } end - defp create_socket(%{run: run}) do - Lightning.Stub.reset_time() - - {:ok, bearer, _} = + defp create_socket(context) do + {:ok, bearer, claims} = Workers.Token.generate_and_sign( %{}, Lightning.Config.worker_token_signer() ) - {:ok, %{}, socket} = + assigns = %{ + token: bearer, + claims: claims, + api_version: context[:api_version] + } + + socket = LightningWeb.WorkerSocket - |> socket("socket_id", %{token: bearer}) + |> socket("socket_id", assigns) + + %{socket: socket} + end + + defp join_run_channel(%{run: run, socket: socket}) do + {:ok, _, socket} = + socket |> subscribe_and_join( LightningWeb.RunChannel, "run:#{run.id}", @@ -1225,7 +1332,7 @@ defmodule LightningWeb.RunChannelTest do defp stringify_keys(map) do Enum.map(map, fn {k, v} -> {Atom.to_string(k), v} end) - |> Enum.into(%{}) + |> Map.new() 
end defp set_google_credential(_context) do diff --git a/test/lightning_web/channels/worker_channel_test.exs b/test/lightning_web/channels/worker_channel_test.exs index 65e9345ef2..ff4435dd6d 100644 --- a/test/lightning_web/channels/worker_channel_test.exs +++ b/test/lightning_web/channels/worker_channel_test.exs @@ -6,27 +6,30 @@ defmodule LightningWeb.WorkerChannelTest do import Lightning.Factories describe "joining" do - test "with an invalid token" do - assert LightningWeb.UserSocket + test "with an invalid claim" do + assert LightningWeb.WorkerSocket |> socket("socket_id", %{}) |> subscribe_and_join(LightningWeb.WorkerChannel, "worker:queue") == {:error, %{reason: "unauthorized"}} + + assert LightningWeb.WorkerSocket + |> socket("socket_id", %{token: "foo"}) + |> subscribe_and_join(LightningWeb.WorkerChannel, "worker:queue") == + {:error, %{reason: "unauthorized"}} end end describe "worker:queue channel" do setup do - Lightning.Stub.reset_time() - - {:ok, bearer, _} = + {:ok, bearer, claims} = Workers.Token.generate_and_sign( %{}, Lightning.Config.worker_token_signer() ) - Lightning.Stub.freeze_time(DateTime.utc_now() |> DateTime.add(5, :second)) - - socket = LightningWeb.WorkerSocket |> socket("socket_id", %{token: bearer}) + socket = + LightningWeb.WorkerSocket + |> socket("socket_id", %{token: bearer, claims: claims}) {:ok, _, socket} = socket |> subscribe_and_join(LightningWeb.WorkerChannel, "worker:queue") diff --git a/test/lightning_web/channels/worker_socket_test.exs b/test/lightning_web/channels/worker_socket_test.exs new file mode 100644 index 0000000000..ad42144d5a --- /dev/null +++ b/test/lightning_web/channels/worker_socket_test.exs @@ -0,0 +1,34 @@ +defmodule LightningWeb.WorkerSocketTest do + use LightningWeb.ChannelCase, async: true + alias Lightning.Workers + + describe "connect" do + test "without a valid token" do + assert LightningWeb.WorkerSocket |> connect(%{}) == {:error, :unauthorized} + + assert LightningWeb.WorkerSocket |> 
connect(%{token: "foo"}) == + {:error, :unauthorized} + end + + test "with a valid token" do + {:ok, bearer, _} = + Workers.Token.generate_and_sign( + %{}, + Lightning.Config.worker_token_signer() + ) + + Lightning.Stub.freeze_time(DateTime.utc_now() |> DateTime.add(5, :second)) + + assert {:ok, socket} = + LightningWeb.WorkerSocket + |> connect(%{ + token: bearer, + worker_version: "1.5.0", + api_version: "1.1" + }) + + assert %{token: ^bearer, worker_version: "1.5.0", api_version: "1.1"} = + socket.assigns + end + end +end diff --git a/test/lightning_web/live/run_live/show_test.exs b/test/lightning_web/live/run_live/show_test.exs index bdfb626912..7c7a933100 100644 --- a/test/lightning_web/live/run_live/show_test.exs +++ b/test/lightning_web/live/run_live/show_test.exs @@ -186,7 +186,7 @@ defmodule LightningWeb.RunLive.ShowTest do ) |> Enum.count() == 1 - {:ok, _} = Lightning.Runs.complete_run(run, %{state: :failed}) + {:ok, _} = Lightning.Runs.complete_run(run, %{state: "failed"}) assert view |> element("#run-detail-#{run_id}") diff --git a/test/lightning_web/live/workflow_live/editor_test.exs b/test/lightning_web/live/workflow_live/editor_test.exs index 22036f36f2..5255b32ed6 100644 --- a/test/lightning_web/live/workflow_live/editor_test.exs +++ b/test/lightning_web/live/workflow_live/editor_test.exs @@ -1573,7 +1573,7 @@ defmodule LightningWeb.WorkflowLive.EditorTest do "error_message" => "Unexpected token (6:9)", "error_type" => "CompileError", "final_dataclip_id" => "", - "state" => :crashed + "state" => "crashed" }) assert_received %Lightning.Runs.Events.RunUpdated{ diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index 5e22bceec6..b7791f5015 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -18,4 +18,30 @@ defmodule Lightning.TestUtils do :ok end + + @doc """ + Merge the given setups into the given context. + Just works a bit like `setup` on ExUnit.Case. 
+ + Useful when a test module already has suitable setup functions but you need to + build a new context inside the test itself. + + Use with care: chances are you should be writing another test or refactoring + the tests to use tags. + """ + defmacro merge_setups(context, fns) do + fns = + Enum.map(fns, fn f -> + quote do + fn c -> unquote(f)(c) end + end + end) + + quote do + unquote(fns) + |> Enum.reduce(unquote(context), fn f, context -> + Map.merge(context, f.(context)) + end) + end + end end