Skip to content

Commit

Permalink
Replace WorkerManager by HasWorker and WorkerPool
Browse files Browse the repository at this point in the history
Also introduce `run_in_worker_pool()` functions/methods in C/C++ API
  • Loading branch information
nhusung committed Oct 16, 2024
1 parent 71cccc8 commit 45ba9da
Show file tree
Hide file tree
Showing 28 changed files with 579 additions and 256 deletions.
21 changes: 21 additions & 0 deletions bindings/cpp/include/oxidd/bcdd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <tuple>
#include <type_traits>
Expand Down Expand Up @@ -114,6 +115,26 @@ class bcdd_manager {
return _manager._p == nullptr;
}

/// Execute `f()` in the worker thread pool of `manager`
///
/// Recursive calls in the multithreaded apply algorithms are always executed
/// within the manager's thread pool, requiring a rather expensive context
/// switch if the apply algorithm is not called from within the thread pool.
/// If the algorithm takes long to execute anyway, this may not be important,
/// but for many small operations, this may easily make a difference by
/// factors.
///
/// This method blocks until `f()` has finished.
///
/// `this` must not be invalid (check via `is_invalid()`).
///
/// @returns The result of calling `f()`
template <typename R> R run_in_worker_pool(std::function<R()> f) const {
assert(_manager._p);
return oxidd::util::detail::run_in_worker_pool(
capi::oxidd_bcdd_manager_run_in_worker_pool, _manager, std::move(f));
}

/// @name BCDD Construction
/// @{

Expand Down
21 changes: 21 additions & 0 deletions bindings/cpp/include/oxidd/bdd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <tuple>
#include <type_traits>
Expand Down Expand Up @@ -116,6 +117,26 @@ class bdd_manager {
return _manager._p == nullptr;
}

/// Execute `f()` in the worker thread pool of `manager`
///
/// Recursive calls in the multithreaded apply algorithms are always executed
/// within the manager's thread pool, requiring a rather expensive context
/// switch if the apply algorithm is not called from within the thread pool.
/// If the algorithm takes long to execute anyway, this may not be important,
/// but for many small operations, this may easily make a difference by
/// factors.
///
/// This method blocks until `f()` has finished.
///
/// `this` must not be invalid (check via `is_invalid()`).
///
/// @returns The result of calling `f()`
template <typename R> R run_in_worker_pool(std::function<R()> f) const {
assert(_manager._p);
return oxidd::util::detail::run_in_worker_pool(
capi::oxidd_bdd_manager_run_in_worker_pool, _manager, std::move(f));
}

/// @name BDD Construction
/// @{

Expand Down
48 changes: 48 additions & 0 deletions bindings/cpp/include/oxidd/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <iterator>
#include <optional>
#include <tuple>
#include <type_traits>
#include <vector>
#include <version>

Expand Down Expand Up @@ -371,6 +373,52 @@ concept pair_like =

#endif // __cpp_lib_concepts

/// @cond

namespace detail {

extern "C" inline void *oxidd_callback_helper(void *f) {
return (*static_cast<std::function<void *()> *>(f))();
}

template <typename M, typename R>
R run_in_worker_pool(void *(*run_fn)(M, void *(void *), void *), M manager,
std::function<R()> f) {
if constexpr (std::is_void_v<R>) {
std::function<void *()> wrapper([f = std::move(f)]() -> void * {
f();
return nullptr;
});
run_fn(manager, oxidd_callback_helper, (void *)(&wrapper));
} else if constexpr (std::is_reference_v<R>) {
std::function<void *()> wrapper(
[f = std::move(f)]() -> void * { return (void *)(&f()); });
return *static_cast<std::remove_reference_t<R> *>(
run_fn(manager, oxidd_callback_helper, (void *)(&wrapper)));
} else if constexpr (std::is_pointer_v<R>) {
std::function<void *()> wrapper(
[f = std::move(f)]() -> void * { return (void *)f(); });
return static_cast<R>(
run_fn(manager, oxidd_callback_helper, (void *)(&wrapper)));
} else {
// union type to not call the R's default constructor (or even require one)
union {
R v;
} return_val;
std::function<void *()> wrapper(
[f = std::move(f), &return_val]() -> void * {
return_val.v = f();
return nullptr;
});
run_fn(manager, oxidd_callback_helper, (void *)(&wrapper));
return return_val.v;
}
}

} // namespace detail

/// @endcond

} // namespace util

} // namespace oxidd
21 changes: 21 additions & 0 deletions bindings/cpp/include/oxidd/zbdd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <tuple>
#include <type_traits>
#include <utility>
Expand Down Expand Up @@ -107,6 +108,26 @@ class zbdd_manager {
return _manager._p == nullptr;
}

/// Execute `f()` in the worker thread pool of `manager`
///
/// Recursive calls in the multithreaded apply algorithms are always executed
/// within the manager's thread pool, requiring a rather expensive context
/// switch if the apply algorithm is not called from within the thread pool.
/// If the algorithm takes long to execute anyway, this may not be important,
/// but for many small operations, this may easily make a difference by
/// factors.
///
/// This method blocks until `f()` has finished.
///
/// `this` must not be invalid (check via `is_invalid()`).
///
/// @returns The result of calling `f()`
template <typename R> R run_in_worker_pool(std::function<R()> f) const {
assert(_manager._p);
return oxidd::util::detail::run_in_worker_pool(
capi::oxidd_zbdd_manager_run_in_worker_pool, _manager, std::move(f));
}

/// @name ZBDD Construction
/// @{

Expand Down
36 changes: 21 additions & 15 deletions bindings/cpp/tests/boolean-function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,31 +564,37 @@ template <boolean_function_manager M> class test_all_boolean_functions {
void bdd_all_boolean_functions_2vars_t1() {
// NOLINTNEXTLINE(*-magic-numbers)
bdd_manager mgr(65536, 1024, 1);
const std::array vars{mgr.new_var(), mgr.new_var()};
const test_all_boolean_functions test(mgr, vars, vars);
test.basic();
test.subst();
test.quant();
mgr.run_in_worker_pool<void>([&mgr]() {
const std::array vars{mgr.new_var(), mgr.new_var()};
const test_all_boolean_functions test(mgr, vars, vars);
test.basic();
test.subst();
test.quant();
});
}

void bcdd_all_boolean_functions_2vars_t1() {
// NOLINTNEXTLINE(*-magic-numbers)
bcdd_manager mgr(65536, 1024, 1);
const std::array vars{mgr.new_var(), mgr.new_var()};
const test_all_boolean_functions test(mgr, vars, vars);
test.basic();
test.subst();
test.quant();
mgr.run_in_worker_pool<void>([&mgr]() {
const std::array vars{mgr.new_var(), mgr.new_var()};
const test_all_boolean_functions test(mgr, vars, vars);
test.basic();
test.subst();
test.quant();
});
}

void zbdd_all_boolean_functions_2vars_t1() {
// NOLINTNEXTLINE(*-magic-numbers)
zbdd_manager mgr(65536, 1024, 1);
const std::array singletons{mgr.new_singleton(), mgr.new_singleton()};
const std::array vars{singletons[0].var_boolean_function(),
singletons[1].var_boolean_function()};
const test_all_boolean_functions test(mgr, vars, singletons);
test.basic();
mgr.run_in_worker_pool<void>([&mgr]() {
const std::array singletons{mgr.new_singleton(), mgr.new_singleton()};
const std::array vars{singletons[0].var_boolean_function(),
singletons[1].var_boolean_function()};
const test_all_boolean_functions test(mgr, vars, singletons);
test.basic();
});
}

int main() {
Expand Down
53 changes: 33 additions & 20 deletions crates/oxidd-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bitvec::prelude::*;
use clap::{Parser, ValueEnum};
use num_bigint::BigUint;
use oxidd::util::{AllocResult, SatCountCache};
use oxidd::{BooleanFunction, Edge, LevelNo, Manager, WorkerManager};
use oxidd::{BooleanFunction, Edge, HasWorkers, LevelNo, Manager, WorkerPool};
use oxidd_core::{ApplyCache, HasApplyCache, HasLevel, ManagerRef};
use oxidd_dump::{dddmp, dot};
use oxidd_parser::load_file::load_file;
Expand Down Expand Up @@ -160,7 +160,7 @@ fn make_vars<'id, B: BooleanFunction>(
name_map: &mut FxHashMap<String, B>,
) -> Vec<B>
where
B::Manager<'id>: WorkerManager,
B::Manager<'id>: HasWorkers,
<B::Manager<'id> as Manager>::InnerNode: HasLevel,
{
let num_vars = var_set.len();
Expand Down Expand Up @@ -216,7 +216,7 @@ fn make_bool_dd<B>(
) -> B
where
B: BooleanFunction + Send + Sync + 'static,
for<'id> B::Manager<'id>: WorkerManager,
for<'id> B::Manager<'id>: HasWorkers,
for<'id> <B::Manager<'id> as Manager>::InnerNode: HasLevel,
{
fn prop_rec<B: BooleanFunction>(
Expand Down Expand Up @@ -285,14 +285,16 @@ where
rec(&mut leaves, &mut f)
}

fn balanced_reduce_par<T: Send>(
fn balanced_reduce_par<W: WorkerPool, T: Send>(
workers: &W,
iter: impl IntoParallelIterator<Item = T>,
f: impl for<'a> Fn(&'a T, &'a T) -> T + Send + Copy,
) -> Option<T> {
// We use `Option<T>` such that we can drop the values as soon as possible
let mut leaves: Vec<Option<T>> = iter.into_par_iter().map(Some).collect();

fn rec<T: Send>(
fn rec<W: WorkerPool, T: Send>(
workers: &W,
leaves: &mut [Option<T>],
f: impl for<'a> Fn(&'a T, &'a T) -> T + Send + Copy,
) -> Option<T> {
Expand All @@ -301,15 +303,15 @@ where
[l] => l.take(),
_ => {
let (l, r) = leaves.split_at_mut(leaves.len() / 2);
match rayon::join(move || rec(l, f), move || rec(r, f)) {
match workers.join(move || rec(workers, l, f), move || rec(workers, r, f)) {
(None, v) | (v, None) => v,
(Some(l), Some(r)) => Some(f(&l, &r)),
}
}
}
}

rec(&mut leaves, f)
rec(workers, &mut leaves, f)
}

fn clause_tree_rec<B: BooleanFunction>(
Expand All @@ -332,16 +334,18 @@ where
}
}

fn clause_tree_par_rec<B: BooleanFunction + Send + Sync>(
fn clause_tree_par_rec<W: WorkerPool, B: BooleanFunction + Send + Sync>(
workers: &W,
clauses: &[B],
clause_order: &Tree<usize>,
profiler: &Profiler,
) -> Option<B> {
match clause_order {
Tree::Leaf(n) => Some(clauses[*n].clone()),
Tree::Inner(sub) => balanced_reduce_par(
workers,
sub.into_par_iter()
.filter_map(|t| clause_tree_par_rec(clauses, t, profiler)),
.filter_map(|t| clause_tree_par_rec(workers, clauses, t, profiler)),
|lhs: &B, rhs: &B| {
let op_start = profiler.start_op();
let conj = handle_oom!(lhs.and(rhs));
Expand All @@ -366,7 +370,7 @@ where
});

mref.with_manager_shared(|manager| {
manager.set_split_depth(Some(0));
manager.workers().set_split_depth(Some(0));
let clauses = cnf.clauses_mut();
PROGRESS.set_task(
format!("build clauses (problem {problem_no})"),
Expand Down Expand Up @@ -407,7 +411,7 @@ where
HDuration(profiler.elapsed_time())
);

manager.set_split_depth(cli.operation_split_depth);
manager.workers().set_split_depth(cli.operation_split_depth);
PROGRESS.set_task(
format!("conjoin clauses (problem {problem_no})"),
clauses.len() - 1,
Expand Down Expand Up @@ -436,7 +440,7 @@ where
.unwrap()
}
(CNFBuildOrder::Balanced, true) => {
balanced_reduce_par(clauses, |lhs: &B, rhs: &B| {
balanced_reduce_par(manager.workers(), clauses, |lhs: &B, rhs: &B| {
let op_start = profiler.start_op();
let conj = handle_oom!(lhs.and(rhs));
profiler.finish_op(op_start, &conj);
Expand All @@ -463,10 +467,13 @@ where
clause_tree_rec(&clauses, cnf.clause_order().unwrap(), &profiler)
.unwrap()
}
(CNFBuildOrder::Tree, true) => {
clause_tree_par_rec(&clauses, cnf.clause_order().unwrap(), &profiler)
.unwrap()
}
(CNFBuildOrder::Tree, true) => clause_tree_par_rec(
manager.workers(),
&clauses,
cnf.clause_order().unwrap(),
&profiler,
)
.unwrap(),
}
}
})
Expand All @@ -481,7 +488,7 @@ where
make_vars::<B>(manager, prop.vars(), cli.read_var_order, var_name_map)
});
mref.with_manager_shared(|manager| {
manager.set_split_depth(cli.operation_split_depth);
manager.workers().set_split_depth(cli.operation_split_depth);
prop_rec(manager, prop.formula(), &vars, &profiler)
})
}
Expand All @@ -500,7 +507,7 @@ where
B: BooleanFunction + Send + Sync + 'static,
B::ManagerRef: Send + 'static,
for<'id> B: dot::DotStyle<<B::Manager<'id> as Manager>::EdgeTag>,
for<'id> B::Manager<'id>: WorkerManager + HasApplyCache<B::Manager<'id>, O>,
for<'id> B::Manager<'id>: HasWorkers + HasApplyCache<B::Manager<'id>, O>,
for<'id> <B::Manager<'id> as Manager>::InnerNode: HasLevel,
for<'id> <B::Manager<'id> as Manager>::EdgeTag: fmt::Debug,
for<'id> <B::Manager<'id> as Manager>::Terminal: FromStr + fmt::Display + dddmp::AsciiDisplay,
Expand Down Expand Up @@ -785,15 +792,21 @@ fn main() {
DDType::BDD => {
let mref =
oxidd::bdd::new_manager(inner_node_capacity, cli.apply_cache_capacity, cli.threads);
bool_dd_main::<oxidd::bdd::BDDFunction, _>(&cli, mref);
// Run all operations from within the worker pool to reduce the number of
// context switches
mref.clone()
.workers()
.install(move || bool_dd_main::<oxidd::bdd::BDDFunction, _>(&cli, mref))
}
DDType::BCDD => {
let mref = oxidd::bcdd::new_manager(
inner_node_capacity,
cli.apply_cache_capacity,
cli.threads,
);
bool_dd_main::<oxidd::bcdd::BCDDFunction, _>(&cli, mref);
mref.clone()
.workers()
.install(move || bool_dd_main::<oxidd::bcdd::BCDDFunction, _>(&cli, mref))
}
DDType::ZBDD => todo!(),
}
Expand Down
Loading

0 comments on commit 45ba9da

Please sign in to comment.