Commit 56f59bc
Merge pull request #6285 from nilsdeppe/add_threading_29
Fixes for nodegroups running on multiple nodes
nilsdeppe authored Sep 14, 2024
2 parents 83de4b9 + d756f61 commit 56f59bc
Showing 4 changed files with 41 additions and 16 deletions.
14 changes: 11 additions & 3 deletions src/Evolution/DiscontinuousGalerkin/InboxTags.hpp
@@ -152,9 +152,17 @@ struct BoundaryCorrectionAndGhostCellsInbox {
     if (UNLIKELY(not gsl::at(inbox->boundary_data_in_directions, neighbor_index)
                          .try_emplace(time_step_id, std::move(data.second),
                                       std::move(data.first)))) {
-      ERROR("Failed to emplace data into inbox. neighbor_id: ("
-            << neighbor_id.direction() << ',' << neighbor_id.id()
-            << ") at TimeStepID: " << time_step_id);
+      ERROR(
+          "Failed to emplace data into inbox. neighbor_id: ("
+          << neighbor_id.direction() << ',' << neighbor_id.id()
+          << ") at TimeStepID: " << time_step_id << " the size of the inbox is "
+          << gsl::at(inbox->boundary_data_in_directions, neighbor_index).size()
+          << " the message count is "
+          << gsl::at(inbox->boundary_data_in_directions, neighbor_index)
+                 .message_count.load()
+          << " and the number of neighbors is "
+          << gsl::at(inbox->boundary_data_in_directions, neighbor_index)
+                 .number_of_neighbors.load());
     }
     // Notes:
     // 1. fetch_add does a post-increment.
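The "fetch_add does a post-increment" note is what makes the new diagnostic counters safe to read: `std::atomic::fetch_add` returns the value held before the increment, so exactly one receiving thread observes the message count reaching the number of neighbors. A minimal sketch of that counting pattern, with simplified stand-in names rather than the actual `AtomicInboxBoundaryData` interface:

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stand-in for the per-direction inbox counters.
struct InboxCounters {
  std::atomic<std::size_t> message_count{0};
  std::atomic<std::size_t> number_of_neighbors{2};

  // Returns true for exactly one caller: the one whose message completes
  // the set. fetch_add is a post-increment, so old_count + 1 is the total
  // number of messages including this one.
  bool message_completes_set() {
    const std::size_t old_count =
        message_count.fetch_add(1, std::memory_order_acq_rel);
    return old_count + 1 ==
           number_of_neighbors.load(std::memory_order_acquire);
  }
};
```

Because the readiness check uses the value returned by `fetch_add` rather than a separate `load()` of `message_count`, two concurrent senders can never both (or neither) conclude that they delivered the final message.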
19 changes: 11 additions & 8 deletions src/Parallel/ArrayCollection/ReceiveDataForElement.hpp
@@ -38,14 +38,14 @@ struct ReceiveDataForElement {
       const ElementId<Dim>& element_to_execute_on,
       typename ReceiveTag::temporal_id instance,
       ReceiveData receive_data) {
-    ERROR(
-        "The multi-node code hasn't been tested. It should work, but be aware "
-        "that I haven't tried yet.");
+    [[maybe_unused]] const size_t my_node = Parallel::my_node<size_t>(cache);
     auto& element_collection = db::get_mutable_reference<
         typename ParallelComponent::element_collection_tag>(
         make_not_null(&box));
-    // Note: We'll be able to do a counter-based check here too once that
-    //       works for LTS in `SendDataToElement`
+
+    ASSERT(
+        element_collection.count(element_to_execute_on) == 1,
+        "ElementId " << element_to_execute_on << " is not on node " << my_node);
     ReceiveTag::insert_into_inbox(
         make_not_null(&tuples::get<ReceiveTag>(
             element_collection.at(element_to_execute_on).inboxes())),
@@ -79,14 +79,14 @@
       Parallel::GlobalCache<Metavariables>& cache,
       const ElementId<Dim>& element_to_execute_on,
       const gsl::not_null<ElementCollection*> element_collection) {
-    const size_t my_node = Parallel::my_node<size_t>(cache);
-    auto& my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
-
     if constexpr (StartPhase) {
       const Phase current_phase =
           Parallel::local_branch(
               Parallel::get_parallel_component<ParallelComponent>(cache))
               ->phase();
+      ASSERT(element_collection->count(element_to_execute_on) == 1,
+             "ElementId " << element_to_execute_on << " is not on node "
+                          << Parallel::my_node<size_t>(cache));
       auto& element = element_collection->at(element_to_execute_on);
       const std::lock_guard element_lock(element.element_lock());
       element.start_phase(current_phase);
@@ -96,6 +96,9 @@
       if (element_lock.try_lock()) {
         element.perform_algorithm();
       } else {
+        const size_t my_node = Parallel::my_node<size_t>(cache);
+        auto& my_proxy =
+            Parallel::get_parallel_component<ParallelComponent>(cache);
         Parallel::threaded_action<Parallel::Actions::ReceiveDataForElement<>>(
             my_proxy[my_node], element_to_execute_on);
       }
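The `else` branch in the last hunk is a retry path: if another thread holds the element's lock, the action re-sends itself to this node's branch through `Parallel::threaded_action` rather than blocking a scheduler thread on the lock. A rough stand-alone sketch of the pattern, using `std::mutex` and a plain task queue as hypothetical stand-ins for the Charm++ machinery:

```cpp
#include <functional>
#include <mutex>
#include <queue>

// Hypothetical stand-in for re-sending a message to this node's branch.
std::queue<std::function<void()>> local_task_queue;

void receive_data_for_element(std::mutex& element_lock,
                              const std::function<void()>& perform_algorithm) {
  // try_lock never blocks: either we win the lock or we requeue ourselves.
  if (element_lock.try_lock()) {
    perform_algorithm();
    element_lock.unlock();
  } else {
    // The element is busy; re-enqueue this action so the runtime can make
    // progress on other elements instead of spinning on the lock.
    local_task_queue.push([&element_lock, perform_algorithm] {
      receive_data_for_element(element_lock, perform_algorithm);
    });
  }
}
```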
21 changes: 17 additions & 4 deletions src/Parallel/ArrayCollection/SendDataToElement.hpp
@@ -16,6 +16,7 @@
 #include "Parallel/GlobalCache.hpp"
 #include "Parallel/Info.hpp"
 #include "Parallel/NodeLock.hpp"
+#include "Utilities/ErrorHandling/Assert.hpp"
 #include "Utilities/Gsl.hpp"
 #include "Utilities/TaggedTuple.hpp"

@@ -47,15 +48,16 @@ struct SendDataToElement {
       const ReceiveTag& /*meta*/, const ElementId<Dim>& element_to_execute_on,
       typename ReceiveTag::temporal_id instance, ReceiveData&& receive_data) {
     const size_t my_node = Parallel::my_node<size_t>(*cache);
-    auto& element = db::get_mutable_reference<
-        typename ParallelComponent::element_collection_tag>(
-        make_not_null(&box))
-        .at(element_to_execute_on);
     // While we don't mutate the value, we want to avoid locking the DataBox
     // and the nodegroup by using `db::get_mutable_reference`. If/when we
     // start dynamically inserting and removing elements, we'll need to update
     // how we handle this. For example, we might need the containers to have
     // strong stability guarantees.
+    ASSERT(db::get_mutable_reference<Parallel::Tags::ElementLocations<Dim>>(
+               make_not_null(&box))
+               .count(element_to_execute_on) == 1,
+           "Could not find ElementId " << element_to_execute_on
+                                       << " in the list of element locations");
     const size_t node_of_element =
         db::get_mutable_reference<Parallel::Tags::ElementLocations<Dim>>(
             make_not_null(&box))
@@ -64,6 +66,17 @@
         Parallel::get_parallel_component<ParallelComponent>(*cache);
     if (node_of_element == my_node) {
       [[maybe_unused]] size_t count = 0;
+      ASSERT(db::get_mutable_reference<
+                 typename ParallelComponent::element_collection_tag>(
+                 make_not_null(&box))
+                 .count(element_to_execute_on) == 1,
+             "The element with ID "
+                 << element_to_execute_on << " is not on node " << my_node
+                 << ". We should be sending data to node " << node_of_element);
+      auto& element = db::get_mutable_reference<
+          typename ParallelComponent::element_collection_tag>(
+          make_not_null(&box))
+          .at(element_to_execute_on);
       if constexpr (std::is_same_v<evolution::dg::AtomicInboxBoundaryData<Dim>,
                                    typename ReceiveTag::type>) {
         count = ReceiveTag::insert_into_inbox(
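Both ASSERTs added to this file guard the same routing invariant: `Parallel::Tags::ElementLocations` must know which node owns the element, and when that node is the current one, the element must actually be present in the local collection. A simplified sketch of the deliver-or-forward logic under those assumptions (hypothetical names and containers; the real code goes through the DataBox and Charm++ proxies):

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_map>
#include <utility>

template <typename ElementId, typename Collection, typename Data,
          typename ForwardToNode>
void send_data_to_element(
    const std::unordered_map<ElementId, std::size_t>& element_locations,
    Collection& local_elements, const std::size_t my_node, const ElementId& id,
    Data&& data, const ForwardToNode& forward_to_node) {
  // Every element must have a known owning node; a miss here means the
  // element-to-node map was built incorrectly.
  assert(element_locations.count(id) == 1);
  const std::size_t node_of_element = element_locations.at(id);
  if (node_of_element == my_node) {
    // If the map says the element is local, it must be in our collection.
    assert(local_elements.count(id) == 1);
    local_elements.at(id).inbox().insert(std::forward<Data>(data));
  } else {
    // Cross-node case: hand the data to the owning node's branch
    // (stands in for a Charm++ entry-method invocation).
    forward_to_node(node_of_element, id, std::forward<Data>(data));
  }
}
```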
3 changes: 2 additions & 1 deletion src/Parallel/ArrayCollection/SpawnInitializeElementsInCollection.hpp
@@ -24,7 +24,8 @@ struct SpawnInitializeElementsInCollection {
       Parallel::GlobalCache<Metavariables>& cache,
       const ArrayIndex& /*array_index*/,
       const double /*unused_but_we_needed_to_reduce_something*/) {
-    auto my_proxy = Parallel::get_parallel_component<ParallelComponent>(cache);
+    auto my_proxy = Parallel::get_parallel_component<ParallelComponent>(
+        cache)[Parallel::my_node<size_t>(cache)];
     db::mutate<typename ParallelComponent::element_collection_tag>(
         [&my_proxy](const auto element_collection_ptr) {
           for (auto& [element_id, element] : *element_collection_ptr) {
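The fix in this last file indexes the nodegroup proxy with the local node before capturing it in the mutate callback. In Charm++, invoking an entry method through an unindexed group or nodegroup proxy broadcasts to every branch, while `proxy[node]` targets a single branch, which is what spawning node-local initialization work requires once multiple nodes are in play. A schematic of the difference, with a hypothetical proxy type standing in for the Charm++ one:

```cpp
#include <cstddef>

// NodegroupProxy is a hypothetical stand-in for a Charm++ nodegroup proxy.
template <typename NodegroupProxy>
void spawn_local_initialization(NodegroupProxy& proxy,
                                const std::size_t my_node) {
  // proxy.initialize_element();        // unindexed: every branch would run it
  proxy[my_node].initialize_element();  // indexed: only this node's branch
}
```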
