Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve warning for insufficient file resources for PQ max_bytes #16656

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 63 additions & 12 deletions logstash-core/lib/logstash/persisted_queue_config_validator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def check(running_pipelines, pipeline_configs)
warn_msg = []
err_msg = []
queue_path_file_system = Hash.new # (String: queue path, String: file system)
required_free_bytes = Hash.new # (String: file system, Integer: size)
required_free_bytes = Hash.new # (String: file system, Integer: size)
current_usage_bytes = Hash.new # (String: file system, Integer: size)

pipeline_configs.select { |config| config.settings.get('queue.type') == 'persisted'}
.select { |config| config.settings.get('queue.max_bytes').to_i != 0 }
Expand All @@ -60,12 +61,13 @@ def check(running_pipelines, pipeline_configs)
check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)

queue_path_file_system[queue_path] = file_system
if used_bytes < max_bytes
required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes - used_bytes
end
# Add max_bytes to required total for this filesystem
required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes
# Track current usage separately
current_usage_bytes[file_system] = current_usage_bytes.fetch(file_system, 0) + used_bytes
end

check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes)

@last_check_pass = err_msg.empty? && warn_msg.empty?

Expand All @@ -85,15 +87,64 @@ def check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)
end
end

# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/ devices.
# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/devices.
# It uses the filesystem of the path and count the required bytes by filesystem
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
disk_warn_msg =
queue_path_file_system
.select { |queue_path, file_system| !FsUtil.hasFreeSpace(Paths.get(queue_path), required_free_bytes.fetch(file_system, 0)) }
.map { |queue_path, file_system| "The persistent queue on path \"#{queue_path}\" won't fit in file system \"#{file_system}\" when full. Please free or allocate #{required_free_bytes.fetch(file_system, 0)} more bytes." }
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes)
# Group paths by filesystem
paths_by_filesystem = queue_path_file_system.group_by { |_, fs| fs }

# Only process filesystems that need more space
filesystems_needing_space = paths_by_filesystem.select do |file_system, paths|
additional_needed = required_free_bytes.fetch(file_system, 0) - current_usage_bytes.fetch(file_system, 0)
!FsUtil.hasFreeSpace(Paths.get(paths.first.first), additional_needed)
end

return if filesystems_needing_space.empty?

message_parts = [
"Persistent queues require more disk space than is available on #{filesystems_needing_space.size > 1 ? 'multiple filesystems' : 'a filesystem'}:",
""
]

# Add filesystem-specific information
filesystems_needing_space.each do |file_system, paths|
total_required = required_free_bytes.fetch(file_system, 0)
current_usage = current_usage_bytes.fetch(file_system, 0)
additional_needed = total_required - current_usage
fs_path = Paths.get(paths.first.first)
free_space = Files.getFileStore(fs_path).getUsableSpace

message_parts.concat([
"Filesystem '#{file_system}':",
"- Total space required: #{LogStash::Util::ByteValue.human_readable(total_required)}",
"- Currently free space: #{LogStash::Util::ByteValue.human_readable(free_space)}",
"- Current PQ usage: #{LogStash::Util::ByteValue.human_readable(current_usage)}",
"- Additional space needed: #{LogStash::Util::ByteValue.human_readable(additional_needed)}",
"",
"Individual queue requirements:",
*paths.map { |path, _|
used = get_page_size(::File.join(path, "page.*"))
[
" #{path}:",
" Current size: #{LogStash::Util::ByteValue.human_readable(used)}",
" Maximum size: #{LogStash::Util::ByteValue.human_readable(total_required / paths.size)}"
]
}.flatten,
"" # Empty line between filesystems
])
end

warn_msg << disk_warn_msg unless disk_warn_msg.empty?
# Add common footer
message_parts.concat([
"Please either:",
"1. Free up disk space",
"2. Reduce queue.max_bytes in your pipeline configurations",
"3. Move PQ storage to a filesystem with more available space",
"Note: Logstash may fail to start if this is not resolved.",
""
])

warn_msg << message_parts.join("\n")
end

def get_file_system(queue_path)
Expand Down
169 changes: 159 additions & 10 deletions logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,173 @@
end

context("disk does not have sufficient space") do
# two pq with different paths
let(:settings1) { settings.dup.merge("queue.max_bytes" => "1000pb") }
let(:settings2) { settings1.dup.merge("path.queue" => Stud::Temporary.directory) }
let(:pipeline_id) { "main" }
# Create a pipeline config double that matches what the class expects
let(:pipeline_config) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) # 300GB
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) # 64MB
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id)
allow(s).to receive(:get).with("path.queue").and_return(queue_path)
end
)
end
end

before do
allow(Dir).to receive(:glob).and_return(["page.1"])
allow(File).to receive(:size).and_return(25 * 1024 * 1024 * 1024)
allow(FsUtil).to receive(:hasFreeSpace).and_return(false)
allow(Files).to receive(:exists).and_return(true)

let(:pipeline_configs) do
LogStash::Config::Source::Local.new(settings1).pipeline_configs +
LogStash::Config::Source::Local.new(settings2).pipeline_configs
# Mock filesystem
mock_file_store = double("FileStore",
name: "disk1",
getUsableSpace: 100 * 1024 * 1024 * 1024 # 100GB free
)
allow(Files).to receive(:getFileStore).and_return(mock_file_store)
end

it "should throw" do
it "reports detailed space information" do
expect(pq_config_validator).to receive(:check_disk_space) do |_, _, required_free_bytes|
expect(required_free_bytes.size).to eq(1)
expect(required_free_bytes.values[0]).to eq(1024**5 * 1000 * 2) # require 2000pb
expect(required_free_bytes.values[0]).to eq(300 * 1024 * 1024 * 1024)
end.and_call_original

expect(pq_config_validator.logger).to receive(:warn).once.with(/won't fit in file system/)
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
expect(msg).to include("Total space required: 300gb")
expect(msg).to include("Current PQ usage: 25gb")
end

pq_config_validator.check({}, pipeline_configs)
pq_config_validator.check({}, [pipeline_config])
end

context "with multiple pipelines" do
let(:pipeline_id1) { "main" }
let(:pipeline_id2) { "secondary" }
let(:pipeline_id3) { "third" }

let(:base_queue_path) { queue_path }
let(:queue_path1) { ::File.join(base_queue_path, pipeline_id1) }
let(:queue_path2) { ::File.join(base_queue_path, pipeline_id2) }
let(:queue_path3) { ::File.join(Stud::Temporary.directory, pipeline_id3) }

let(:pipeline_config1) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id1)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id1)
allow(s).to receive(:get).with("path.queue").and_return(base_queue_path)
end
)
end
end

let(:pipeline_config2) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id2)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id2)
allow(s).to receive(:get).with("path.queue").and_return(base_queue_path)
end
)
end
end

let(:pipeline_config3) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id3)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id3)
allow(s).to receive(:get).with("path.queue").and_return(::File.dirname(queue_path3))
end
)
end
end

let(:mock_file_store1) { double("FileStore", name: "disk1", getUsableSpace: 100 * 1024 * 1024 * 1024) }
let(:mock_file_store2) { double("FileStore", name: "disk2", getUsableSpace: 50 * 1024 * 1024 * 1024) }

before do
# Precise path matching for Dir.glob
allow(Dir).to receive(:glob) do |pattern|
case pattern
when /#{pipeline_id1}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"]
when /#{pipeline_id2}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1", "#{::File.dirname(pattern)}/page.2"]
when /#{pipeline_id3}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"]
else []
end
end

# Set up file size matching with full paths
allow(File).to receive(:size) do |path|
case
when path.include?(pipeline_id1) then 30 * 1024 * 1024 * 1024 # 30GB for main
when path.include?(pipeline_id2) then 25 * 1024 * 1024 * 1024 # 25GB for secondary
when path.include?(pipeline_id3) then 25 * 1024 * 1024 * 1024 # 25GB for third
else 0
end
end

allow(Files).to receive(:getFileStore) do |path|
case path.toString
when /#{pipeline_id3}/ then mock_file_store2
else mock_file_store1
end
end

allow(FsUtil).to receive(:hasFreeSpace).and_return(false)
allow(Files).to receive(:exists).and_return(true)
end

context "with multiple queues on same filesystem" do
it "reports consolidated information for same filesystem" do
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
expect(msg).to match(/Persistent queues require more disk space than is available on a filesystem:/)
expect(msg).to match(/Filesystem 'disk1':/)
expect(msg).to match(/Total space required: 600gb/) # 300GB * 2
expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB)
expect(msg).to match(/Current size: 30gb/) # First queue
expect(msg).to match(/Current size: 50gb/) # Second queue (2 files * 25GB)
end

pq_config_validator.check({}, [pipeline_config1, pipeline_config2])
end
end

context "with queues across multiple filesystems" do
it "reports separate information for each filesystem" do
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
# First filesystem
expect(msg).to match(/Filesystem 'disk1':/)
expect(msg).to match(/Total space required: 600gb/) # 300GB * 2
expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB)

# Second filesystem
expect(msg).to match(/Filesystem 'disk2':/)
expect(msg).to match(/Total space required: 300gb/) # 300GB
expect(msg).to match(/Current PQ usage: 25gb/) # 25GB
end

pq_config_validator.check({}, [pipeline_config1, pipeline_config2, pipeline_config3])
end
end
end
end

Expand Down