Skip to content

Commit

Permalink
Mark unfinished steps having finished runs as `lost` (#2554)
Browse files Browse the repository at this point in the history
  • Loading branch information
midigofrank authored Oct 9, 2024
1 parent fd5a419 commit 6f187fc
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ and this project adheres to
[#2542](https://github.com/OpenFn/lightning/issues/2542)
- Perform data retention purging in batches to avoid timeouts
[#2528](https://github.com/OpenFn/lightning/issues/2528)
- Mark unfinished steps having finished runs as `lost`
[#2416](https://github.com/OpenFn/lightning/issues/2416)

## [v2.9.8] - 2024-10-03

Expand Down
2 changes: 2 additions & 0 deletions lib/lightning/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ defmodule Lightning.Janitor do
Runs.mark_run_lost(run)
end)
|> Stream.run()

Runs.Query.lost_steps() |> Runs.mark_steps_lost()
end)
end
end
16 changes: 13 additions & 3 deletions lib/lightning/runs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,22 @@ defmodule Lightning.Runs do

Ecto.assoc(run, :steps)
|> where([r], is_nil(r.exit_reason))
|> Repo.update_all(
set: [exit_reason: "lost", finished_at: DateTime.utc_now()]
)
|> mark_steps_lost()
end)
end

@doc """
Marks every step matched by `steps_query` as lost.

Issues a single bulk update that sets `exit_reason` to `"lost"` and
`finished_at` to the current UTC time on all matching rows.

Returns `{:ok, count}` where `count` is the number of updated rows. The
result of the update is matched against `{count, nil}`, so a `MatchError`
is raised if the update hands back anything alongside the count.
"""
@spec mark_steps_lost(Ecto.Queryable.t()) :: {:ok, non_neg_integer()}
def mark_steps_lost(steps_query) do
  # Timestamp is taken once so every affected step shares the same value.
  lost_fields = [exit_reason: "lost", finished_at: DateTime.utc_now()]

  {affected, nil} =
    Repo.update_all(steps_query, [set: lost_fields], returning: false)

  {:ok, affected}
end

defdelegate subscribe(run), to: Events

def get_project_id_for_run(run) do
Expand Down
11 changes: 11 additions & 0 deletions lib/lightning/runs/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Lightning.Runs.Query do
"""
import Ecto.Query

alias Lightning.Invocation.Step
alias Lightning.Run

require Lightning.Run
Expand Down Expand Up @@ -48,6 +49,16 @@ defmodule Lightning.Runs.Query do
)
end

@doc """
Query for steps that never finished although one of their runs did.

Selects steps whose `exit_reason` and `finished_at` are both `nil`, inner
joined to their associated runs where the run's `state` is one of
`Run.final_states()`.
"""
@spec lost_steps() :: Ecto.Queryable.t()
def lost_steps do
  finished_run_states = Run.final_states()

  Step
  |> join(:inner, [s], r in assoc(s, :runs),
    on: r.state in ^finished_run_states
  )
  |> where([s], is_nil(s.exit_reason) and is_nil(s.finished_at))
end

@doc """
Query to return a list of runs that are either in progress (started or claimed)
or available.
Expand Down
103 changes: 103 additions & 0 deletions test/lightning/janitor_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Lightning.JanitorTest do
require Lightning.Run
use Lightning.DataCase, async: true
alias Lightning.Janitor
alias Lightning.Invocation
Expand Down Expand Up @@ -93,5 +94,107 @@ defmodule Lightning.JanitorTest do
reloaded_lost_long_run = Repo.get(Run, lost_long_run.id)
assert reloaded_lost_long_run.state == :lost
end

test "updates steps whose run has finished but step hasn't" do
  workflow = insert(:simple_workflow)
  %{triggers: [trigger], jobs: [first_job | _]} = workflow

  dataclip = insert(:dataclip)

  work_order =
    insert(:workorder, workflow: workflow, trigger: trigger, dataclip: dataclip)

  # Inserts one run in the given state that owns a single step for the
  # workflow's first job; `step_attrs` supplies the step's finish fields.
  insert_run_with_step = fn state, step_attrs ->
    insert(:run,
      work_order: work_order,
      starting_trigger: trigger,
      dataclip: dataclip,
      state: state,
      steps: [build(:step, [job: first_job] ++ step_attrs)]
    )
  end

  open_step_attrs = [finished_at: nil, exit_reason: nil]

  finished_runs_with_open_steps =
    for state <- Run.final_states() do
      insert_run_with_step.(state, open_step_attrs)
    end

  finished_runs_with_closed_steps =
    for state <- Run.final_states() do
      insert_run_with_step.(state,
        finished_at: DateTime.utc_now(),
        exit_reason: to_string(state)
      )
    end

  active_runs_with_open_steps =
    for state <- [:available, :claimed, :started] do
      insert_run_with_step.(state, open_step_attrs)
    end

  Janitor.find_and_update_lost()

  # Unfinished steps belonging to finished runs get marked as lost.
  Enum.each(finished_runs_with_open_steps, fn run ->
    original = hd(run.steps)
    reloaded = Repo.reload(original)

    assert is_nil(original.finished_at)
    assert is_nil(original.exit_reason)

    assert is_struct(reloaded.finished_at, DateTime)
    assert reloaded.exit_reason == "lost"
  end)

  # Neither already-finished steps of finished runs nor unfinished steps of
  # still-active runs are touched.
  untouched_runs =
    finished_runs_with_closed_steps ++ active_runs_with_open_steps

  Enum.each(untouched_runs, fn run ->
    original = hd(run.steps)
    reloaded = Repo.reload(original)

    assert original.finished_at == reloaded.finished_at
    assert original.exit_reason == reloaded.exit_reason
  end)
end
end
end

0 comments on commit 6f187fc

Please sign in to comment.