From a7ba1c1269c5ec09c8c353f8edc63592b458da34 Mon Sep 17 00:00:00 2001 From: Denys Metelov Date: Fri, 8 Nov 2024 18:25:21 +0200 Subject: [PATCH] Add partition_remove_hook logging and metrics --- app/jobs/jobs/partition_removing.rb | 29 +++ .../partition_remove_hook_collector.rb | 49 +++++ .../partition_remove_hook_processor.rb | 24 +++ lib/prometheus_collectors.rb | 4 + spec/jobs/jobs/partition_removing_spec.rb | 192 +++++++++++++----- .../partition_remove_hook_collector_spec.rb | 67 ++++++ .../partition_remove_hook_processor_spec.rb | 29 +++ spec/support/helpers/custom_rspec_helper.rb | 26 +++ 8 files changed, 372 insertions(+), 48 deletions(-) create mode 100644 lib/prometheus/partition_remove_hook_collector.rb create mode 100644 lib/prometheus/partition_remove_hook_processor.rb create mode 100644 spec/lib/prometheus/partition_remove_hook_collector_spec.rb create mode 100644 spec/lib/prometheus/partition_remove_hook_processor_spec.rb diff --git a/app/jobs/jobs/partition_removing.rb b/app/jobs/jobs/partition_removing.rb index 02b34107b..d95932112 100644 --- a/app/jobs/jobs/partition_removing.rb +++ b/app/jobs/jobs/partition_removing.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'prometheus/partition_remove_hook_processor' + module Jobs class PartitionRemoving < ::BaseJob self.cron_line = '20 * * * *' @@ -51,15 +53,23 @@ def remove_partition!(partition_class, model_class) cdr_remove_candidate = cdr_collection.first if YetiConfig.partition_remove_hook.present? + start_time = get_time cmd = "#{YetiConfig.partition_remove_hook} #{cdr_remove_candidate.class} #{cdr_remove_candidate.parent_table} #{cdr_remove_candidate.name}" + logger.info { "Running partition removing hook: #{cmd}" } + collect_prometheus_executions_metric! + return_value, sout, serr = execute_cmd(cmd) + logger.info { "Partition remove hook stdout:\n#{sout}" } logger.info { "Partition remove hook stderr:\n#{serr}" } + collect_prometheus_duration_metric!(start_time) + if return_value.success? logger.info { "Partition remove hook succeed, exit code: #{return_value.exitstatus}" } else logger.info { "Partition remove hook failed: #{return_value.exitstatus}. Stopping removing procedure" } + collect_prometheus_errors_metric! return end end @@ -77,5 +87,24 @@ def remove_partition!(partition_class, model_class) capture_error(e, extra: { partition_class: partition_class.name, model_class: model_class.name }) raise e end + + def collect_prometheus_executions_metric! + PartitionRemoveHookProcessor.collect_executions_metric if PrometheusConfig.enabled? + end + + def collect_prometheus_errors_metric! + PartitionRemoveHookProcessor.collect_errors_metric if PrometheusConfig.enabled? + end + + def collect_prometheus_duration_metric!(start_time) + return unless PrometheusConfig.enabled? + + duration = duration = (get_time - start_time).round(6) + PartitionRemoveHookProcessor.collect_duration_metric(duration) + end + + def get_time + ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + end end end diff --git a/lib/prometheus/partition_remove_hook_collector.rb b/lib/prometheus/partition_remove_hook_collector.rb new file mode 100644 index 000000000..e49e7c25b --- /dev/null +++ b/lib/prometheus/partition_remove_hook_collector.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +class PartitionRemoveHookCollector < PrometheusExporter::Server::TypeCollector + MAX_METRIC_AGE = 30 + + def initialize + @data = [] + @observers = { + 'executions' => PrometheusExporter::Metric::Counter.new('ruby_yeti_partition_removing_hook_executions', 'Sum of Yeti Partition Remove Hook execution runs'), + 'errors' => PrometheusExporter::Metric::Counter.new('ruby_yeti_partition_removing_hook_errors', 'Sum of Yeti Partition Remove Hook execution fails'), + 'duration' => PrometheusExporter::Metric::Counter.new('ruby_yeti_partition_removing_hook_duration', 'Sum of Yeti Partition Remove Hook execution duration in seconds') + } + end + + def type + 'ruby_yeti_partition_removing_hook' + end + + def metrics + return [] if @data.empty? + + metrics = {} + @data.each do |obj| + labels = {} + # labels are passed by processor + labels.merge!(obj['metric_labels']) if obj['metric_labels'] + # custom_labels are passed by PrometheusExporter::Client + labels.merge!(obj['custom_labels']) if obj['custom_labels'] + + @observers.each do |name, observer| + value = obj[name] + if value + observer.observe(value, labels) + metrics[name] = observer + end + end + end + + metrics.values + end + + def collect(obj) + now = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + + obj['created_at'] = now + @data.delete_if { |m| m['created_at'] + MAX_METRIC_AGE < now } + @data << obj + end +end diff --git a/lib/prometheus/partition_remove_hook_processor.rb b/lib/prometheus/partition_remove_hook_processor.rb new file mode 100644 index 000000000..2d5619a1d --- /dev/null +++ b/lib/prometheus/partition_remove_hook_processor.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require_relative './base_processor' + +class PartitionRemoveHookProcessor < BaseProcessor + self.logger = Rails.logger + self.type = 'ruby_yeti_partition_removing_hook' + + def collect(metric) + format_metric(metric) + end + + def self.collect_executions_metric + collect(executions: 1) + end + + def self.collect_errors_metric + collect(errors: 1) + end + + def self.collect_duration_metric(duration) + collect(duration:) + end +end diff --git a/lib/prometheus_collectors.rb b/lib/prometheus_collectors.rb index 1d62c73bd..cdc0935fd 100644 --- a/lib/prometheus_collectors.rb +++ b/lib/prometheus_collectors.rb @@ -2,6 +2,10 @@ # Custom prometheus type collectors should be defined as lib/prometheus/*_collector.rb files. +# Disabling buffering for stdout and stderr +$stderr.sync = true +$stdout.sync = true + require 'active_support/all' Dir[File.join(__dir__, 'prometheus/*_collector.rb')].each do |filename| diff --git a/spec/jobs/jobs/partition_removing_spec.rb b/spec/jobs/jobs/partition_removing_spec.rb index 939ff53b5..4766b6c4c 100644 --- a/spec/jobs/jobs/partition_removing_spec.rb +++ b/spec/jobs/jobs/partition_removing_spec.rb @@ -1,11 +1,14 @@ # frozen_string_literal: true +require_relative Rails.root.join('lib/prometheus/partition_remove_hook_processor') + RSpec.describe Jobs::PartitionRemoving, '#call' do subject do job.call end let(:job) { described_class.new(double) } + let(:prometheus_enabled) { false } before do expect(job).to receive(:partition_remove_delay).exactly(5).times.and_return( @@ -16,72 +19,165 @@ 'logs.api_requests' => '5 days' ) PartitionModel::Cdr.to_a.each(&:destroy!) + + allow(PrometheusConfig).to receive(:enabled?).and_return(prometheus_enabled) end - context 'when there are old Cdr::Cdr partitions' do - before do - Cdr::Cdr.add_partition_for Time.now.utc - Cdr::Cdr.add_partition_for 1.day.from_now.utc - Cdr::Cdr.add_partition_for 1.day.ago.utc - Cdr::Cdr.add_partition_for 2.days.ago.utc - Cdr::Cdr.add_partition_for 3.days.ago.utc - Cdr::Cdr.add_partition_for 4.days.ago.utc - Cdr::Cdr.add_partition_for 5.days.ago.utc + shared_examples :should_not_collect_prometheus_metrics do + it 'should NOT collect Prometheus metrics' do + expect(PartitionRemoveHookProcessor).to_not receive(:collect) + subject end + end - it 'removes correct cdr.cdr partition table' do - date = 5.days.ago.utc.to_date - expect { subject }.to change { - SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}") - }.from(true).to(false) - end + context 'without partition_remove_hook' do + context 'when there are old Cdr::Cdr partitions' do + before do + Cdr::Cdr.add_partition_for Time.current + Cdr::Cdr.add_partition_for 1.day.from_now + Cdr::Cdr.add_partition_for 1.day.ago + Cdr::Cdr.add_partition_for 2.days.ago + Cdr::Cdr.add_partition_for 3.days.ago + Cdr::Cdr.add_partition_for 4.days.ago + Cdr::Cdr.add_partition_for 5.days.ago + end - it 'does not remove any other cdr partitions' do - expect { subject }.to change { PartitionModel::Cdr.all.size }.by(-1) - end - end + include_examples :should_not_collect_prometheus_metrics - context 'when there are no old Cdr::Cdr partitions' do - before do - Cdr::Cdr.add_partition_for Time.now.utc - Cdr::Cdr.add_partition_for 1.day.from_now.utc - Cdr::Cdr.add_partition_for 1.day.ago.utc - Cdr::Cdr.add_partition_for 2.days.ago.utc - Cdr::Cdr.add_partition_for 3.days.ago.utc + it 'removes correct cdr.cdr partition table' do + date = 5.days.ago.to_date + expect { subject }.to change { + SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}") + }.from(true).to(false) + end + + it 'does not remove any other cdr partitions' do + expect { subject }.to change { PartitionModel::Cdr.all.size }.by(-1) + end + + context 'when Prometheus is enabled' do + let(:prometheus_enabled) { true } + + include_examples :should_not_collect_prometheus_metrics + end end - it 'does not remove last cdr.cdr partition table' do - date = 3.days.ago.utc.to_date - expect { subject }.not_to change { - SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}") - }.from(true) + context 'when there are no old Cdr::Cdr partitions' do + before do + Cdr::Cdr.add_partition_for Time.current + Cdr::Cdr.add_partition_for 1.day.from_now + Cdr::Cdr.add_partition_for 1.day.ago + Cdr::Cdr.add_partition_for 2.days.ago + Cdr::Cdr.add_partition_for 3.days.ago + end + + include_examples :should_not_collect_prometheus_metrics + + it 'does not remove last cdr.cdr partition table' do + date = 3.days.ago.to_date + expect { subject }.not_to change { + SqlCaller::Cdr.table_exist?("cdr.cdr_#{date.strftime('%Y_%m_%d')}") + }.from(true) + end + + it 'does not remove any other cdr partitions' do + expect { subject }.not_to change { PartitionModel::Cdr.all.size } + end + + context 'when Prometheus is enabled' do + let(:prometheus_enabled) { true } + + include_examples :should_not_collect_prometheus_metrics + end end - it 'does not remove any other cdr partitions' do - expect { subject }.not_to change { PartitionModel::Cdr.all.size } + context 'when there are old Log::ApiLog partitions' do + before do + Log::ApiLog.add_partition_for Date.parse('2019-01-02') + Log::ApiLog.add_partition_for Time.current + Log::ApiLog.add_partition_for 1.day.from_now + Log::ApiLog.add_partition_for 1.day.ago + Log::ApiLog.add_partition_for 2.days.ago + Log::ApiLog.add_partition_for 3.days.ago + Log::ApiLog.add_partition_for 4.days.ago + Log::ApiLog.add_partition_for Time.parse('2018-10-02 00:00:00 UTC') + end + + include_examples :should_not_collect_prometheus_metrics + + it 'removes correct logs.api_requests partition table' do + expect { subject }.to change { + SqlCaller::Yeti.table_exist?('logs.api_requests_2018_10_02') + }.from(true).to(false) + end + + it 'does not remove any other yeti partitions' do + expect { subject }.to change { PartitionModel::Log.all.size }.by(-1) + end + + context 'when Prometheus is enabled' do + let(:prometheus_enabled) { true } + + include_examples :should_not_collect_prometheus_metrics + end end end - context 'when there are old Log::ApiLog partitions' do + context 'with partition_remove_hook' do + let(:hook_value) { 'echo' } + before do - Log::ApiLog.add_partition_for Date.parse('2019-01-02') - Log::ApiLog.add_partition_for Time.now.utc - Log::ApiLog.add_partition_for 1.day.from_now.utc - Log::ApiLog.add_partition_for 1.day.ago.utc - Log::ApiLog.add_partition_for 2.days.ago.utc - Log::ApiLog.add_partition_for 3.days.ago.utc - Log::ApiLog.add_partition_for 4.days.ago.utc - Log::ApiLog.add_partition_for Time.parse('2018-10-02 00:00:00 UTC') + Cdr::Cdr.add_partition_for Time.current + Cdr::Cdr.add_partition_for 1.day.from_now + Cdr::Cdr.add_partition_for 1.day.ago + Cdr::Cdr.add_partition_for 2.days.ago + Cdr::Cdr.add_partition_for 3.days.ago + Cdr::Cdr.add_partition_for 4.days.ago + Cdr::Cdr.add_partition_for 5.days.ago + + allow(YetiConfig).to receive(:partition_remove_hook).and_return(hook_value) end - it 'removes correct logs.api_requests partition table' do - expect { subject }.to change { - SqlCaller::Yeti.table_exist?('logs.api_requests_2018_10_02') - }.from(true).to(false) + shared_examples :should_drop_partition do + it 'shoudl drop correct cdr.cdr partition table' do + expect { subject }.to change { + SqlCaller::Cdr.table_exist?("cdr.cdr_#{5.days.ago.to_date.strftime('%Y_%m_%d')}") + }.from(true).to(false) + end end - it 'does not remove any other yeti partitions' do - expect { subject }.to change { PartitionModel::Log.all.size }.by(-1) + context 'when there are old Cdr::Cdr partitions' do + include_examples :should_drop_partition + include_examples :should_not_collect_prometheus_metrics + + context 'when Prometheus is enabled' do + let(:prometheus_enabled) { true } + + context 'when hook successful' do + include_examples :should_drop_partition + + it 'should collect Prometheus metrics' do + expect(PartitionRemoveHookProcessor).to receive(:collect).with(executions: 1) + expect(PartitionRemoveHookProcessor).to receive(:collect).with(duration: be_present) + subject + end + end + + context 'when hook unsuccessful' do + let(:hook_value) { 'false' } + + it 'should NOT drop partition' do + expect { subject }.not_to change { PartitionModel::Cdr.all.size } + end + + it 'should collect Prometheus metrics' do + expect(PartitionRemoveHookProcessor).to receive(:collect).with(executions: 1) + expect(PartitionRemoveHookProcessor).to receive(:collect).with(duration: be_present) + expect(PartitionRemoveHookProcessor).to receive(:collect).with(errors: 1) + subject + end + end + end end end end diff --git a/spec/lib/prometheus/partition_remove_hook_collector_spec.rb b/spec/lib/prometheus/partition_remove_hook_collector_spec.rb new file mode 100644 index 000000000..c336d83f4 --- /dev/null +++ b/spec/lib/prometheus/partition_remove_hook_collector_spec.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +require 'prometheus_exporter/server' +require_relative Rails.root.join('lib/prometheus/partition_remove_hook_collector') + +RSpec.describe PartitionRemoveHookCollector, '#metrics' do + subject { described_instance.metrics.map(&:metric_text).compact_blank.map { |metrics| metrics.split("\n") }.flatten } + + let(:described_instance) { described_class.new } + let(:data_collect_interval) { 0 } + let(:data) { [metric_executions, metric_errors, metric_duration] } + let(:metric_executions) { { executions: 1 } } + let(:metric_errors) { { errors: 1 } } + let(:metric_duration) { { duration: 13 } } + let(:expected_metric_executions_lines) { ['ruby_yeti_partition_removing_hook_executions 1'] } + let(:expected_metric_errors_lines) { ['ruby_yeti_partition_removing_hook_errors 1'] } + let(:expected_metric_duarion_lines) { ['ruby_yeti_partition_removing_hook_duration 13'] } + + before do + travel_process_clock(data_collect_interval) do + data.map { |obj| described_instance.collect(obj.deep_stringify_keys) } + end + end + + context 'when metric_executions, metric_errors and metric_duration collected now' do + let(:data_collect_interval) { 0 } + + it 'responds with filled metric_executions, metric_errors and metric_duration' do + expect(subject).to match_array(expected_metric_executions_lines + expected_metric_errors_lines + expected_metric_duarion_lines) + end + end + + context 'when metric_executions, metric_errors and metric_duration collected 10 seconds ago' do + let(:data_collect_interval) { -10 } + + it 'responds with filled metric_executions, metric_errors and metric_duration' do + expect(subject).to match_array(expected_metric_executions_lines + expected_metric_errors_lines + expected_metric_duarion_lines) + end + end + + context 'when metric_executions, metric_errors and metric_duration collected 35 seconds ago' do + let(:data_collect_interval) { -35 } + + it 'responds with filled metric_executions, metric_errors and metric_duration' do + expect(subject).to match_array(expected_metric_executions_lines + expected_metric_errors_lines + expected_metric_duarion_lines) + end + end + + context 'when metric_executions and metric_duration collected 35 seconds ago and metric_errors collected 2 seconds ago' do + let(:data_collect_interval) { -35 } + let(:data) { [metric_executions, metric_duration] } + + before { travel_process_clock(-2) { described_instance.collect(metric_errors.deep_stringify_keys) } } + + it 'responds with empty metric_executions and metric_duration but filled metric_errors' do + expect(subject).to eq(expected_metric_errors_lines) + end + end + + context 'without metrics collected' do + let(:data) { [] } + + it 'responds without any metrics' do + expect(subject).to be_empty + end + end +end diff --git a/spec/lib/prometheus/partition_remove_hook_processor_spec.rb b/spec/lib/prometheus/partition_remove_hook_processor_spec.rb new file mode 100644 index 000000000..22b0a9953 --- /dev/null +++ b/spec/lib/prometheus/partition_remove_hook_processor_spec.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +require_relative Rails.root.join('lib/prometheus/partition_remove_hook_processor') + +RSpec.describe PartitionRemoveHookProcessor do + describe '.collect_executions_metric' do + subject { PartitionRemoveHookProcessor.collect_executions_metric } + + it 'responds with correct metrics' do + expect(subject).to eq({ type: 'ruby_yeti_partition_removing_hook', executions: 1, metric_labels: {} }) + end + end + + describe '.collect_errors_metric' do + subject { PartitionRemoveHookProcessor.collect_errors_metric } + + it 'responds with correct metrics' do + expect(subject).to eq({ type: 'ruby_yeti_partition_removing_hook', errors: 1, metric_labels: {} }) + end + end + + describe '.collect_duration_metric' do + subject { PartitionRemoveHookProcessor.collect_duration_metric(13) } + + it 'responds with correct metrics' do + expect(subject).to eq({ type: 'ruby_yeti_partition_removing_hook', duration: 13, metric_labels: {} }) + end + end +end diff --git a/spec/support/helpers/custom_rspec_helper.rb b/spec/support/helpers/custom_rspec_helper.rb index 366887b41..173a1135f 100644 --- a/spec/support/helpers/custom_rspec_helper.rb +++ b/spec/support/helpers/custom_rspec_helper.rb @@ -16,4 +16,30 @@ def with_real_active_job_adapter queue_adapter_changed_jobs.each { |klass| klass.enable_test_adapter(queue_adapter_for_test) } end end + + # Allows to move Process.clock_gettime within block + # @param interval [Float,Integer] will change Process.clock_gettime by interval + # @yield within interval shift. + # ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + # will freeze on current + interval inside block + # @example Usage: + # + # ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) # will be for example 1900 + # travel_process_clock(-100) do + # ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) # will be freezed on 1800 + # end + # ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) # will be greater than 1900 and unfreezed + # + def travel_process_clock(interval) + raise ArgumentError, 'block must be passed' unless block_given? + + now = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + stubs = ActiveSupport::Testing::SimpleStubs.new + stubs.stub_object(Process, :clock_gettime) { |*_args| now + interval } + begin + yield + ensure + stubs.unstub_all! + end + end end