Skip to content

Commit

Permalink
Cleanup code from debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
driv3r committed Oct 17, 2023
1 parent 3c4116f commit 399155a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 65 deletions.
1 change: 0 additions & 1 deletion ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,6 @@ func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() {
// You will know that the BinlogStreamer finished when .Run() returns.
func (f *Ferry) FlushBinlogAndStopStreaming() {
if f.WaitUntilReplicaIsCaughtUpToMaster != nil {
f.logger.Info("flush binlog and stop streaming: wait until replica is caught up to master")
isReplica, err := CheckDbIsAReplica(f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB)
if err != nil {
f.ErrorHandler.Fatal("wait_replica", err)
Expand Down
64 changes: 19 additions & 45 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
require "tmpdir"
require "webrick"
require "cgi"
require "securerandom"

module GhostferryHelper
GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration")
Expand Down Expand Up @@ -50,12 +49,10 @@ module Status
AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY"
end

attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag
attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines

def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393)
@log_capturer = log_capturer
@logger = log_capturer.logger
@tag = SecureRandom.hex[0..3]
def initialize(main_path, config: {}, logger:, message_timeout: 30, port: 39393)
@logger = logger

@main_path = main_path
@config = config
Expand Down Expand Up @@ -98,7 +95,6 @@ def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port:

# The main method to call to run a Ghostferry subprocess.
def run(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})")
resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash)

compile_binary
Expand All @@ -117,26 +113,21 @@ def run(resuming_state = nil)
# When using this method, you need to ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_interrupt(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})")
run(resuming_state)
rescue ExitError
@logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got Ghostferry::ExitError")
dumped_state = @stdout.join("")
JSON.parse(dumped_state)
else
@logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong")
raise "Ghostferry did not get interrupted"
end

# Same as above - ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_failure(resuming_state = nil)
@logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})")
run(resuming_state)
rescue ExitError
@logger.info("[#{@tag}] ghostferry#run_expecting_failure: got Ghostferry::ExitError")
else
raise "[#{@tag}] Ghostferry did not fail"
raise "Ghostferry did not fail"
end

def run_with_logs(resuming_state = nil)
Expand All @@ -150,14 +141,14 @@ def run_with_logs(resuming_state = nil)
def compile_binary
return if File.exist?(@compiled_binary_path)

@logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}")
@logger.debug("compiling test binary to #{@compiled_binary_path}")
rc = system(
"go", "build",
"-o", @compiled_binary_path,
@main_path
)

raise "[#{@tag}] could not compile ghostferry" unless rc
raise "could not compile ghostferry" unless rc
end

def start_server
Expand All @@ -182,27 +173,17 @@ def start_server

query = CGI::parse(req.body)

statuses = Array(query["status"])
status = query["status"]
data = query["data"]

if statuses.empty?
unless status
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status")
resp.status = 400
@server.shutdown
elsif statuses.size > 1
@logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}")
puts "Got multiple statuses at once: #{statuses.inspect}"
end

@last_message_time = now

data = query["data"]

@logger.info("[#{@tag}] server: got / with #{statuses.inspect}")
statuses.each do |status|
next if @status_handlers[status].nil?

@status_handlers[status].each { |f| f.call(*data) }
end
@status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil?
rescue StandardError => e
# errors are not reported from WEBrick but the server should fail early
# as this indicates there is likely a programming error.
Expand All @@ -213,7 +194,6 @@ def start_server

@server.mount_proc "/callbacks/progress" do |req, resp|
begin
@logger.info("[#{@tag}] server: got /callbacks/progress")
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
Expand All @@ -228,7 +208,6 @@ def start_server

@server.mount_proc "/callbacks/state" do |req, resp|
begin
@logger.info("[#{@tag}] server: got /callbacks/state")
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data")
resp.status = 400
Expand All @@ -241,15 +220,14 @@ def start_server
end

@server.mount_proc "/callbacks/error" do |req, resp|
@logger.info("[#{@tag}] server: got /callbacks/error")
@error = JSON.parse(JSON.parse(req.body)["Payload"])
@callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil?
end

@server_thread = Thread.new do
@logger.debug("[#{@tag}] starting server thread")
@logger.debug("starting server thread")
@server.start
@logger.debug("[#{@tag}] server thread stopped")
@logger.debug("server thread stopped")
end
end

Expand Down Expand Up @@ -289,7 +267,7 @@ def start_ghostferry(resuming_state = nil)
environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia]
end

@logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}")
@logger.debug("starting ghostferry test binary #{@compiled_binary_path}")
Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr|
stdin.puts(resuming_state) unless resuming_state.nil?
stdin.close
Expand All @@ -311,7 +289,7 @@ def start_ghostferry(resuming_state = nil)

if reader == stdout
@stdout << line
@logger.debug("[#{@tag}] stdout: #{line}")
@logger.debug("stdout: #{line}")
elsif reader == stderr
@stderr << line
if json_log_line?(line)
Expand All @@ -324,11 +302,8 @@ def start_ghostferry(resuming_state = nil)
if logline["level"] == "error"
@error_lines << logline
end

@logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer")
else
@logger.debug("[#{@tag}] stderr: #{line}")
end
@logger.debug("stderr: #{line}")
end
end
end
Expand All @@ -337,9 +312,9 @@ def start_ghostferry(resuming_state = nil)
@pid = 0
end

@logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}")
@logger.debug("ghostferry test binary exitted: #{@exit_status}")
if @exit_status.exitstatus != 0
raise ExitError, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}"
raise ExitError, "ghostferry test binary returned non-zero status: #{@exit_status}"
end
end
end
Expand All @@ -352,15 +327,14 @@ def start_server_watchdog
while @subprocess_thread.alive? do
if (now - @last_message_time) > @message_timeout
@server.shutdown
@log_capturer.print_output
raise TimeoutError, "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s"
raise TimeoutError, "ghostferry did not report to the integration test server for the last #{@message_timeout}s"
end

sleep 1
end

@server.shutdown
@logger.debug("[#{@tag}] server watchdog thread stopped")
@logger.debug("server watchdog thread stopped")
end

@server_watchdog_thread.abort_on_exception = true
Expand Down
7 changes: 0 additions & 7 deletions test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_in

# Writes one batch
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
info("test[09]: on_status, received -> TERM")
ghostferry.send_signal("TERM")
end

Expand All @@ -33,7 +32,6 @@ def test_interrupt_and_resume_without_last_known_schema_cache

# Writes one batch
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
info("test[31]: on_status, received -> TERM")
ghostferry.send_signal("TERM")
end

Expand Down Expand Up @@ -450,26 +448,21 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts
end

def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_source
@debug_me = true
info("test[452] start\n\n")
ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2)

datawriter = new_source_datawriter
start_datawriter_with_ghostferry(datawriter, ghostferry)

info("test[461] ghostferry#run_expecting_interrupt, no state\n\n")
dumped_state = ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)

ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2)

info("test[466] ghostferry#run_expecting_interrupt, with state\n\n")
ghostferry.run_expecting_interrupt(dumped_state)

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
stop_datawriter_during_cutover(datawriter, ghostferry)

info("test[472] ghostferry#run_with_logs, with state\n\n")
ghostferry.run_with_logs(dumped_state)

assert_test_table_is_identical
Expand Down
14 changes: 2 additions & 12 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,20 @@ class GhostferryTestCase < Minitest::Test
def new_ghostferry(filepath, config: {})
# Transform path to something ruby understands
path = File.join(GO_CODE_PATH, filepath, "main.go")
g = Ghostferry.new(path, config: config, log_capturer: @log_capturer)
info("[#{g.tag}] new_ghostferry: create")
g = Ghostferry.new(path, config: config, logger: @log_capturer.logger)
@ghostferry_instances << g
g
end

def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0)
g = new_ghostferry(filepath, config: config)

info("[#{g.tag}] new_ghostferry_wiarc: register status hook")
batches_written = 0
g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1

if batches_written >= after_batches_written
info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> true")
g.send_signal("TERM")
else
info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> false")
end
end

Expand All @@ -89,10 +84,6 @@ def setup_signal_watcher
Signal.trap("TERM") { self.on_term }
end

def info(msg)
@log_capturer.logger.info(msg)
end

##############
# Test Hooks #
##############
Expand All @@ -115,7 +106,6 @@ def before_setup

# Same thing with DataWriter as above
@datawriter_instances = []
@debug_me = nil
end

def after_teardown
Expand All @@ -127,7 +117,7 @@ def after_teardown
datawriter.stop_and_join
end

@log_capturer.print_output if self.failure || @debug_me
@log_capturer.print_output if self.failure
@log_capturer.reset
super
end
Expand Down

0 comments on commit 399155a

Please sign in to comment.