Skip to content

Commit

Permalink
Perform data retention purging in batches to avoid timeouts (#2547)
Browse files Browse the repository at this point in the history
* Perform data retention purging in batches to avoid timeouts

---------

Co-authored-by: Stuart Corbishley <corbish@gmail.com>
  • Loading branch information
midigofrank and stuartc authored Oct 8, 2024
1 parent 8db94e3 commit 1d89c4b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ and this project adheres to

### Fixed

- Fix Oban erros not getting logged in Sentry
[#2542](https://github.com/OpenFn/lightning/issues/2542)
- Perform data retention purging in batches to avoid timeouts
[#2528](https://github.com/OpenFn/lightning/issues/2528)

## [v2.9.8] - 2024-10-03

### Added
Expand Down
5 changes: 1 addition & 4 deletions lib/lightning/oban_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@ defmodule Lightning.ObanManager do
if timeout? do
Sentry.capture_message("Processor Timeout",
level: "warning",
message: error,
extra: context,
extra: Map.merge(context, %{exception: inspect(error)}),
tags: %{type: "timeout"}
)
else
Sentry.capture_exception(error,
stacktrace: meta.stacktrace,
message: error,
error: error,
extra: context,
tags: %{type: "oban"}
)
Expand Down
121 changes: 76 additions & 45 deletions lib/lightning/projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ defmodule Lightning.Projects do
alias Lightning.Accounts.UserToken
alias Lightning.ExportUtils
alias Lightning.Invocation.Dataclip
alias Lightning.Invocation.LogLine
alias Lightning.Invocation.Step
alias Lightning.Projects.Events
alias Lightning.Projects.File
alias Lightning.Projects.Project
alias Lightning.Projects.ProjectCredential
alias Lightning.Projects.ProjectUser
Expand Down Expand Up @@ -52,12 +52,26 @@ defmodule Lightning.Projects do
{:ok, %{projects_deleted: projects_to_delete}}
end

def perform(%Oban.Job{
args: %{"project_id" => project_id, "type" => "data_retention"}
}) do
project = get_project!(project_id)
delete_history_for(project)
wipe_dataclips_for(project)

:ok
end

def perform(%Oban.Job{args: %{"type" => "data_retention"}}) do
list_projects_having_history_retention()
|> Enum.each(fn project ->
delete_history_for(project)
wipe_dataclips_for(project)
end)
jobs =
list_projects_having_history_retention()
|> Enum.map(fn project ->
new(%{project_id: project.id, type: "data_retention"}, max_attempts: 3)
end)

Oban.insert_all(Lightning.Oban, jobs)

:ok
end

@doc """
Expand Down Expand Up @@ -679,63 +693,80 @@ defmodule Lightning.Projects do

defp delete_history_for(%Project{history_retention_period: period} = project)
when is_integer(period) do
workflows_query = from w in Ecto.assoc(project, :workflows), select: w.id

workorders_query =
from wo in WorkOrder,
where:
wo.workflow_id in subquery(workflows_query) and
wo.last_activity < ago(^period, "day"),
join: wf in assoc(wo, :workflow),
on: wf.project_id == ^project.id,
where: wo.last_activity < ago(^period, "day"),
select: wo.id

runs_query =
from r in Run,
where: r.work_order_id in subquery(workorders_query),
select: r.id
workorders_count = Repo.aggregate(workorders_query, :count)
batch_size = 1000

run_steps_query =
from rs in RunStep,
where: rs.run_id in subquery(runs_query),
select: rs.step_id
workorders_delete_query =
WorkOrder
|> with_cte("workorders_to_delete",
as: ^limit(workorders_query, ^batch_size)
)
|> join(:inner, [wo], wtd in "workorders_to_delete", on: wo.id == wtd.id)

log_lines_query = from l in LogLine, where: l.run_id in subquery(runs_query)
steps_delete_query =
Step
|> join(:inner, [s], assoc(s, :runs), as: :runs)
|> with_cte("workorders_to_delete",
as: ^limit(workorders_query, ^batch_size)
)
|> join(:inner, [runs: r], wtd in "workorders_to_delete",
on: r.work_order_id == wtd.id
)

for _i <- 1..ceil(workorders_count / batch_size) do
Repo.transaction(
fn ->
{_count, _} = Repo.delete_all(steps_delete_query, returning: false)

dataclips_subset_query =
{_count, _} =
Repo.delete_all(workorders_delete_query, returning: false)
end,
timeout: 50_000
)
end

dataclips_query =
from d in Dataclip,
as: :dataclip,
where: d.project_id == ^project.id,
where: d.inserted_at < ago(^period, "day"),
left_join: wo in WorkOrder,
on: d.id == wo.dataclip_id,
left_join: r in Run,
on: d.id == r.dataclip_id,
left_join: s in Step,
on: d.id == s.input_dataclip_id or d.id == s.output_dataclip_id,
where: d.project_id == ^project.id,
where: d.inserted_at < ago(^period, "day"),
where: is_nil(wo.dataclip_id),
where: is_nil(r.dataclip_id),
where: is_nil(s.input_dataclip_id),
where: is_nil(s.output_dataclip_id),
where: is_nil(wo.id) and is_nil(r.id) and is_nil(s.id),
select: d.id

dataclips_query =
from d in Dataclip, where: d.id in subquery(dataclips_subset_query)
dataclips_count = Repo.aggregate(dataclips_query, :count)
dataclips_batch_size = 500

Multi.new()
|> Multi.delete_all(:log_lines, log_lines_query)
|> Multi.delete_all(:run_steps, run_steps_query)
|> Multi.delete_all(:steps, fn %{run_steps: {_, step_ids}} ->
from s in Step, where: s.id in ^step_ids
end)
|> Multi.delete_all(:runs, runs_query)
|> Multi.delete_all(:workorders, workorders_query)
|> Multi.delete_all(:dataclips, dataclips_query)
|> Repo.transaction(timeout: 100_000)
|> case do
{:ok, result} ->
{:ok, result}
for i <- 1..ceil(dataclips_count / dataclips_batch_size) do
count_to_delete = dataclips_batch_size * i

dataclips_delete_query =
Dataclip
|> with_cte("dataclips_to_delete",
as: ^limit(dataclips_query, ^count_to_delete)
)
|> join(:inner, [d], dtd in "dataclips_to_delete", on: d.id == dtd.id)

{:error, _operation, failed_value, _changes} ->
{:error, failed_value}
{_count, _dataclips} =
Repo.delete_all(dataclips_delete_query,
returning: false,
timeout: 20_000
)
end

:ok
end

defp delete_history_for(_project) do
Expand Down Expand Up @@ -859,7 +890,7 @@ defmodule Lightning.Projects do
def list_project_files(%Project{id: project_id}, opts \\ []) do
sort_order = Keyword.get(opts, :sort, :desc)

from(pf in __MODULE__.File,
from(pf in File,
where: pf.project_id == ^project_id,
order_by: [{^sort_order, pf.inserted_at}],
preload: [:created_by]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule Lightning.Repo.Migrations.CreateWorkOrderLastActivityIndex do
use Ecto.Migration

def change do
create index("work_orders", [:last_activity])
end
end

0 comments on commit 1d89c4b

Please sign in to comment.