Skip to content

Commit

Permalink
feat: upgrade cachex to v4 (#2253)
Browse files Browse the repository at this point in the history
* feat: upgrade cachex to v4

* chore: bump to v1.9.5

* chore: fix failing tests
  • Loading branch information
Ziinc authored Nov 6, 2024
1 parent eaf447e commit 0ca5a2b
Show file tree
Hide file tree
Showing 17 changed files with 246 additions and 31 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.9.4
1.9.5
13 changes: 12 additions & 1 deletion lib/logflare/auth/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@ defmodule Logflare.Auth.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_sec(30, 15)]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_sec(30, 15)
]
]}
}
end

Expand Down
13 changes: 12 additions & 1 deletion lib/logflare/backends/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@ defmodule Logflare.Backends.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_min(), limit: 100_000]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
}
end

Expand Down
71 changes: 71 additions & 0 deletions lib/logflare/backends/source_metrics_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
defmodule Logflare.Backends.SourceMetricsCache do
@moduledoc false
alias Logflare.Logs.LogEvents
alias Logflare.ContextCache
alias Logflare.LogEvent, as: LE
alias Logflare.Utils

@cache __MODULE__

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)

%{
id: __MODULE__,
name: __MODULE__,
start: {
Cachex,
:start_link,
[
@cache,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(5_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min(60)
]
]
}
}
end

@fetch_event_by_id_and_timestamp_key {:fetch_event_by_id_and_timestamp, 2}
@spec put_event_with_id_and_timestamp(atom, keyword, LE.t()) :: term
def put_event_with_id_and_timestamp(source_token, kw, %LE{} = log_event) do
cache_key = {@fetch_event_by_id_and_timestamp_key, [source_token, kw]}
Cachex.put(@cache, cache_key, {:ok, log_event})
end

@spec fetch_event_by_id_and_timestamp(atom, keyword) :: {:ok, map()} | {:error, map()}
def fetch_event_by_id_and_timestamp(source_token, kw) when is_atom(source_token) do
ContextCache.apply_fun(LogEvents, @fetch_event_by_id_and_timestamp_key, [source_token, kw])
end

@fetch_event_by_id {:fetch_event_by_id, 2}
@spec fetch_event_by_id(atom, binary()) :: {:ok, LE.t() | nil} | {:error, map()}
def fetch_event_by_id(source_token, id) when is_atom(source_token) and is_binary(id) do
ContextCache.apply_fun(LogEvents, @fetch_event_by_id, [source_token, id])
end

@fetch_event_by_path {:fetch_event_by_path, 3}
@spec fetch_event_by_path(atom, binary(), term()) :: {:ok, LE.t() | nil} | {:error, map()}
def fetch_event_by_path(source_token, path, value)
when is_atom(source_token) and is_binary(path) do
ContextCache.apply_fun(LogEvents, @fetch_event_by_path, [source_token, path, value])
end

@spec put(atom(), term(), LE.t()) :: {:error, boolean} | {:ok, boolean}
def put(source_token, key, log_event) do
Cachex.put(__MODULE__, {source_token, key}, log_event)
end

@spec get!(atom(), term()) :: LE.t() | nil
def get!(source_token, log_id) do
Cachex.get!(__MODULE__, {source_token, log_id})
end

def name, do: __MODULE__
end
13 changes: 12 additions & 1 deletion lib/logflare/billing/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,18 @@ defmodule Logflare.Billing.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_min(), limit: 100_000]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
}
end

Expand Down
9 changes: 7 additions & 2 deletions lib/logflare/context_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ defmodule Logflare.ContextCache do
[
@cache,
[
stats: stats,
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
Expand Down Expand Up @@ -93,7 +98,7 @@ defmodule Logflare.ContextCache do
def bust_keys(values) when is_list(values) do
for {context, primary_key} <- values do
filter = {:==, {:element, 1, :key}, {{context, primary_key}}}
query = Cachex.Query.create(filter, {:key, :value})
query = Cachex.Query.build(where: filter, output: {:key, :value})
context_cache = cache_name(context)

Logflare.ContextCache
Expand Down
15 changes: 11 additions & 4 deletions lib/logflare/logs/log_events_cache.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
defmodule Logflare.Logs.LogEvents.Cache do
@moduledoc false
import Cachex.Spec
alias Logflare.Logs.LogEvents
alias Logflare.ContextCache
alias Logflare.LogEvent, as: LE
@ttl :timer.hours(1)
alias Logflare.Utils

@cache __MODULE__

Expand All @@ -19,7 +18,15 @@ defmodule Logflare.Logs.LogEvents.Cache do
:start_link,
[
@cache,
[expiration: expiration(default: @ttl), limit: limit(size: 5_000), stats: stats]
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(5_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min(60)
]
]
}
}
Expand All @@ -29,7 +36,7 @@ defmodule Logflare.Logs.LogEvents.Cache do
@spec put_event_with_id_and_timestamp(atom, keyword, LE.t()) :: term
def put_event_with_id_and_timestamp(source_token, kw, %LE{} = log_event) do
cache_key = {@fetch_event_by_id_and_timestamp_key, [source_token, kw]}
Cachex.put(@cache, cache_key, {:ok, log_event}, ttl: @ttl)
Cachex.put(@cache, cache_key, {:ok, log_event})
end

@spec fetch_event_by_id_and_timestamp(atom, keyword) :: {:ok, map()} | {:error, map()}
Expand Down
40 changes: 30 additions & 10 deletions lib/logflare/logs/rejected_log_events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,42 @@ defmodule Logflare.Logs.RejectedLogEvents do
}'
```
"""
require Ex2ms
alias Logflare.Source
alias Logflare.LogEvent, as: LE
alias Logflare.Utils

@cache __MODULE__

def child_spec(_) do
stats = Application.get_env(:logflare, :cache_stats, false)
%{id: @cache, start: {Cachex, :start_link, [@cache, [limit: 10_000, stats: stats]]}}

%{
id: @cache,
start:
{Cachex, :start_link,
[
@cache,
[
expiration: Utils.cache_expiration_min(60),
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(10_000)
]
|> Enum.filter(& &1)
]
]}
}
end

@spec get_by_source(Source.t()) :: list(LE.t())
def get_by_source(%Source{token: token}) do
get!(token).log_events
get!(token)
|> case do
%{log_events: events} -> events
other -> other
end
end

def count(%Source{} = s) do
Expand Down Expand Up @@ -63,15 +86,12 @@ defmodule Logflare.Logs.RejectedLogEvents do
:ok
end

def query(source_id) when is_atom(source_id) do
def query(token) when is_atom(token) do
filter = {:==, {:element, 1, :key}, {:const, token}}
query = Cachex.Query.build(where: filter, output: :value)

@cache
|> Cachex.stream!()
|> Stream.filter(fn x ->
match?({:entry, {^source_id, _le_id}, _ts, _, _le}, x)
end)
|> Stream.map(fn {:entry, {^source_id, _le_id}, _ts, _, le} ->
le
end)
|> Cachex.stream!(query)
|> Enum.reverse()
|> Enum.take(100)
end
Expand Down
13 changes: 12 additions & 1 deletion lib/logflare/partners/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,18 @@ defmodule Logflare.Partners.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_min(), limit: 100_000]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
}
end

Expand Down
14 changes: 13 additions & 1 deletion lib/logflare/pubsub_rates/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,19 @@ defmodule Logflare.PubSubRates.Cache do
%{
id: __MODULE__,
start:
{Cachex, :start_link, [@cache, [stats: stats, expiration: Utils.cache_expiration_min(5)]]}
{Cachex, :start_link,
[
@cache,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min(5)
]
]}
}
end

Expand Down
10 changes: 7 additions & 3 deletions lib/logflare/source_schemas/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ defmodule Logflare.SourceSchemas.Cache do
[
__MODULE__,
[
stats: stats,
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
# shorter expiration for schemas
expiration: Utils.cache_expiration_min(10, 2),
limit: 100_000
expiration: Utils.cache_expiration_min(10, 2)
]
]}
}
Expand Down
13 changes: 12 additions & 1 deletion lib/logflare/sources/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@ defmodule Logflare.Sources.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_min(), limit: 100_000]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
}
end

Expand Down
13 changes: 12 additions & 1 deletion lib/logflare/team_users/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,18 @@ defmodule Logflare.TeamUsers.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_min(), limit: 100_000]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
}
end

Expand Down
13 changes: 12 additions & 1 deletion lib/logflare/users/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,18 @@ defmodule Logflare.Users.Cache do
id: __MODULE__,
start:
{Cachex, :start_link,
[__MODULE__, [stats: stats, expiration: Utils.cache_expiration_min(), limit: 100_000]]}
[
__MODULE__,
[
hooks:
[
if(stats, do: Utils.cache_stats()),
Utils.cache_limit(100_000)
]
|> Enum.filter(& &1),
expiration: Utils.cache_expiration_min()
]
]}
}
end

Expand Down
Loading

0 comments on commit 0ca5a2b

Please sign in to comment.