Skip to content

Commit

Permalink
Allow the worker to send timestamps for Runs and Steps (#2529)
Browse files Browse the repository at this point in the history
* Move authentication to WorkerSocket

- Improve tests and test helps for channels/sockets
- Bump ws-worker to 1.6.4 for local dev

* cast timestamps from the worker

* make credo happy

* fix failing tests

* Move worker authentication to socket

Refactor tests
Bump ws-worker

* upgrade worker version

* add updated_at to runs

* fix failing janitor test

* fix failing run lost test

* update worker version

* update changelog

---------

Co-authored-by: Frank Midigo <midigofrank@gmail.com>
Co-authored-by: Midigo Frank <39288959+midigofrank@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 2, 2024
1 parent 5e0083c commit c7864cc
Show file tree
Hide file tree
Showing 20 changed files with 542 additions and 264 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ and this project adheres to

### Changed

- Use timestamps sent from worker when starting and completing runs
[#2434](https://github.com/OpenFn/lightning/issues/2434)

### Fixed

- User email change: Add debounce on blur to input forms to avoid validation after every keystroke
Expand Down
16 changes: 8 additions & 8 deletions assets/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion assets/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"zustand": "^4.3.7"
},
"devDependencies": {
"@openfn/ws-worker": "^1.6",
"@openfn/ws-worker": "^1.7.0",
"@types/marked": "^4.0.8",
"@types/react": "^18.0.15",
"@types/react-dom": "^18.0.6",
Expand Down
20 changes: 8 additions & 12 deletions lib/lightning/invocation/step.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,14 @@ defmodule Lightning.Invocation.Step do
|> validate()
end

def finished(
step,
output_dataclip_id,
# Should this be a specified type?
{exit_reason, error_type, _error_message}
) do
change(step, %{
finished_at: DateTime.utc_now(),
output_dataclip_id: output_dataclip_id,
exit_reason: exit_reason,
error_type: error_type
})
def finished(step, params) do
step
|> cast(params, [
:output_dataclip_id,
:exit_reason,
:error_type,
:finished_at
])
|> validate_required([:finished_at, :exit_reason])
end

Expand Down
31 changes: 4 additions & 27 deletions lib/lightning/runs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,14 @@ defmodule Lightning.Runs do
|> Repo.one()
end

def start_run(%Run{} = run) do
Run.start(run)
|> update_run()
|> tap(&track_run_queue_delay/1)
def start_run(%Run{} = run, params \\ %{}) do
Handlers.StartRun.call(run, params)
end

@spec complete_run(Run.t(), %{optional(any()) => any()}) ::
{:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())}
def complete_run(run, params) do
Run.complete(run, params)
|> case do
%{valid?: false} = changeset ->
{:error, changeset}

changeset ->
changeset |> update_run()
end
Handlers.CompleteRun.call(run, params)
end

@spec update_run(Ecto.Changeset.t(Run.t())) ::
Expand Down Expand Up @@ -314,7 +305,7 @@ defmodule Lightning.Runs do
end)

Repo.transaction(fn ->
complete_run(run, %{state: :lost, error_type: error_type})
{:ok, _run} = complete_run(run, %{state: "lost", error_type: error_type})

Ecto.assoc(run, :steps)
|> where([r], is_nil(r.exit_reason))
Expand All @@ -337,18 +328,4 @@ defmodule Lightning.Runs do
|> order_by([{^order, :timestamp}])
|> Repo.stream()
end

defp track_run_queue_delay({:ok, run}) do
%Run{inserted_at: inserted_at, started_at: started_at} = run

delay = DateTime.diff(started_at, inserted_at, :millisecond)

:telemetry.execute(
[:domain, :run, :queue],
%{delay: delay},
%{}
)
end

defp track_run_queue_delay({:error, _changeset}), do: nil
end
142 changes: 128 additions & 14 deletions lib/lightning/runs/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,117 @@ defmodule Lightning.Runs.Handlers do
alias Lightning.RunStep
alias Lightning.WorkOrders

defmodule StartRun do
@moduledoc """
Schema to validate the input attributes of a started run.
"""
use Lightning.Schema

import Lightning.ChangesetUtils

@primary_key false
embedded_schema do
field :timestamp, Lightning.UnixDateTime
end

def call(run, params) do
with {:ok, start_run} <- params |> new() |> apply_action(:validate) do
run
|> Run.start(to_run_params(start_run))
|> Runs.update_run()
|> tap(&track_run_queue_delay/1)
end
end

def new(params) do
%__MODULE__{}
|> cast(params, [:timestamp])
|> put_new_change(:timestamp, DateTime.utc_now())
|> validate_required([:timestamp])
end

defp to_run_params(%__MODULE__{} = start_run) do
%{started_at: start_run.timestamp}
end

defp track_run_queue_delay({:ok, run}) do
%Run{inserted_at: inserted_at, started_at: started_at} = run

delay = DateTime.diff(started_at, inserted_at, :millisecond)

:telemetry.execute(
[:domain, :run, :queue],
%{delay: delay},
%{}
)
end

defp track_run_queue_delay({:error, _changeset}), do: nil
end

defmodule CompleteRun do
@moduledoc """
Schema to validate the input attributes of a completed run.
"""
use Lightning.Schema

import Lightning.ChangesetUtils

@primary_key false
embedded_schema do
field :state, :string
field :reason, :string
field :error_type, :string
field :timestamp, Lightning.UnixDateTime
end

def call(run, params) do
with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do
run
|> Run.complete(to_run_params(complete_run))
|> case do
%{valid?: false} = changeset ->
{:error, changeset}

changeset ->
Runs.update_run(changeset)
end
end
end

def new(params) do
%__MODULE__{}
|> cast(params, [:state, :reason, :error_type, :timestamp])
|> put_new_change(:timestamp, DateTime.utc_now())
|> then(fn changeset ->
if reason = get_change(changeset, :reason) do
put_new_change(changeset, :state, reason_to_state(reason))
else
changeset
end
end)
|> validate_required([:state, :timestamp])
end

defp reason_to_state(reason) do
case reason do
"ok" -> :success
"fail" -> :failed
"crash" -> :crashed
"cancel" -> :cancelled
"kill" -> :killed
"exception" -> :exception
unknown -> unknown
end
end

defp to_run_params(%__MODULE__{} = complete_run) do
complete_run
|> Map.take([:state, :error_type])
|> Map.put(:finished_at, complete_run.timestamp)
end
end

defmodule StartStep do
@moduledoc """
Schema to validate the input attributes of a started step.
Expand All @@ -27,7 +138,7 @@ defmodule Lightning.Runs.Handlers do
field :job_id, Ecto.UUID
field :run_id, Ecto.UUID
field :snapshot_id, Ecto.UUID
field :started_at, :utc_datetime_usec
field :timestamp, Lightning.UnixDateTime
field :step_id, Ecto.UUID
end

Expand All @@ -50,17 +161,17 @@ defmodule Lightning.Runs.Handlers do
:credential_id,
:input_dataclip_id,
:job_id,
:started_at,
:timestamp,
:step_id
])
|> put_change(:run_id, run.id)
|> put_change(:snapshot_id, run.snapshot_id)
|> put_new_change(:started_at, DateTime.utc_now())
|> put_new_change(:timestamp, DateTime.utc_now())
|> validate_required([
:job_id,
:run_id,
:snapshot_id,
:started_at,
:timestamp,
:step_id
])
|> then(&validate_job_reachable/1)
Expand All @@ -81,10 +192,10 @@ defmodule Lightning.Runs.Handlers do
:credential_id,
:input_dataclip_id,
:job_id,
:started_at,
:snapshot_id
])
|> Map.put(:id, step_id)
|> Map.put(:started_at, start_step.timestamp)
|> Step.new()
end

Expand Down Expand Up @@ -156,7 +267,7 @@ defmodule Lightning.Runs.Handlers do
field :error_type, :string
field :error_message, :string
field :step_id, Ecto.UUID
field :finished_at, :utc_datetime_usec
field :timestamp, Lightning.UnixDateTime
end

def new(params, options) do
Expand All @@ -169,9 +280,9 @@ defmodule Lightning.Runs.Handlers do
:error_type,
:error_message,
:step_id,
:finished_at
:timestamp
])
|> put_new_change(:finished_at, DateTime.utc_now())
|> put_new_change(:timestamp, DateTime.utc_now())
|> then(fn changeset ->
output_dataclip_id = get_change(changeset, :output_dataclip_id)
output_dataclip = get_change(changeset, :output_dataclip)
Expand All @@ -190,7 +301,7 @@ defmodule Lightning.Runs.Handlers do
end)
|> validate_required([
:run_id,
:finished_at,
:timestamp,
:project_id,
:reason,
:step_id
Expand All @@ -214,11 +325,7 @@ defmodule Lightning.Runs.Handlers do
{:ok, _} <-
maybe_save_dataclip(complete_step, options) do
step
|> Step.finished(
complete_step.output_dataclip_id,
{complete_step.reason, complete_step.error_type,
complete_step.error_message}
)
|> Step.finished(to_step_params(complete_step))
|> Repo.update()
else
nil ->
Expand All @@ -233,6 +340,13 @@ defmodule Lightning.Runs.Handlers do
end)
end

defp to_step_params(%__MODULE__{} = complete_step) do
complete_step
|> Map.take([:output_dataclip_id, :error_type, :error_message])
|> Map.put(:exit_reason, complete_step.reason)
|> Map.put(:finished_at, complete_step.timestamp)
end

defp get_step(id) do
from(s in Lightning.Invocation.Step, where: s.id == ^id)
|> Repo.one()
Expand Down
16 changes: 7 additions & 9 deletions lib/lightning/runs/run.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defmodule Lightning.Run do
values: [immediate: 0, normal: 1],
default: :normal

timestamps type: :utc_datetime_usec, updated_at: false
timestamps(type: :utc_datetime_usec)
end

def for(%Trigger{} = trigger, attrs) do
Expand Down Expand Up @@ -144,12 +144,11 @@ defmodule Lightning.Run do
)
end

def start(run) do
def start(run, params) do
run
|> change(
state: :started,
started_at: DateTime.utc_now()
)
|> cast(params, [:started_at])
|> put_change(:state, :started)
|> validate_required([:started_at])
|> validate_state_change()
end

Expand All @@ -159,9 +158,8 @@ defmodule Lightning.Run do
run
|> change()
|> put_change(:state, nil)
|> cast(params, [:state, :error_type])
|> put_change(:finished_at, DateTime.utc_now())
|> validate_required([:state])
|> cast(params, [:state, :error_type, :finished_at])
|> validate_required([:state, :finished_at])
|> validate_inclusion(:state, @final_states)
|> validate_state_change()
end
Expand Down
Loading

0 comments on commit c7864cc

Please sign in to comment.