Skip to content

Commit

Permalink
Merge pull request #1610 from Skumring/Add-partition-remove-hook-logg…
Browse files Browse the repository at this point in the history
…ing-and-metrics

Add partition_remove_hook logging and metrics
  • Loading branch information
dmitry-sinina authored Nov 9, 2024
2 parents 33cde73 + a7ba1c1 commit 265fad8
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 48 deletions.
29 changes: 29 additions & 0 deletions app/jobs/jobs/partition_removing.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require 'prometheus/partition_remove_hook_processor'

module Jobs
class PartitionRemoving < ::BaseJob
self.cron_line = '20 * * * *'
Expand Down Expand Up @@ -51,15 +53,23 @@ def remove_partition!(partition_class, model_class)
cdr_remove_candidate = cdr_collection.first

if YetiConfig.partition_remove_hook.present?
start_time = get_time
cmd = "#{YetiConfig.partition_remove_hook} #{cdr_remove_candidate.class} #{cdr_remove_candidate.parent_table} #{cdr_remove_candidate.name}"

logger.info { "Running partition removing hook: #{cmd}" }
collect_prometheus_executions_metric!

return_value, sout, serr = execute_cmd(cmd)

logger.info { "Partition remove hook stdout:\n#{sout}" }
logger.info { "Partition remove hook stderr:\n#{serr}" }
collect_prometheus_duration_metric!(start_time)

if return_value.success?
logger.info { "Partition remove hook succeed, exit code: #{return_value.exitstatus}" }
else
logger.info { "Partition remove hook failed: #{return_value.exitstatus}. Stopping removing procedure" }
collect_prometheus_errors_metric!
return
end
end
Expand All @@ -77,5 +87,24 @@ def remove_partition!(partition_class, model_class)
capture_error(e, extra: { partition_class: partition_class.name, model_class: model_class.name })
raise e
end

def collect_prometheus_executions_metric!
PartitionRemoveHookProcessor.collect_executions_metric if PrometheusConfig.enabled?
end

def collect_prometheus_errors_metric!
PartitionRemoveHookProcessor.collect_errors_metric if PrometheusConfig.enabled?
end

def collect_prometheus_duration_metric!(start_time)
return unless PrometheusConfig.enabled?

duration = duration = (get_time - start_time).round(6)
PartitionRemoveHookProcessor.collect_duration_metric(duration)
end

def get_time
::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
end
end
end
49 changes: 49 additions & 0 deletions lib/prometheus/partition_remove_hook_collector.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# frozen_string_literal: true

class PartitionRemoveHookCollector < PrometheusExporter::Server::TypeCollector
MAX_METRIC_AGE = 30

def initialize
@data = []
@observers = {
'executions' => PrometheusExporter::Metric::Counter.new('ruby_yeti_partition_removing_hook_executions', 'Sum of Yeti Partition Remove Hook execution runs'),
'errors' => PrometheusExporter::Metric::Counter.new('ruby_yeti_partition_removing_hook_errors', 'Sum of Yeti Partition Remove Hook execution fails'),
'duration' => PrometheusExporter::Metric::Counter.new('ruby_yeti_partition_removing_hook_duration', 'Sum of Yeti Partition Remove Hook execution duration in seconds')
}
end

def type
'ruby_yeti_partition_removing_hook'
end

def metrics
return [] if @data.empty?

metrics = {}
@data.each do |obj|
labels = {}
# labels are passed by processor
labels.merge!(obj['metric_labels']) if obj['metric_labels']
# custom_labels are passed by PrometheusExporter::Client
labels.merge!(obj['custom_labels']) if obj['custom_labels']

@observers.each do |name, observer|
value = obj[name]
if value
observer.observe(value, labels)
metrics[name] = observer
end
end
end

metrics.values
end

def collect(obj)
now = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

obj['created_at'] = now
@data.delete_if { |m| m['created_at'] + MAX_METRIC_AGE < now }
@data << obj
end
end
24 changes: 24 additions & 0 deletions lib/prometheus/partition_remove_hook_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require_relative './base_processor'

class PartitionRemoveHookProcessor < BaseProcessor
self.logger = Rails.logger
self.type = 'ruby_yeti_partition_removing_hook'

def collect(metric)
format_metric(metric)
end

def self.collect_executions_metric
collect(executions: 1)
end

def self.collect_errors_metric
collect(errors: 1)
end

def self.collect_duration_metric(duration)
collect(duration:)
end
end
4 changes: 4 additions & 0 deletions lib/prometheus_collectors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

# Custom prometheus type collectors should be defined as lib/prometheus/*_collector.rb files.

# Disabling buffering for stdout and stderr
$stderr.sync = true
$stdout.sync = true

require 'active_support/all'

Dir[File.join(__dir__, 'prometheus/*_collector.rb')].each do |filename|
Expand Down
192 changes: 144 additions & 48 deletions spec/jobs/jobs/partition_removing_spec.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# frozen_string_literal: true

require_relative Rails.root.join('lib/prometheus/partition_remove_hook_processor')

RSpec.describe Jobs::PartitionRemoving, '#call' do
subject do
job.call
end

let(:job) { described_class.new(double) }
let(:prometheus_enabled) { false }

before do
expect(job).to receive(:partition_remove_delay).exactly(5).times.and_return(
Expand All @@ -16,72 +19,165 @@
'logs.api_requests' => '5 days'
)
PartitionModel::Cdr.to_a.each(&:destroy!)

allow(PrometheusConfig).to receive(:enabled?).and_return(prometheus_enabled)
end

context 'when there are old Cdr::Cdr partitions' do
before do
Cdr::Cdr.add_partition_for Time.now.utc
Cdr::Cdr.add_partition_for 1.day.from_now.utc
Cdr::Cdr.add_partition_for 1.day.ago.utc
Cdr::Cdr.add_partition_for 2.days.ago.utc
Cdr::Cdr.add_partition_for 3.days.ago.utc
Cdr::Cdr.add_partition_for 4.days.ago.utc
Cdr::Cdr.add_partition_for 5.days.ago.utc
shared_examples :should_not_collect_prometheus_metrics do
it 'should NOT collect Prometheus metrics' do
expect(PartitionRemoveHookProcessor).to_not receive(:collect)
subject
end
end

it 'removes correct cdr.cdr partition table' do
date = 5.days.ago.utc.to_date
expect { subject }.to change {
SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}")
}.from(true).to(false)
end
context 'without partition_remove_hook' do
context 'when there are old Cdr::Cdr partitions' do
before do
Cdr::Cdr.add_partition_for Time.current
Cdr::Cdr.add_partition_for 1.day.from_now
Cdr::Cdr.add_partition_for 1.day.ago
Cdr::Cdr.add_partition_for 2.days.ago
Cdr::Cdr.add_partition_for 3.days.ago
Cdr::Cdr.add_partition_for 4.days.ago
Cdr::Cdr.add_partition_for 5.days.ago
end

it 'does not remove any other cdr partitions' do
expect { subject }.to change { PartitionModel::Cdr.all.size }.by(-1)
end
end
include_examples :should_not_collect_prometheus_metrics

context 'when there are no old Cdr::Cdr partitions' do
before do
Cdr::Cdr.add_partition_for Time.now.utc
Cdr::Cdr.add_partition_for 1.day.from_now.utc
Cdr::Cdr.add_partition_for 1.day.ago.utc
Cdr::Cdr.add_partition_for 2.days.ago.utc
Cdr::Cdr.add_partition_for 3.days.ago.utc
it 'removes correct cdr.cdr partition table' do
date = 5.days.ago.to_date
expect { subject }.to change {
SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}")
}.from(true).to(false)
end

it 'does not remove any other cdr partitions' do
expect { subject }.to change { PartitionModel::Cdr.all.size }.by(-1)
end

context 'when Prometheus is enabled' do
let(:prometheus_enabled) { true }

include_examples :should_not_collect_prometheus_metrics
end
end

it 'does not remove last cdr.cdr partition table' do
date = 3.days.ago.utc.to_date
expect { subject }.not_to change {
SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}")
}.from(true)
context 'when there are no old Cdr::Cdr partitions' do
before do
Cdr::Cdr.add_partition_for Time.current
Cdr::Cdr.add_partition_for 1.day.from_now
Cdr::Cdr.add_partition_for 1.day.ago
Cdr::Cdr.add_partition_for 2.days.ago
Cdr::Cdr.add_partition_for 3.days.ago
end

include_examples :should_not_collect_prometheus_metrics

it 'does not remove last cdr.cdr partition table' do
date = 3.days.ago.to_date
expect { subject }.not_to change {
SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}")
}.from(true)
end

it 'does not remove any other cdr partitions' do
expect { subject }.not_to change { PartitionModel::Cdr.all.size }
end

context 'when Prometheus is enabled' do
let(:prometheus_enabled) { true }

include_examples :should_not_collect_prometheus_metrics
end
end

it 'does not remove any other cdr partitions' do
expect { subject }.not_to change { PartitionModel::Cdr.all.size }
context 'when there are old Log::ApiLog partitions' do
before do
Log::ApiLog.add_partition_for Date.parse('2019-01-02')
Log::ApiLog.add_partition_for Time.current
Log::ApiLog.add_partition_for 1.day.from_now
Log::ApiLog.add_partition_for 1.day.ago
Log::ApiLog.add_partition_for 2.days.ago
Log::ApiLog.add_partition_for 3.days.ago
Log::ApiLog.add_partition_for 4.days.ago
Log::ApiLog.add_partition_for Time.parse('2018-10-02 00:00:00 UTC')
end

include_examples :should_not_collect_prometheus_metrics

it 'removes correct logs.api_requests partition table' do
expect { subject }.to change {
SqlCaller::Yeti.table_exist?('logs.api_requests_2018_10_02')
}.from(true).to(false)
end

it 'does not remove any other yeti partitions' do
expect { subject }.to change { PartitionModel::Log.all.size }.by(-1)
end

context 'when Prometheus is enabled' do
let(:prometheus_enabled) { true }

include_examples :should_not_collect_prometheus_metrics
end
end
end

context 'when there are old Log::ApiLog partitions' do
context 'with partition_remove_hook' do
let(:hook_value) { 'echo' }

before do
Log::ApiLog.add_partition_for Date.parse('2019-01-02')
Log::ApiLog.add_partition_for Time.now.utc
Log::ApiLog.add_partition_for 1.day.from_now.utc
Log::ApiLog.add_partition_for 1.day.ago.utc
Log::ApiLog.add_partition_for 2.days.ago.utc
Log::ApiLog.add_partition_for 3.days.ago.utc
Log::ApiLog.add_partition_for 4.days.ago.utc
Log::ApiLog.add_partition_for Time.parse('2018-10-02 00:00:00 UTC')
Cdr::Cdr.add_partition_for Time.current
Cdr::Cdr.add_partition_for 1.day.from_now
Cdr::Cdr.add_partition_for 1.day.ago
Cdr::Cdr.add_partition_for 2.days.ago
Cdr::Cdr.add_partition_for 3.days.ago
Cdr::Cdr.add_partition_for 4.days.ago
Cdr::Cdr.add_partition_for 5.days.ago

allow(YetiConfig).to receive(:partition_remove_hook).and_return(hook_value)
end

it 'removes correct logs.api_requests partition table' do
expect { subject }.to change {
SqlCaller::Yeti.table_exist?('logs.api_requests_2018_10_02')
}.from(true).to(false)
shared_examples :should_drop_partition do
it 'shoudl drop correct cdr.cdr partition table' do
expect { subject }.to change {
SqlCaller::Cdr.table_exist?("cdr.cdr_#{5.days.ago.to_date.strftime('%Y_%m_%d')}")
}.from(true).to(false)
end
end

it 'does not remove any other yeti partitions' do
expect { subject }.to change { PartitionModel::Log.all.size }.by(-1)
context 'when there are old Cdr::Cdr partitions' do
include_examples :should_drop_partition
include_examples :should_not_collect_prometheus_metrics

context 'when Prometheus is enabled' do
let(:prometheus_enabled) { true }

context 'when hook successful' do
include_examples :should_drop_partition

it 'should collect Prometheus metrics' do
expect(PartitionRemoveHookProcessor).to receive(:collect).with(executions: 1)
expect(PartitionRemoveHookProcessor).to receive(:collect).with(duration: be_present)
subject
end
end

context 'when hook unsuccessful' do
let(:hook_value) { 'false' }

it 'should NOT drop partition' do
expect { subject }.not_to change { PartitionModel::Cdr.all.size }
end

it 'should collect Prometheus metrics' do
expect(PartitionRemoveHookProcessor).to receive(:collect).with(executions: 1)
expect(PartitionRemoveHookProcessor).to receive(:collect).with(duration: be_present)
expect(PartitionRemoveHookProcessor).to receive(:collect).with(errors: 1)
subject
end
end
end
end
end
end
Loading

0 comments on commit 265fad8

Please sign in to comment.