Skip to content

Commit

Permalink
Merge branch 'main' into transient-fed
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadliaJerad committed Aug 14, 2024
2 parents 3408e35 + 7db4930 commit 1d06d53
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 93 deletions.
3 changes: 3 additions & 0 deletions core/federated/RTI/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CMakeFiles
Makefile
cmake_install.cmake
135 changes: 113 additions & 22 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ extern bool _lf_termination_executed;
// Global variables references in federate.h
lf_mutex_t lf_outbound_socket_mutex;
lf_cond_t lf_port_status_changed;
lf_cond_t lf_current_tag_changed;

/**
* The max level allowed to advance (MLAA) is a variable that tracks how far in the reaction
Expand Down Expand Up @@ -205,7 +204,7 @@ static lf_action_base_t* action_for_port(int port_id) {
* @param tag The tag on which the latest status of all network input
* ports is known.
*/
static void update_last_known_status_on_input_ports(tag_t tag) {
static void update_last_known_status_on_input_ports(tag_t tag, environment_t* env) {
LF_PRINT_DEBUG("In update_last_known_status_on_input ports.");
bool notify = false;
for (size_t i = 0; i < _lf_action_table_size; i++) {
Expand All @@ -231,6 +230,8 @@ static void update_last_known_status_on_input_ports(tag_t tag) {
if (notify && lf_update_max_level(tag, false)) {
// Notify network input reactions
lf_cond_broadcast(&lf_port_status_changed);
// Could be blocked waiting for physical time to advance to the STA, so unblock that too.
lf_cond_broadcast(&env->event_q_changed);
}
}

Expand Down Expand Up @@ -625,7 +626,7 @@ static int handle_tagged_message(int* socket, int fed_id) {
}
#endif // FEDERATED_DECENTRALIZED
// The following will update the input_port_action->last_known_status_tag.
// For decentralized coordination, this is needed for the thread implementing STAA.
// For decentralized coordination, this is needed to unblock the STAA.
update_last_known_status_on_input_port(env, actual_tag, port_id);

// If the current time >= stop time, discard the message.
Expand Down Expand Up @@ -1023,7 +1024,7 @@ static void handle_tag_advance_grant(void) {
// knows the status of network ports up to and including the granted tag,
// so by extension, we assume that the federate can safely rely
// on the RTI to handle port statuses up until the granted tag.
update_last_known_status_on_input_ports(TAG);
update_last_known_status_on_input_ports(TAG, env);

// It is possible for this federate to have received a PTAG
// earlier with the same tag as this TAG.
Expand All @@ -1047,7 +1048,7 @@ static void handle_tag_advance_grant(void) {

#ifdef FEDERATED_DECENTRALIZED
/**
* @brief Return whether there exists an input port whose status is unknown.
* @brief Return true if there is an input port among those with a given STAA whose status is unknown.
*
* @param staa_elem A record of all input port actions.
*/
Expand Down Expand Up @@ -1076,9 +1077,28 @@ static int id_of_action(lf_action_base_t* input_port_action) {

#endif

#ifdef FEDERATED_DECENTRALIZED
/**
* @brief Return true if all network input ports are known up to the specified tag.
* @param tag The tag.
*/
static bool inputs_known_to(tag_t tag) {
for (size_t i = 0; i < _lf_action_table_size; i++) {
tag_t known_to = _lf_action_table[i]->trigger->last_known_status_tag;
if (lf_tag_compare(known_to, tag) < 0) {
// There is a network input port for which it is not known whether a message with tag earlier
// than or equal to the specified tag may later arrive.
return false;
}
}
return true;
}
#endif // FEDERATED_DECENTRALIZED

/**
* @brief Thread handling setting the known absent status of input ports.
* For the code-generated array of staa offsets `staa_lst`, which is sorted by STAA offset,
*
* For the code-generated array of STAA offsets `staa_lst`, which is sorted by STAA offset,
* wait for physical time to advance to the current time plus the STAA offset,
* then set the absent status of the input ports associated with the STAA.
* Then wait for current time to advance and start over.
Expand All @@ -1101,43 +1121,48 @@ static void* update_ports_from_staa_offsets(void* args) {
staa_t* staa_elem = staa_lst[i];
// The staa_elem is adjusted in the code generator to have subtracted the delay on the connection.
// The list is sorted in increasing order of adjusted STAA offsets.
// The wait_until function automatically adds the lf_fed_STA_offset to the wait time.
interval_t wait_until_time = env->current_tag.time + staa_elem->STAA;
LF_PRINT_DEBUG("**** (update thread) original wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start());
// We need to add the lf_fed_STA_offset to the wait time and guard against overflow.
interval_t wait_time = lf_time_add(staa_elem->STAA, lf_fed_STA_offset);
instant_t wait_until_time = lf_time_add(env->current_tag.time, wait_time);
LF_PRINT_DEBUG("**** (update thread) wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start());

// The wait_until call will release the env->mutex while it is waiting.
// However, it will not release the env->mutex if the wait time is too small.
// At the cost of a small additional delay in deciding a port is absent,
// we require a minimum wait time here. Otherwise, if both the STAA and STA are
// zero, this thread will fail to ever release the environment mutex.
// we require a minimum wait time here. Note that zero-valued STAAs are
// included, and STA might be zero or very small.
// In this case, this thread will fail to ever release the environment mutex.
// This causes chaos. The MIN_SLEEP_DURATION is the smallest amount of time
// that wait_until will actually wait. Note that this strategy does not
// block progress of any execution that is actually processing events.
// It only slightly delays the decision that an event is absent, and only
// if the STAA and STA are extremely small.
if (lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) {
if (wait_time < 5 * MIN_SLEEP_DURATION) {
wait_until_time += 5 * MIN_SLEEP_DURATION;
}
while (a_port_is_unknown(staa_elem)) {
LF_PRINT_DEBUG("**** (update thread) waiting until: " PRINTF_TIME, wait_until_time - lf_time_start());
if (wait_until(wait_until_time, &lf_port_status_changed)) {
// Specified timeout time was reached.
// If the current tag has changed, start over.
if (lf_tag_compare(lf_tag(env), tag_when_started_waiting) != 0) {
break;
}
/* Possibly useful for debugging:
tag_t current_tag = lf_tag(env);
LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - lf_time_start(),
current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time -
lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, wait_until_time -
lf_time_start());
lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME,
wait_until_time - lf_time_start());
*/

// Mark input ports absent.
for (size_t j = 0; j < staa_elem->num_actions; ++j) {
lf_action_base_t* input_port_action = staa_elem->actions[j];
if (input_port_action->trigger->status == unknown) {
input_port_action->trigger->status = absent;
LF_PRINT_DEBUG("**** (update thread) Assuming port absent at time " PRINTF_TIME,
lf_tag(env).time - start_time);
LF_PRINT_DEBUG("**** (update thread) Assuming port absent at tag " PRINTF_TAG,
lf_tag(env).time - start_time, lf_tag(env).microstep);
update_last_known_status_on_input_port(env, lf_tag(env), id_of_action(input_port_action));
lf_cond_broadcast(&lf_port_status_changed);
}
Expand All @@ -1163,23 +1188,27 @@ static void* update_ports_from_staa_offsets(void* args) {
// Some ports may have been reset to uknown during that wait, in which case,
// it would be huge mistake to enter the wait for a new tag below because the
// program will freeze. First, check whether any ports are unknown:
bool port_unkonwn = false;
bool port_unknown = false;
for (size_t i = 0; i < staa_lst_size; ++i) {
staa_t* staa_elem = staa_lst[i];
if (a_port_is_unknown(staa_elem)) {
port_unkonwn = true;
port_unknown = true;
break;
}
}
if (!port_unkonwn) {
if (!port_unknown) {
// If this occurs, then there is a race condition that can lead to deadlocks.
lf_print_error_and_exit("**** (update thread) Inconsistency: All ports are known, but MLAA is blocking.");
}

// Since max_level_allowed_to_advance will block advancement of time, we cannot follow
// through to the next step without deadlocking. Wait some time, then continue.
// The wait is necessary to prevent a busy wait.
lf_sleep(2 * MIN_SLEEP_DURATION);
// The wait is necessary to prevent a busy wait, which will only occur if port
// status are always known inside the while loop
// Be sure to use wait_until() instead of sleep() because sleep() will not release the mutex.
instant_t wait_until_time = lf_time_add(env->current_tag.time, 2 * MIN_SLEEP_DURATION);
wait_until(wait_until_time, &lf_port_status_changed);

continue;
}

Expand All @@ -1192,7 +1221,7 @@ static void* update_ports_from_staa_offsets(void* args) {
// Ports are reset to unknown at the start of new tag, so that will wake this up.
lf_cond_wait(&lf_port_status_changed);
}
LF_PRINT_DEBUG("**** (update thread) Tags after wait: " PRINTF_TAG ", " PRINTF_TAG,
LF_PRINT_DEBUG("**** (update thread) Tags after wait: " PRINTF_TAG ", and before: " PRINTF_TAG,
lf_tag(env).time - lf_time_start(), lf_tag(env).microstep,
tag_when_started_waiting.time - lf_time_start(), tag_when_started_waiting.microstep);
}
Expand Down Expand Up @@ -2777,4 +2806,66 @@ char* lf_get_federates_bin_directory() {

const char* lf_get_federation_id() { return federation_metadata.federation_id; }

void lf_stop() {
environment_t* env;
int num_env = _lf_get_environments(&env);

for (int i = 0; i < num_env; i++) {
LF_MUTEX_LOCK(&env[i].mutex);

tag_t new_stop_tag;
new_stop_tag.time = env[i].current_tag.time;
new_stop_tag.microstep = env[i].current_tag.microstep + 1;

lf_set_stop_tag(&env[i], new_stop_tag);

lf_print("Setting the stop tag of env %d to " PRINTF_TAG ".", i, env[i].stop_tag.time - start_time,
env[i].stop_tag.microstep);

if (env[i].barrier.requestors)
_lf_decrement_tag_barrier_locked(&env[i]);
lf_cond_broadcast(&env[i].event_q_changed);
LF_MUTEX_UNLOCK(&env[i].mutex);
}
LF_PRINT_LOG("Federate is stopping.");
}

char* lf_get_federates_bin_directory() {
bool bin_directory_defined = false;
#ifdef LF_FEDERATES_BIN_DIRECTORY
bin_directory_defined = true;
#endif
if (bin_directory_defined) {
return (LF_FEDERATES_BIN_DIRECTORY);
}
return NULL;
}

const char* lf_get_federation_id() { return federation_metadata.federation_id; }

#ifdef FEDERATED_DECENTRALIZED
instant_t lf_wait_until_time(tag_t tag) {
instant_t result = tag.time; // Default.

// Do not add the STA if the tag is the starting tag.
if (tag.time != start_time || tag.microstep != 0u) {

// Apply the STA to the logical time, but only if at least one network input port is not known up to this tag.
// Subtract one microstep because it is sufficient to commit to a tag if the input ports are known
// up to one microstep earlier.
if (tag.microstep > 0) {
tag.microstep--;
} else {
tag.microstep = UINT_MAX;
tag.time -= 1;
}

if (!inputs_known_to(tag)) {
result = lf_time_add(result, lf_fed_STA_offset);
}
}
return result;
}
#endif // FEDERATED_DECENTRALIZED

#endif // FEDERATED
2 changes: 0 additions & 2 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ void _lf_start_time_step(environment_t* env) {
// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
// Signal the helper thread to reset its progress since the logical time has changed.
lf_cond_signal(&lf_current_tag_changed);
}
#endif // FEDERATED
}
Expand Down
74 changes: 18 additions & 56 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,46 +195,9 @@ void lf_set_present(lf_port_base_t* port) {
}
}

/**
* Wait until physical time matches or exceeds the specified logical time,
* unless -fast is given. For decentralized coordination, this function will
* add the STA offset to the wait time.
*
* If an event is put on the event queue during the wait, then the wait is
* interrupted and this function returns false. It also returns false if the
* timeout time is reached before the wait has completed. Note this this could
* return true even if the a new event was placed on the queue if that event
* time matches or exceeds the specified time.
*
* The mutex lock associated with the condition argument is assumed to be held by
* the calling thread. This mutex is released while waiting. If the wait time is
* too small to actually wait (less than MIN_SLEEP_DURATION), then this function
* immediately returns true and the mutex is not released.
*
* @param env Environment within which we are executing.
* @param logical_time Logical time to wait until physical time matches it.
* @param condition A condition variable that can interrupt the wait. The mutex
* associated with this condition variable will be released during the wait.
*
* @return Return false if the wait is interrupted either because of an event
* queue signal or if the wait time was interrupted early by reaching
* the stop time, if one was specified. Return true if the full wait time
* was reached.
*/
bool wait_until(instant_t logical_time, lf_cond_t* condition) {
LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, logical_time);
interval_t wait_until_time = logical_time;
#ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized
// Apply the STA to the logical time
// Prevent an overflow
if (start_time != logical_time && wait_until_time < FOREVER - lf_fed_STA_offset) {
// If wait_time is not forever
LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", lf_fed_STA_offset,
wait_until_time - start_time);
wait_until_time += lf_fed_STA_offset;
}
#endif
bool wait_until(instant_t wait_until_time, lf_cond_t* condition) {
if (!fast) {
LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time);
// Check whether we actually need to wait, or if we have already passed the timepoint.
interval_t wait_duration = wait_until_time - lf_time_physical();
if (wait_duration < MIN_SLEEP_DURATION) {
Expand All @@ -253,10 +216,8 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) {

// Wait did not time out, which means that there
// may have been an asynchronous call to lf_schedule().
// Continue waiting.
// Do not adjust logical tag here. If there was an asynchronous
// call to lf_schedule(), it will have put an event on the event queue,
// and logical tag will be set to that time when that event is pulled.
// If there was an asynchronous call to lf_schedule(), it will have put an event on the event queue,
// and the tag will be set to that value when that event is pulled.
return false;
} else {
// Reached timeout.
Expand Down Expand Up @@ -421,8 +382,18 @@ void _lf_next_locked(environment_t* env) {
// Wait for physical time to advance to the next event time (or stop time).
// This can be interrupted if a physical action triggers (e.g., a message
// arrives from an upstream federate or a local physical action triggers).
LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (next_tag.time - start_time));
while (!wait_until(next_tag.time, &env->event_q_changed)) {
while (true) {
#ifdef FEDERATED_DECENTRALIZED
// Apply the STA, if needed.
interval_t wait_until_time = lf_wait_until_time(next_tag);
#else // not FEDERATED_DECENTRALIZED
interval_t wait_until_time = next_tag.time;
#endif // FEDERATED_DECENTRALIZED
LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (wait_until_time - start_time));
if (wait_until(wait_until_time, &env->event_q_changed)) {
// Waited the full time.
break;
}
LF_PRINT_DEBUG("_lf_next_locked(): Wait until time interrupted.");
// Sleep was interrupted. Check for a new next_event.
// The interruption could also have been due to a call to lf_request_stop().
Expand Down Expand Up @@ -625,24 +596,15 @@ void _lf_initialize_start_tag(environment_t* env) {
#endif
LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", start_time);

// Call wait_until if federated. This is required because the startup procedure
// Wait until the start time. This is required for federates because the startup procedure
// in lf_synchronize_with_other_federates() can decide on a new start_time that is
// larger than the current physical time.
// Therefore, if --fast was not specified, wait until physical time matches
// or exceeds the start time. Microstep is ignored.
// This wait_until() is deliberately called after most precursor operations
// for tag (0,0) are performed (e.g., injecting startup reactions, etc.).
// This has two benefits: First, the startup overheads will reduce
// the required waiting time. Second, this call releases the mutex lock and allows
// other threads (specifically, federate threads that handle incoming p2p messages
// from other federates) to hold the lock and possibly raise a tag barrier. This is
// especially useful if an STA is set properly because the federate will get
// a chance to process incoming messages while utilizing the STA.
LF_PRINT_LOG("Waiting for effective start time " PRINTF_TIME " plus STA " PRINTF_TIME ".", effective_start_tag.time,
lf_fed_STA_offset);
// Here we wait until the start time and also release the environment mutex.
// this means that the other worker threads will be allowed to start. We need
// this to avoid potential deadlock in federated startup.
// from other federates) to hold the lock and possibly raise a tag barrier.
while (!wait_until(effective_start_tag.time, &env->event_q_changed)) {
};
LF_PRINT_DEBUG("Done waiting for effective start time + STA offset " PRINTF_TIME ".",
Expand Down
2 changes: 1 addition & 1 deletion core/threaded/scheduler_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) {
* Advance tag if there are no reactions in the array of reaction vectors. If
* there are such reactions, distribute them to worker threads.
*
* This function assumes the caller does not hold the 'mutex' lock.
* This function assumes the caller does not hold the mutex lock.
*/
static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) {
// Reset the index
Expand Down
Loading

0 comments on commit 1d06d53

Please sign in to comment.