Skip to content

Commit

Permalink
Change PL scheduling notification
Browse files: browse the repository at this point in the history
  • Loading branch information
Marius Meyer committed Aug 28, 2023
1 parent: 3cdc732; commit: f8b3ce0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
12 changes: 7 additions & 5 deletions b_eff/src/device/communication_ACCL_pl_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ schedule_send(ap_uint<32> size, ap_uint<32> neighbor_rank,
}

void recv_stream(ap_uint<512>* write_buffer, ap_uint<32> size, ap_uint<32> num_iterations,
ap_uint<32> notify_enabled,
STREAM<stream_word> &data_in,
STREAM<notify_word> &notify) {
#pragma HLS INTERFACE m_axi port=write_buffer bundle=gmem_out
#pragma HLS INTERFACE s_axilite port=size
#pragma HLS INTERFACE s_axilite port=num_iterations
#pragma HLS INTERFACE s_axilite port=notify_enabled
#pragma HLS INTERFACE axis port=data_in
#pragma HLS INTERFACE axis port=notify
#pragma HLS INTERFACE s_axilite port=return
Expand All @@ -81,11 +83,13 @@ void recv_stream(ap_uint<512>* write_buffer, ap_uint<32> size, ap_uint<32> num_
#pragma HLS protocol fixed
read_data(write_buffer, size, data_in);
ap_wait();
notify.write(w);
if (notify_enabled != 0) {
notify.write(w);
}
}
}

void schedule_stream(ap_uint<32> size, ap_uint<32> num_iterations, int enable,
void schedule_stream(ap_uint<32> size, ap_uint<32> num_iterations,
ap_uint<32> neighbor_rank, ap_uint<32> communicator_addr, ap_uint<32> datapath_cfg,
STREAM<command_word> &cmd, STREAM<command_word> &sts,
STREAM<notify_word> &notify) {
Expand All @@ -101,9 +105,7 @@ void schedule_stream(ap_uint<32> size, ap_uint<32> num_iterations, int enable,

for (int i = 0; i < num_iterations; i++) {
#pragma HLS protocol fixed
if (enable) {
schedule_send(size, neighbor_rank, communicator_addr, datapath_cfg, cmd, sts);
}
schedule_send(size, neighbor_rank, communicator_addr, datapath_cfg, cmd, sts);
ap_wait();
notify_word w = notify.read();
}
Expand Down
8 changes: 4 additions & 4 deletions b_eff/src/host/execution_types/execution_accl_pl_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,22 @@ namespace network::execution_types::accl_pl_stream {
MPI_Barrier(MPI_COMM_WORLD);
auto startCalculation = std::chrono::high_resolution_clock::now();
if (!config.programSettings->useAcclEmulation) {
auto run_recv = recvKernel(*acclRecvBuffers[i]->bo(), size_in_values, looplength);
auto run_recv = recvKernel(*acclRecvBuffers[i]->bo(), size_in_values, looplength, 1);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
MPI_Barrier(MPI_COMM_WORLD);
auto run_send = sendKernel(*acclSendBuffers[i]->bo(), size_in_values, looplength);
startCalculation = std::chrono::high_resolution_clock::now();
auto run_schedule = scheduleKernel(size_in_values, looplength, 1, (current_rank - 1 + 2 * ((current_rank + i) % 2) + current_size) % current_size,
auto run_schedule = scheduleKernel(size_in_values, looplength, (current_rank - 1 + 2 * ((current_rank + i) % 2) + current_size) % current_size,
config.context->accl->get_communicator_addr(), config.context->accl->get_arithmetic_config_addr({ACCL::dataType::int32, ACCL::dataType::int32}));
run_send.wait();
run_recv.wait();
run_schedule.wait();
} else {
std::thread run_send(send_stream, reinterpret_cast<ap_uint<512>*>(acclSendBuffers[i]->buffer()), size_in_values, looplength,
std::ref(krnl2cclo));
std::thread run_recv(recv_stream, reinterpret_cast<ap_uint<512>*>(acclRecvBuffers[i]->buffer()), size_in_values, looplength,
std::thread run_recv(recv_stream, reinterpret_cast<ap_uint<512>*>(acclRecvBuffers[i]->buffer()), size_in_values, looplength, 1,
std::ref(cclo2krnl), std::ref(notify));
std::thread run_schedule(schedule_stream,size_in_values, looplength, 1, (current_rank - 1 + 2 * ((current_rank + i) % 2) + current_size) % current_size,
std::thread run_schedule(schedule_stream,size_in_values, looplength, (current_rank - 1 + 2 * ((current_rank + i) % 2) + current_size) % current_size,
config.context->accl->get_communicator_addr(), config.context->accl->get_arithmetic_config_addr({ACCL::dataType::int32, ACCL::dataType::int32}),
std::ref(cmd), std::ref(sts), std::ref(notify));
run_send.join();
Expand Down
5 changes: 1 addition & 4 deletions b_eff/src/host/execution_types/execution_accl_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ namespace network::execution_types::accl_stream {
double calculationTime = 0.0;
for (int i = 0; i < config.programSettings->kernelReplications; i++) {
MPI_Barrier(MPI_COMM_WORLD);
auto run_recv = recvKernel(*acclRecvBuffers[i]->bo(), size_in_values, looplength);
auto run_recv = recvKernel(*acclRecvBuffers[i]->bo(), size_in_values, looplength, 0);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
MPI_Barrier(MPI_COMM_WORLD);
auto run_send = sendKernel(*acclSendBuffers[i]->bo(), size_in_values, looplength);
auto startCalculation = std::chrono::high_resolution_clock::now();
auto run_schedule = scheduleKernel(size_in_values, looplength, 0, (current_rank - 1 + 2 * ((current_rank + i) % 2) + current_size) % current_size,
config.context->accl->get_communicator_addr(), config.context->accl->get_arithmetic_config_addr({ACCL::dataType::int32, ACCL::dataType::int32}));
for (int l = 0; l < looplength; l++) {
#ifndef NDEBUG
std::cout << "Stream " << size_in_bytes << " bytes to "
Expand All @@ -103,7 +101,6 @@ namespace network::execution_types::accl_stream {
}
run_send.wait();
run_recv.wait();
run_schedule.wait();
auto endCalculation = std::chrono::high_resolution_clock::now();
calculationTime += std::chrono::duration_cast<std::chrono::duration<double>>(endCalculation - startCalculation).count();
#ifndef NDEBUG
Expand Down

0 comments on commit f8b3ce0

Please sign in to comment.