Import the latest changes from the dev. repository:
- improve synchronization stability and performance on faster Internet connections
- refine tracing support for truncation of indices and state
- bump package version to 0.4.16
sierkov committed Apr 30, 2024
1 parent f335ff8 commit d86dac0
Showing 9 changed files with 51 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.20)
project(DT)
set(CMAKE_PROJECT_VERSION_MAJOR 0)
set(CMAKE_PROJECT_VERSION_MINOR 4)
set(CMAKE_PROJECT_VERSION_PATCH 15)
set(CMAKE_PROJECT_VERSION_PATCH 16)
if(PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR)
message(FATAL_ERROR "In-source builds are not allowed")
endif()
8 changes: 7 additions & 1 deletion lib/dt/chunk-registry.hpp
@@ -221,6 +221,12 @@ namespace daedalus_turbo {
return _data_dir;
}

uint64_t find_epoch(const uint64_t offset) const
{
mutex::scoped_lock lk { _update_mutex };
return find(offset).epoch();
}

const chunk_info &find(uint64_t offset) const
{
return _find_it(offset)->second;
@@ -418,7 +424,7 @@ namespace daedalus_turbo {
{
if (num_bytes() <= max_end_offset)
return;
timer t { fmt::format("chunk_registry::_truncate to size {}", max_end_offset) };
timer t { fmt::format("chunk_registry::_truncate to size {}", max_end_offset), logger::level::info };
auto chunk_it = _find_it(max_end_offset);
if (chunk_it->second.offset != max_end_offset)
throw error("cannot truncate to offsets not on the boundary between chunks!");
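
This file gains a thread-safe find_epoch(offset) helper, used later in this commit by the indexer's slice-ready callback, and raises the _truncate timer to logger::level::info. Below is a minimal sketch of the find_epoch pattern — a mutex-guarded lookup that returns the epoch by value so callers never hold a reference across the lock. It uses standard-library types instead of the project's own mutex and chunk_info and is illustrative only:

#include <cstdint>
#include <map>
#include <mutex>
#include <stdexcept>

struct chunk_sketch { uint64_t offset = 0; uint64_t epoch_no = 0; };

class registry_sketch {
public:
    void add(const chunk_sketch &c)
    {
        std::scoped_lock lk { _mutex };
        _chunks[c.offset] = c;
    }

    // hypothetical counterpart of chunk_registry::find_epoch: lock, look up, return by value
    uint64_t find_epoch(const uint64_t offset) const
    {
        std::scoped_lock lk { _mutex };
        const auto it = _chunks.upper_bound(offset);
        if (it == _chunks.begin())
            throw std::runtime_error("unknown offset");
        return std::prev(it)->second.epoch_no;
    }
private:
    mutable std::mutex _mutex;
    std::map<uint64_t, chunk_sketch> _chunks;
};
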
2 changes: 1 addition & 1 deletion lib/dt/cli/http-api.hpp
@@ -41,7 +41,7 @@ namespace daedalus_turbo::cli::http_api {
}
}
}
logger::info("HTTP API listens at the address {}:{}\n", ip, port);
logger::info("HTTP API listens at the address {}:{}", ip, port);
server s { data_dir, ignore_requirements };
s.serve(ip, port);
}
2 changes: 1 addition & 1 deletion lib/dt/http/download-queue.cpp
@@ -59,7 +59,7 @@ namespace daedalus_turbo::http {

~impl()
{
const auto num_reqs = cancel([](const auto req) { return true; });
const auto num_reqs = cancel([](const auto &) { return true; });
if (num_reqs > 0)
logger::warn("download_queue destroyed before completion: cancelled {} requests", num_reqs);
while (_queue_size.load() > 0 || _active_conns.load() > 0) {
2 changes: 1 addition & 1 deletion lib/dt/http/download-queue.hpp
@@ -118,7 +118,7 @@ namespace daedalus_turbo::http {
}
}
private:
virtual size_t _cancel_impl(const cancel_predicate &pred)
virtual size_t _cancel_impl(const cancel_predicate &/*pred*/)
{
throw error("cancellation are not supported!");
}
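
Both download-queue hunks use the same idiom: an unused parameter keeps its type but loses its name (or has the name commented out), which silences unused-parameter warnings without changing behaviour. A standalone illustration with hypothetical names:

#include <cstddef>

struct request_sketch {};

// base implementation: the parameter name moves into a comment, so the signature
// stays self-documenting while the compiler sees no unused variable
size_t cancel_impl_sketch(const request_sketch &/*req*/)
{
    return 0;
}

// in a lambda the unused name can simply be dropped
const auto cancel_all_sketch = [](const request_sketch &) { return true; };
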
12 changes: 12 additions & 0 deletions lib/dt/index/io.hpp
@@ -798,6 +798,14 @@ namespace daedalus_turbo::index {
return tot_size;
}

[[nodiscard]] vector<std::string> paths() const
{
vector<std::string> p {};
p.reserve(_readers.size());
for (const auto &r: _readers)
p.emplace_back(r->path());
return p;
}
private:
vector<std::unique_ptr<reader_mt<T>>> _readers;

@@ -839,6 +847,10 @@ namespace daedalus_turbo::index {
return _reader.size();
}

[[nodiscard]] vector<std::string> paths() const
{
return _reader.paths();
}
private:
reader_multi_mt<T> _reader;
reader_multi_mt<T>::thread_data _data;
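
Both reader variants gain a paths() accessor listing the files that back the index; the validator.cpp change at the end of this commit passes it into an error message so a failed collateral-TXO lookup names the index slices that were searched. A hedged sketch of how such an accessor can feed a diagnostic message — only paths() itself comes from the diff, the fmt-based formatting of the vector is an assumption about the surrounding code:

#include <cstddef>
#include <string>
#include <vector>
#include <fmt/format.h>
#include <fmt/ranges.h>

std::string lookup_failure_message(const std::vector<std::string> &index_paths,
                                   const std::string &tx_hash, const std::size_t txo_idx)
{
    // fmt/ranges.h lets the vector of paths be formatted directly with {}
    return fmt::format("no unique TXO found for {} #{}; index slices searched: {}",
        tx_hash, txo_idx, index_paths);
}
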
20 changes: 14 additions & 6 deletions lib/dt/indexer.hpp
@@ -174,7 +174,7 @@ namespace daedalus_turbo::indexer {
// merge final not-yet merged epochs
if (_slices.continuous_size() <= max_end_offset)
return;
timer t { fmt::format("truncate indices to max offset {}", max_end_offset) };
timer t { fmt::format("truncate indices to max offset {}", max_end_offset), logger::level::info };
std::vector<merger::slice> updated {};
for (auto it = _slices.begin(); it != _slices.end(); ) {
const auto &s = it->second;
@@ -341,27 +341,35 @@
break;
if (total_size < merger::part_size && !force && _merge_next_offset + total_size < _transaction->target_offset)
break;
auto first_epoch = _epoch_slices.begin()->first;
auto last_epoch = first_epoch;
std::vector<std::string> input_slices {};
while (!_epoch_slices.empty() && _epoch_slices.begin()->second.offset < _merge_next_offset + total_size) {
input_slices.emplace_back(_epoch_slices.begin()->second.slice_id);
last_epoch = _epoch_slices.begin()->first;
_epoch_slices.erase(_epoch_slices.begin());
}
merger::slice output_slice { _merge_next_offset, total_size };
_merge_next_offset += total_size;
epoch_slices_lk.unlock();
_merge_slice(output_slice, input_slices, 200, [this, output_slice, first_epoch, last_epoch] {
_merge_slice(output_slice, input_slices, 200, [this, output_slice] {
// ensures notifications are sent only in their continuous order
std::vector<merger::slice> notify_slices {};
{
mutex::scoped_lock lk { _slices_mutex };
const auto old_indexed_size = _slices.continuous_size();
_slices.add(output_slice);
const auto new_indexed_size = _slices.continuous_size();
if (new_indexed_size > old_indexed_size) {
for (auto slice_it = _slices.find(old_indexed_size); slice_it != _slices.end() && slice_it->second.end_offset() <= new_indexed_size; ++slice_it)
notify_slices.emplace_back(slice_it->second);
}
_final_merged = _slices.continuous_size() - _transaction->start_offset;
// experimental support for on-the-go checkpoints
_save_json_slices(_index_state_path);
}
progress::get().update("merge", _epoch_merged + _epoch_merged_base + _final_merged, (_transaction->target_offset - _transaction->start_offset) * 2);
_on_slice_ready(first_epoch, last_epoch, output_slice);
for (const auto &ns: notify_slices) {
if (ns.size > 0)
_on_slice_ready(find_epoch(ns.offset), find_epoch(ns.end_offset() - 1), ns);
}
});
epoch_slices_lk.lock();
}
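
The merge callback no longer captures first_epoch/last_epoch; instead it collects the slices that have just become part of the continuous prefix of offsets and notifies them in order, looking up their epochs through the chunk registry's new find_epoch. A reduced model of that notification rule, with standard containers standing in for merger::slice and the project's slice map (illustrative only):

#include <cstdint>
#include <functional>
#include <map>
#include <utility>

struct slice_sketch { uint64_t offset = 0, size = 0; uint64_t end_offset() const { return offset + size; } };

class slice_tracker_sketch {
public:
    explicit slice_tracker_sketch(std::function<void(const slice_sketch &)> on_ready)
        : _on_ready { std::move(on_ready) } {}

    void add(const slice_sketch &s)
    {
        const auto old_continuous = _continuous_size();
        _slices[s.offset] = s;
        const auto new_continuous = _continuous_size();
        if (new_continuous > old_continuous) {
            // notify every slice whose end now falls inside the continuous prefix,
            // so out-of-order merges never produce out-of-order notifications
            for (auto it = _slices.lower_bound(old_continuous);
                    it != _slices.end() && it->second.end_offset() <= new_continuous; ++it)
                _on_ready(it->second);
        }
    }
private:
    uint64_t _continuous_size() const
    {
        uint64_t end = 0;
        for (const auto &[off, s]: _slices) {
            if (off != end)
                break;
            end = s.end_offset();
        }
        return end;
    }

    std::function<void(const slice_sketch &)> _on_ready;
    std::map<uint64_t, slice_sketch> _slices;
};
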
8 changes: 4 additions & 4 deletions lib/dt/sync/http.cpp
@@ -123,7 +123,7 @@ namespace daedalus_turbo::sync::http {

static uint64_t _run_max_offset(const json::array &j_epochs, const uint64_t start_offset, const uint64_t max_offset)
{
static constexpr uint64_t max_sync_part = static_cast<uint64_t>(1) << 34;
static constexpr uint64_t max_sync_part = static_cast<uint64_t>(1) << 35;
uint64_t run_max_offset = 0;
for (size_t epoch = 0; epoch < j_epochs.size(); ++epoch) {
const auto &j_epoch_meta = j_epochs.at(epoch);
@@ -259,7 +259,7 @@ namespace daedalus_turbo::sync::http {
const auto num_downloads = _dlq.cancel([new_failure_offset](const auto &req) {
return req.priority >= new_failure_offset;
});
const auto num_tasks = _sched.cancel([new_failure_offset](const auto &name, const auto &param) {
const auto num_tasks = _sched.cancel([new_failure_offset](const auto &, const auto &param) {
return param && param->type() == typeid(chunk_offset_t) && std::any_cast<chunk_offset_t>(*param) >= new_failure_offset;
});
logger::info("validation failure at offset {}: cancelled {} download tasks and {} scheduler tasks",
@@ -281,7 +281,7 @@
auto &progress = progress::get();
auto parsed_proc = [&](std::any &&res) {
if (res.type() == typeid(scheduled_task_error)) {
const auto &task = std::any_cast<scheduled_task_error>(res).task();
const auto task = std::any_cast<scheduled_task_error>(std::move(res)).task();
_cancel_tasks(std::any_cast<chunk_offset_t>(*task.param));
return;
}
@@ -296,7 +296,7 @@
};
_sched.on_result(std::string { daedalus_turbo::validator::validate_leaders_task }, [&](auto &&res) {
if (res.type() == typeid(scheduled_task_error)) {
const auto &task = std::any_cast<scheduled_task_error>(res).task();
const auto task = std::any_cast<scheduled_task_error>(std::move(res)).task();
_cancel_tasks(std::any_cast<chunk_offset_t>(*task.param));
}
});
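
Besides doubling max_sync_part from 1 << 34 (16 GiB) to 1 << 35 (32 GiB) per sync run, this file stops binding a reference through a temporary: std::any_cast<T>(lvalue) returns a copy, so a reference obtained from that temporary's task() dangles once the statement ends, whereas casting the moved-from any and copying the result keeps the task alive for the rest of the handler. A self-contained illustration with hypothetical types (not the project's scheduled_task_error):

#include <any>
#include <string>
#include <utility>

struct task_sketch { std::string name; };

struct task_error_sketch {
    task_sketch _task;
    const task_sketch &task() const { return _task; }
};

std::string handle(std::any &&res)
{
    // the contained error is moved out of the any and the task is copied
    // before the temporary task_error_sketch is destroyed
    const auto task = std::any_cast<task_error_sketch>(std::move(res)).task();
    return task.name;
}
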
19 changes: 10 additions & 9 deletions lib/dt/validator.cpp
@@ -76,10 +76,10 @@ namespace daedalus_turbo::validator {

void truncate_impl(uint64_t max_end_offset)
{
timer t { "validator::truncate" };
_cr._parent_truncate_impl(max_end_offset);
if (std::max(_subchains.valid_size(), _state.end_offset()) <= max_end_offset)
return;
timer t { fmt::format("validator::truncate to size: {}", max_end_offset), logger::level::info };
if (!_snapshots.empty() && _snapshots.rbegin()->end_offset > max_end_offset) {
for (auto it = _snapshots.begin(); it != _snapshots.end(); ) {
if (it->end_offset > max_end_offset) {
@@ -232,7 +232,7 @@
return chunk;
}

void on_slice_ready(uint64_t first_epoch, uint64_t last_epoch, const indexer::merger::slice &slice)
void on_slice_ready(const uint64_t first_epoch, const uint64_t last_epoch, const indexer::merger::slice &slice)
{
_cr._parent_on_slice_ready(first_epoch, last_epoch, slice);
// only one thread at a time must work on this
@@ -426,14 +426,14 @@
mutex::unique_lock lk2 { _next_task_mutex };
while (_state.end_offset() < _next_end_offset) {
logger::debug("acquired _naxt_task mutex and configuring the validation task");
auto start_offset = _state.end_offset();
auto end_offset = _next_end_offset;
auto first_epoch = _state.epoch();
auto last_epoch = _next_last_epoch;
auto ready_slices = _cr.slices(end_offset);
const auto start_offset = _state.end_offset();
const auto end_offset = _next_end_offset;
const auto first_epoch = _state.epoch();
const auto last_epoch = _next_last_epoch;
const auto ready_slices = _cr.slices(end_offset);
lk2.unlock();
logger::info("pre-aggregating data for ledger state updates between epochs {} and {}", first_epoch, last_epoch);
auto num_outflow_parts = _prepare_outflows(start_offset, end_offset, ready_slices);
const auto num_outflow_parts = _prepare_outflows(start_offset, end_offset, ready_slices);
logger::debug("outflows ready, preparing the per-epoch deltas");
_process_updates(num_outflow_parts, first_epoch, last_epoch);
logger::debug("per-epoch deltas are ready, merging subchains from the same epoch");
@@ -750,7 +750,8 @@
index::txo::item search_item { cc.tx_hash, cc.txo_idx };
auto [ txo_count, txo_item ] = txo_reader.find(search_item);
if (txo_count != 1)
throw error("each input used as a collateral must be present exactly once but got: {} for {} #{}", txo_count, cc.tx_hash, cc.txo_idx);
throw error("slot {}: each input used as a collateral must be present exactly once but got: {} for {} #{} index slices: {}",
upd.slot, txo_count, cc.tx_hash, cc.txo_idx, txo_reader.paths());
_state.add_fees(txo_item.amount);
break;
}
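
Across chunk-registry, indexer, and validator, the truncation timers are now constructed with logger::level::info so their durations show up in regular logs, and validator::truncate_impl runs the parent truncation before its early-return check. What such a level-aware scope timer might look like is sketched below; it assumes nothing about the project's actual timer or logger beyond the two-argument construction visible in the diff:

#include <chrono>
#include <cstdio>
#include <string>
#include <utility>

enum class log_level_sketch { debug, info };

class scope_timer_sketch {
public:
    explicit scope_timer_sketch(std::string name, const log_level_sketch lvl = log_level_sketch::debug)
        : _name { std::move(name) }, _level { lvl }, _start { std::chrono::steady_clock::now() }
    {
    }

    ~scope_timer_sketch()
    {
        // report the elapsed time at the requested level when the scope ends
        const std::chrono::duration<double> took = std::chrono::steady_clock::now() - _start;
        std::printf("[%s] %s took %.3f sec\n",
            _level == log_level_sketch::info ? "info" : "debug", _name.c_str(), took.count());
    }
private:
    std::string _name;
    log_level_sketch _level;
    std::chrono::steady_clock::time_point _start;
};
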
