From 2112e3d67cde454c32f1ee760e2e62cef45c898d Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 3 Aug 2024 11:03:08 -0400 Subject: [PATCH 01/12] First pass at making dataflow work with decentralized --- core/federated/federate.c | 73 +++++++++++++++++----- core/threaded/reactor_threaded.c | 62 +++++------------- core/threaded/scheduler_NP.c | 2 +- core/threaded/scheduler_sync_tag_advance.c | 14 +++-- include/core/federated/federate.h | 15 +++++ include/core/threaded/reactor_threaded.h | 33 +++++++++- 6 files changed, 129 insertions(+), 70 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index c1bd4cbea..80ca5c002 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -203,7 +203,7 @@ static lf_action_base_t* action_for_port(int port_id) { * @param tag The tag on which the latest status of all network input * ports is known. */ -static void update_last_known_status_on_input_ports(tag_t tag) { +static void update_last_known_status_on_input_ports(tag_t tag, environment_t* env) { LF_PRINT_DEBUG("In update_last_known_status_on_input ports."); bool notify = false; for (size_t i = 0; i < _lf_action_table_size; i++) { @@ -229,6 +229,8 @@ static void update_last_known_status_on_input_ports(tag_t tag) { if (notify && lf_update_max_level(tag, false)) { // Notify network input reactions lf_cond_broadcast(&lf_port_status_changed); + // Could be blocked waiting for physical time to advance to the STA, so unblock that too. + lf_cond_broadcast(&env->event_q_changed); } } @@ -623,7 +625,7 @@ static int handle_tagged_message(int* socket, int fed_id) { } #endif // FEDERATED_DECENTRALIZED // The following will update the input_port_action->last_known_status_tag. - // For decentralized coordination, this is needed for the thread implementing STAA. + // For decentralized coordination, this is needed to unblock the STAA. update_last_known_status_on_input_port(env, actual_tag, port_id); // If the current time >= stop time, discard the message. @@ -1020,7 +1022,7 @@ static void handle_tag_advance_grant(void) { // knows the status of network ports up to and including the granted tag, // so by extension, we assume that the federate can safely rely // on the RTI to handle port statuses up until the granted tag. - update_last_known_status_on_input_ports(TAG); + update_last_known_status_on_input_ports(TAG, env); // It is possible for this federate to have received a PTAG // earlier with the same tag as this TAG. @@ -1075,7 +1077,8 @@ static int id_of_action(lf_action_base_t* input_port_action) { /** * @brief Thread handling setting the known absent status of input ports. - * For the code-generated array of staa offsets `staa_lst`, which is sorted by STAA offset, + * + * For the code-generated array of STAA offsets `staa_lst`, which is sorted by STAA offset, * wait for physical time to advance to the current time plus the STAA offset, * then set the absent status of the input ports associated with the STAA. * Then wait for current time to advance and start over. @@ -1099,8 +1102,9 @@ static void* update_ports_from_staa_offsets(void* args) { // The staa_elem is adjusted in the code generator to have subtracted the delay on the connection. // The list is sorted in increasing order of adjusted STAA offsets. // The wait_until function automatically adds the lf_fed_STA_offset to the wait time. - interval_t wait_until_time = env->current_tag.time + staa_elem->STAA; - LF_PRINT_DEBUG("**** (update thread) original wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start()); + // Note that the microstep does not matter here. + tag_t wait_until_tag = {.time = env->current_tag.time + staa_elem->STAA, .microstep = 0}; + LF_PRINT_DEBUG("**** (update thread) original wait_until time: " PRINTF_TIME, wait_until_tag.time - lf_time_start()); // The wait_until call will release the env->mutex while it is waiting. // However, it will not release the env->mutex if the wait time is too small. @@ -1113,11 +1117,12 @@ static void* update_ports_from_staa_offsets(void* args) { // It only slightly delays the decision that an event is absent, and only // if the STAA and STA are extremely small. if (lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) { - wait_until_time += 5 * MIN_SLEEP_DURATION; + wait_until_tag.time += 5 * MIN_SLEEP_DURATION; } while (a_port_is_unknown(staa_elem)) { - LF_PRINT_DEBUG("**** (update thread) waiting until: " PRINTF_TIME, wait_until_time - lf_time_start()); - if (wait_until(wait_until_time, &lf_port_status_changed)) { + LF_PRINT_DEBUG("**** (update thread) waiting until: " PRINTF_TIME, wait_until_tag.time - lf_time_start()); + if (wait_until(wait_until_tag, &lf_port_status_changed)) { + // Specified timeout time was reached. if (lf_tag_compare(lf_tag(env), tag_when_started_waiting) != 0) { break; } @@ -1125,16 +1130,17 @@ static void* update_ports_from_staa_offsets(void* args) { tag_t current_tag = lf_tag(env); LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - lf_time_start(), current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time - - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, wait_until_time - + lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, wait_until_tag.time - lf_time_start()); */ + // Mark input ports absent. for (size_t j = 0; j < staa_elem->num_actions; ++j) { lf_action_base_t* input_port_action = staa_elem->actions[j]; if (input_port_action->trigger->status == unknown) { input_port_action->trigger->status = absent; - LF_PRINT_DEBUG("**** (update thread) Assuming port absent at time " PRINTF_TIME, - lf_tag(env).time - start_time); + LF_PRINT_DEBUG("**** (update thread) Assuming port absent at tag " PRINTF_TAG, + lf_tag(env).time - start_time, lf_tag(env).microstep); update_last_known_status_on_input_port(env, lf_tag(env), id_of_action(input_port_action)); lf_cond_broadcast(&lf_port_status_changed); } @@ -1189,7 +1195,7 @@ static void* update_ports_from_staa_offsets(void* args) { // Ports are reset to unknown at the start of new tag, so that will wake this up. lf_cond_wait(&lf_port_status_changed); } - LF_PRINT_DEBUG("**** (update thread) Tags after wait: " PRINTF_TAG ", " PRINTF_TAG, + LF_PRINT_DEBUG("**** (update thread) Tags after wait: " PRINTF_TAG ", and before: " PRINTF_TAG, lf_tag(env).time - lf_time_start(), lf_tag(env).microstep, tag_when_started_waiting.time - lf_time_start(), tag_when_started_waiting.microstep); } @@ -2712,4 +2718,43 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { return (prev_max_level_allowed_to_advance != max_level_allowed_to_advance); } -#endif +#ifdef FEDERATED_DECENTRALIZED +instant_t lf_wait_until_time(tag_t tag) { + instant_t result = tag.time; // Default. + + // Do not add the STA if the tag is the starting tag. + if (tag.time != start_time || tag.microstep != 0u) { + + // Apply the STA to the logical time, but only if at least one network input port is not known up to this tag. + // Subtract one microstep because it is sufficient to commit to a tag if the input ports are known + // up to one microstep earlier. + if (tag.microstep > 0) { + tag.microstep--; + } else { + tag.microstep = UINT_MAX; + tag.time -= 1; + } + + for (int i = 0; i < _lf_action_table_size; i++) { + tag_t known_to = _lf_action_table[i]->trigger->last_known_status_tag; + if (lf_tag_compare(known_to, tag) < 0) { + // There is a network input port for which it is not known whether a message with tag earlier + // than the specified tag may later arrive. Add the STA offset. + // Prevent an overflow and allow the STA offset to be FOREVER. + if (result < FOREVER - lf_fed_STA_offset) { + LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", lf_fed_STA_offset, + result - start_time); + result += lf_fed_STA_offset; + } else { + LF_PRINT_DEBUG("Setting the wait time to FOREVER."); + result = FOREVER; + } + break; // No need to check the rest. + } + } + } + return result; +} +#endif // FEDERATED_DECENTRALIZED + +#endif // FEDERATED diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index ce4c463b5..48c0f36ae 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -194,46 +194,15 @@ void lf_set_present(lf_port_base_t* port) { } } -/** - * Wait until physical time matches or exceeds the specified logical time, - * unless -fast is given. For decentralized coordination, this function will - * add the STA offset to the wait time. - * - * If an event is put on the event queue during the wait, then the wait is - * interrupted and this function returns false. It also returns false if the - * timeout time is reached before the wait has completed. Note this this could - * return true even if the a new event was placed on the queue if that event - * time matches or exceeds the specified time. - * - * The mutex lock associated with the condition argument is assumed to be held by - * the calling thread. This mutex is released while waiting. If the wait time is - * too small to actually wait (less than MIN_SLEEP_DURATION), then this function - * immediately returns true and the mutex is not released. - * - * @param env Environment within which we are executing. - * @param logical_time Logical time to wait until physical time matches it. - * @param condition A condition variable that can interrupt the wait. The mutex - * associated with this condition variable will be released during the wait. - * - * @return Return false if the wait is interrupted either because of an event - * queue signal or if the wait time was interrupted early by reaching - * the stop time, if one was specified. Return true if the full wait time - * was reached. - */ -bool wait_until(instant_t logical_time, lf_cond_t* condition) { - LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, logical_time); - interval_t wait_until_time = logical_time; -#ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized - // Apply the STA to the logical time - // Prevent an overflow - if (start_time != logical_time && wait_until_time < FOREVER - lf_fed_STA_offset) { - // If wait_time is not forever - LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", lf_fed_STA_offset, - wait_until_time - start_time); - wait_until_time += lf_fed_STA_offset; - } -#endif +bool wait_until(tag_t tag, lf_cond_t* condition) { +#ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized. + interval_t wait_until_time = lf_wait_until_time(tag); +#else // not FEDERATED_DECENTRALIZED + interval_t wait_until_time = tag.time; +#endif // FEDERATED_DECENTRALIZED if (!fast) { + LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, + wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); if (wait_duration < MIN_SLEEP_DURATION) { @@ -252,10 +221,8 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) { // Wait did not time out, which means that there // may have been an asynchronous call to lf_schedule(). - // Continue waiting. - // Do not adjust logical tag here. If there was an asynchronous - // call to lf_schedule(), it will have put an event on the event queue, - // and logical tag will be set to that time when that event is pulled. + // If there was an asynchronous call to lf_schedule(), it will have put an event on the event queue, + // and the tag will be set to that value when that event is pulled. return false; } else { // Reached timeout. @@ -421,7 +388,7 @@ void _lf_next_locked(environment_t* env) { // This can be interrupted if a physical action triggers (e.g., a message // arrives from an upstream federate or a local physical action triggers). LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (next_tag.time - start_time)); - while (!wait_until(next_tag.time, &env->event_q_changed)) { + while (!wait_until(next_tag, &env->event_q_changed)) { LF_PRINT_DEBUG("_lf_next_locked(): Wait until time interrupted."); // Sleep was interrupted. Check for a new next_event. // The interruption could also have been due to a call to lf_request_stop(). @@ -603,7 +570,8 @@ void _lf_initialize_start_tag(environment_t* env) { } // The start time will likely have changed. Adjust the current tag and stop tag. - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + tag_t start_tag = (tag_t){.time = start_time, .microstep = 0u}; + env->current_tag = start_tag; if (duration >= 0LL) { // A duration has been specified. Recalculate the stop time. env->stop_tag = ((tag_t){.time = start_time + duration, .microstep = 0}); @@ -611,8 +579,6 @@ void _lf_initialize_start_tag(environment_t* env) { _lf_initialize_timers(env); - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; - #if defined FEDERATED_DECENTRALIZED // If we have a non-zero STA offset, then we need to allow messages to arrive // at the start time. To avoid spurious STP violations, we temporarily @@ -642,7 +608,7 @@ void _lf_initialize_start_tag(environment_t* env) { // Here we wait until the start time and also release the environment mutex. // this means that the other worker threads will be allowed to start. We need // this to avoid potential deadlock in federated startup. - while (!wait_until(start_time, &env->event_q_changed)) { + while (!wait_until(start_tag, &env->event_q_changed)) { }; LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + lf_fed_STA_offset); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be close to the STA offset.", diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 7edd41a81..54a611ea8 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -159,7 +159,7 @@ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { * Advance tag if there are no reactions in the array of reaction vectors. If * there are such reactions, distribute them to worker threads. * - * This function assumes the caller does not hold the 'mutex' lock. + * This function assumes the caller does not hold the mutex lock. */ static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { // Reset the index diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c index cc91c88f0..f298cca25 100644 --- a/core/threaded/scheduler_sync_tag_advance.c +++ b/core/threaded/scheduler_sync_tag_advance.c @@ -20,16 +20,19 @@ void _lf_next_locked(struct environment_t* env); /** - * @brief Indicator that execution of at least one tag has completed. + * @brief Indicator that execution of the specified tag has completed. */ -static bool _latest_tag_completed = false; +static tag_t _latest_tag_completed = NEVER_TAG; bool should_stop_locked(lf_scheduler_t* sched) { // If this is not the very first step, check against the stop tag to see whether this is the last step. - if (_latest_tag_completed) { + // Also, stop only after completing the stop tag. + if (lf_tag_compare(_latest_tag_completed, sched->env->current_tag) == 0) { // If we are at the stop tag, do not call _lf_next_locked() // to prevent advancing the logical time. if (lf_tag_compare(sched->env->current_tag, sched->env->stop_tag) >= 0) { + LF_PRINT_DEBUG("****************** Stopping execution at tag " PRINTF_TAG, + sched->env->current_tag.time - lf_time_start(), sched->env->current_tag.microstep); return true; } } @@ -50,11 +53,10 @@ bool _lf_sched_advance_tag_locked(lf_scheduler_t* sched) { return true; } - _latest_tag_completed = true; + _latest_tag_completed = env->current_tag; // Advance time. - // _lf_next_locked() may block waiting for real time to pass or events to appear. - // to appear on the event queue. Note that we already + // _lf_next_locked() may block waiting for real time to pass or events to appear on the event queue. tracepoint_scheduler_advancing_time_starts(env); _lf_next_locked(env); tracepoint_scheduler_advancing_time_ends(env); diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 8ab5dab2e..af9625c80 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -529,4 +529,19 @@ void lf_synchronize_with_other_federates(); */ bool lf_update_max_level(tag_t tag, bool is_provisional); +#ifdef FEDERATED_DECENTRALIZED +/** + * @brief Return the physical time that we should wait until before advancing to the specified tag. + * + * This function adds the STA offset (STP_offset parameter) to the time of the specified tag unless + * the tag is the starting tag (it is always safe to advance to the starting tag). It also avoids + * adding the STA offset if all network input ports are known at least up to one microstep earlier + * than the specified tag. + * + * This function assumes that the caller holds the environment mutex. + * @param time The specified time. + */ +instant_t lf_wait_until_time(tag_t tag); +#endif // FEDERATED_DECENTRALIZED + #endif // FEDERATE_H diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 0d58f7431..01f22bcce 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -80,7 +80,38 @@ void _lf_decrement_tag_barrier_locked(environment_t* env); int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); void lf_synchronize_with_other_federates(void); -bool wait_until(instant_t logical_time_ns, lf_cond_t* condition); + + +/** + * @brief Wait until physical time matches or exceeds the time of the specified tag. + * + * If -fast is given, there will be no wait. For federated programs with decentralized coordination, + * this function will add the STA offset to the wait time unless all network input ports are known + * up to, but not necessarily including, the specified tag. + * + * If an event is put on the event queue during the wait, then the wait is + * interrupted and this function returns false. It also returns false if the + * timeout time is reached before the wait has completed. Note this this could + * return true even if the a new event was placed on the queue if that event + * time matches or exceeds the specified time. + * + * The mutex lock associated with the condition argument is assumed to be held by + * the calling thread. This mutex is released while waiting. If the wait time is + * too small to actually wait (less than MIN_SLEEP_DURATION), then this function + * immediately returns true and the mutex is not released. + * + * @param env Environment within which we are executing. + * @param tag The tag with the time to wait until physical time matches it. + * @param condition A condition variable that can interrupt the wait. The mutex + * associated with this condition variable will be released during the wait. + * + * @return Return false if the wait is interrupted either because of an event + * queue signal or if the wait time was interrupted early by reaching + * the stop time, if one was specified. Return true if the full wait time + * was reached. + */ +bool wait_until(tag_t tag, lf_cond_t* condition); + tag_t get_next_event_tag(environment_t* env); tag_t send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply); void _lf_next_locked(environment_t* env); From 01ebb73aba1ff9956df91fc29f4cf1be17e41659 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 3 Aug 2024 11:17:03 -0400 Subject: [PATCH 02/12] Deal with BS compiler complaint --- core/threaded/scheduler_sync_tag_advance.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c index f298cca25..0de94eed4 100644 --- a/core/threaded/scheduler_sync_tag_advance.c +++ b/core/threaded/scheduler_sync_tag_advance.c @@ -22,7 +22,7 @@ void _lf_next_locked(struct environment_t* env); /** * @brief Indicator that execution of the specified tag has completed. */ -static tag_t _latest_tag_completed = NEVER_TAG; +static tag_t _latest_tag_completed = NEVER_TAG_INITIALIZER; bool should_stop_locked(lf_scheduler_t* sched) { // If this is not the very first step, check against the stop tag to see whether this is the last step. From 803b1f65a295ee5beba2bb11028d6d5c5ada22d0 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 3 Aug 2024 11:30:22 -0400 Subject: [PATCH 03/12] format --- core/federated/federate.c | 11 ++++++----- core/threaded/reactor_threaded.c | 4 ++-- core/threaded/scheduler_sync_tag_advance.c | 2 +- include/core/federated/federate.h | 4 ++-- include/core/threaded/reactor_threaded.h | 3 +-- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 80ca5c002..9bae3ae16 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1077,7 +1077,7 @@ static int id_of_action(lf_action_base_t* input_port_action) { /** * @brief Thread handling setting the known absent status of input ports. - * + * * For the code-generated array of STAA offsets `staa_lst`, which is sorted by STAA offset, * wait for physical time to advance to the current time plus the STAA offset, * then set the absent status of the input ports associated with the STAA. @@ -1104,7 +1104,8 @@ static void* update_ports_from_staa_offsets(void* args) { // The wait_until function automatically adds the lf_fed_STA_offset to the wait time. // Note that the microstep does not matter here. tag_t wait_until_tag = {.time = env->current_tag.time + staa_elem->STAA, .microstep = 0}; - LF_PRINT_DEBUG("**** (update thread) original wait_until time: " PRINTF_TIME, wait_until_tag.time - lf_time_start()); + LF_PRINT_DEBUG("**** (update thread) original wait_until time: " PRINTF_TIME, + wait_until_tag.time - lf_time_start()); // The wait_until call will release the env->mutex while it is waiting. // However, it will not release the env->mutex if the wait time is too small. @@ -1130,8 +1131,8 @@ static void* update_ports_from_staa_offsets(void* args) { tag_t current_tag = lf_tag(env); LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - lf_time_start(), current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time - - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, wait_until_tag.time - - lf_time_start()); + lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, + wait_until_tag.time - lf_time_start()); */ // Mark input ports absent. @@ -2743,7 +2744,7 @@ instant_t lf_wait_until_time(tag_t tag) { // Prevent an overflow and allow the STA offset to be FOREVER. if (result < FOREVER - lf_fed_STA_offset) { LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", lf_fed_STA_offset, - result - start_time); + result - start_time); result += lf_fed_STA_offset; } else { LF_PRINT_DEBUG("Setting the wait time to FOREVER."); diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 48c0f36ae..b33bb5df1 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -197,12 +197,12 @@ void lf_set_present(lf_port_base_t* port) { bool wait_until(tag_t tag, lf_cond_t* condition) { #ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized. interval_t wait_until_time = lf_wait_until_time(tag); -#else // not FEDERATED_DECENTRALIZED +#else // not FEDERATED_DECENTRALIZED interval_t wait_until_time = tag.time; #endif // FEDERATED_DECENTRALIZED if (!fast) { LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, - wait_until_time - start_time); + wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); if (wait_duration < MIN_SLEEP_DURATION) { diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c index 0de94eed4..1af8bcb32 100644 --- a/core/threaded/scheduler_sync_tag_advance.c +++ b/core/threaded/scheduler_sync_tag_advance.c @@ -32,7 +32,7 @@ bool should_stop_locked(lf_scheduler_t* sched) { // to prevent advancing the logical time. if (lf_tag_compare(sched->env->current_tag, sched->env->stop_tag) >= 0) { LF_PRINT_DEBUG("****************** Stopping execution at tag " PRINTF_TAG, - sched->env->current_tag.time - lf_time_start(), sched->env->current_tag.microstep); + sched->env->current_tag.time - lf_time_start(), sched->env->current_tag.microstep); return true; } } diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index af9625c80..c6372b803 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -532,12 +532,12 @@ bool lf_update_max_level(tag_t tag, bool is_provisional); #ifdef FEDERATED_DECENTRALIZED /** * @brief Return the physical time that we should wait until before advancing to the specified tag. - * + * * This function adds the STA offset (STP_offset parameter) to the time of the specified tag unless * the tag is the starting tag (it is always safe to advance to the starting tag). It also avoids * adding the STA offset if all network input ports are known at least up to one microstep earlier * than the specified tag. - * + * * This function assumes that the caller holds the environment mutex. * @param time The specified time. */ diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 01f22bcce..5805b26ac 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -81,10 +81,9 @@ void _lf_decrement_tag_barrier_locked(environment_t* env); int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); void lf_synchronize_with_other_federates(void); - /** * @brief Wait until physical time matches or exceeds the time of the specified tag. - * + * * If -fast is given, there will be no wait. For federated programs with decentralized coordination, * this function will add the STA offset to the wait time unless all network input ports are known * up to, but not necessarily including, the specified tag. From efb6d80e534349986a6840ba1793cc0a674052f0 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 3 Aug 2024 11:43:46 -0400 Subject: [PATCH 04/12] Deal with compiler nit --- core/federated/federate.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 9bae3ae16..8128d6419 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -2736,7 +2736,7 @@ instant_t lf_wait_until_time(tag_t tag) { tag.time -= 1; } - for (int i = 0; i < _lf_action_table_size; i++) { + for (size_t i = 0; i < _lf_action_table_size; i++) { tag_t known_to = _lf_action_table[i]->trigger->last_known_status_tag; if (lf_tag_compare(known_to, tag) < 0) { // There is a network input port for which it is not known whether a message with tag earlier From c043d928d89a74a5f62fb2216ac2fcb0d3808ede Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 3 Aug 2024 17:35:53 -0400 Subject: [PATCH 05/12] Added .gitignore for cmake-generated files --- core/federated/RTI/.gitignore | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 core/federated/RTI/.gitignore diff --git a/core/federated/RTI/.gitignore b/core/federated/RTI/.gitignore new file mode 100644 index 000000000..a1e6bc234 --- /dev/null +++ b/core/federated/RTI/.gitignore @@ -0,0 +1,3 @@ +CMakeFiles +Makefile +cmake_install.cmake \ No newline at end of file From 83cbf554c120f48c7999b230365f8f14af80bede Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sun, 4 Aug 2024 09:53:36 -0400 Subject: [PATCH 06/12] Refactored to apply STA only where needed --- core/federated/federate.c | 64 +++++++++++++----------- core/threaded/reactor_threaded.c | 40 +++++++-------- include/core/threaded/reactor_threaded.h | 12 ++--- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 8128d6419..df4670919 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1046,7 +1046,7 @@ static void handle_tag_advance_grant(void) { #ifdef FEDERATED_DECENTRALIZED /** - * @brief Return whether there exists an input port whose status is unknown. + * @brief Return true if there is an input port among those with a given STAA whose status is unknown. * * @param staa_elem A record of all input port actions. */ @@ -1075,6 +1075,22 @@ static int id_of_action(lf_action_base_t* input_port_action) { #endif +/** + * @brief Return true if all network input ports are known up to the specified tag. + * @param tag The tag. + */ +static bool inputs_known_to(tag_t tag) { + for (size_t i = 0; i < _lf_action_table_size; i++) { + tag_t known_to = _lf_action_table[i]->trigger->last_known_status_tag; + if (lf_tag_compare(known_to, tag) < 0) { + // There is a network input port for which it is not known whether a message with tag earlier + // than or equal to the specified tag may later arrive. + return false; + } + } + return true; +} + /** * @brief Thread handling setting the known absent status of input ports. * @@ -1101,11 +1117,11 @@ static void* update_ports_from_staa_offsets(void* args) { staa_t* staa_elem = staa_lst[i]; // The staa_elem is adjusted in the code generator to have subtracted the delay on the connection. // The list is sorted in increasing order of adjusted STAA offsets. - // The wait_until function automatically adds the lf_fed_STA_offset to the wait time. - // Note that the microstep does not matter here. - tag_t wait_until_tag = {.time = env->current_tag.time + staa_elem->STAA, .microstep = 0}; - LF_PRINT_DEBUG("**** (update thread) original wait_until time: " PRINTF_TIME, - wait_until_tag.time - lf_time_start()); + // We need to add the lf_fed_STA_offset to the wait time and guard against overflow. + interval_t wait_time = lf_time_add(staa_elem->STAA, lf_fed_STA_offset); + instant_t wait_until_time = lf_time_add(env->current_tag.time, wait_time); + LF_PRINT_DEBUG("**** (update thread) wait_until_time: " PRINTF_TIME, + wait_until_time - lf_time_start()); // The wait_until call will release the env->mutex while it is waiting. // However, it will not release the env->mutex if the wait time is too small. @@ -1117,13 +1133,14 @@ static void* update_ports_from_staa_offsets(void* args) { // block progress of any execution that is actually processing events. // It only slightly delays the decision that an event is absent, and only // if the STAA and STA are extremely small. - if (lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) { - wait_until_tag.time += 5 * MIN_SLEEP_DURATION; + if (wait_time < 5 * MIN_SLEEP_DURATION) { + wait_until_time += 5 * MIN_SLEEP_DURATION; } while (a_port_is_unknown(staa_elem)) { - LF_PRINT_DEBUG("**** (update thread) waiting until: " PRINTF_TIME, wait_until_tag.time - lf_time_start()); - if (wait_until(wait_until_tag, &lf_port_status_changed)) { + LF_PRINT_DEBUG("**** (update thread) waiting until: " PRINTF_TIME, wait_until_time - lf_time_start()); + if (wait_until(wait_until_time, &lf_port_status_changed)) { // Specified timeout time was reached. + // If the current tag has changed, start over. if (lf_tag_compare(lf_tag(env), tag_when_started_waiting) != 0) { break; } @@ -1132,7 +1149,7 @@ static void* update_ports_from_staa_offsets(void* args) { LF_PRINT_DEBUG("**** (update thread) Assuming absent! " PRINTF_TAG, current_tag.time - lf_time_start(), current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, - wait_until_tag.time - lf_time_start()); + wait_until_time - lf_time_start()); */ // Mark input ports absent. @@ -1167,15 +1184,15 @@ static void* update_ports_from_staa_offsets(void* args) { // Some ports may have been reset to uknown during that wait, in which case, // it would be huge mistake to enter the wait for a new tag below because the // program will freeze. First, check whether any ports are unknown: - bool port_unkonwn = false; + bool port_unknown = false; for (size_t i = 0; i < staa_lst_size; ++i) { staa_t* staa_elem = staa_lst[i]; if (a_port_is_unknown(staa_elem)) { - port_unkonwn = true; + port_unknown = true; break; } } - if (!port_unkonwn) { + if (!port_unknown) { // If this occurs, then there is a race condition that can lead to deadlocks. lf_print_error_and_exit("**** (update thread) Inconsistency: All ports are known, but MLAA is blocking."); } @@ -1183,6 +1200,7 @@ static void* update_ports_from_staa_offsets(void* args) { // Since max_level_allowed_to_advance will block advancement of time, we cannot follow // through to the next step without deadlocking. Wait some time, then continue. // The wait is necessary to prevent a busy wait. + // FIXME: This seems wrong because the mutex is held while sleeping! lf_sleep(2 * MIN_SLEEP_DURATION); continue; } @@ -2736,22 +2754,8 @@ instant_t lf_wait_until_time(tag_t tag) { tag.time -= 1; } - for (size_t i = 0; i < _lf_action_table_size; i++) { - tag_t known_to = _lf_action_table[i]->trigger->last_known_status_tag; - if (lf_tag_compare(known_to, tag) < 0) { - // There is a network input port for which it is not known whether a message with tag earlier - // than the specified tag may later arrive. Add the STA offset. - // Prevent an overflow and allow the STA offset to be FOREVER. - if (result < FOREVER - lf_fed_STA_offset) { - LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", lf_fed_STA_offset, - result - start_time); - result += lf_fed_STA_offset; - } else { - LF_PRINT_DEBUG("Setting the wait time to FOREVER."); - result = FOREVER; - } - break; // No need to check the rest. - } + if (!inputs_known_to(tag)) { + result = lf_time_add(result, lf_fed_STA_offset); } } return result; diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index b33bb5df1..acb50694d 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -194,14 +194,9 @@ void lf_set_present(lf_port_base_t* port) { } } -bool wait_until(tag_t tag, lf_cond_t* condition) { -#ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized. - interval_t wait_until_time = lf_wait_until_time(tag); -#else // not FEDERATED_DECENTRALIZED - interval_t wait_until_time = tag.time; -#endif // FEDERATED_DECENTRALIZED +bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { if (!fast) { - LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, + LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); @@ -387,8 +382,18 @@ void _lf_next_locked(environment_t* env) { // Wait for physical time to advance to the next event time (or stop time). // This can be interrupted if a physical action triggers (e.g., a message // arrives from an upstream federate or a local physical action triggers). - LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (next_tag.time - start_time)); - while (!wait_until(next_tag, &env->event_q_changed)) { + while (true) { +#ifdef FEDERATED_DECENTRALIZED + // Apply the STA, if needed. + interval_t wait_until_time = lf_wait_until_time(next_tag); +#else // not FEDERATED_DECENTRALIZED + interval_t wait_until_time = next_tag.time; +#endif // FEDERATED_DECENTRALIZED + LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (wait_until_time - start_time)); + if (wait_until(wait_until_time, &env->event_q_changed)) { + // Waited the full time. + break; + } LF_PRINT_DEBUG("_lf_next_locked(): Wait until time interrupted."); // Sleep was interrupted. Check for a new next_event. // The interruption could also have been due to a call to lf_request_stop(). @@ -570,8 +575,7 @@ void _lf_initialize_start_tag(environment_t* env) { } // The start time will likely have changed. Adjust the current tag and stop tag. - tag_t start_tag = (tag_t){.time = start_time, .microstep = 0u}; - env->current_tag = start_tag; + env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; if (duration >= 0LL) { // A duration has been specified. Recalculate the stop time. env->stop_tag = ((tag_t){.time = start_time + duration, .microstep = 0}); @@ -591,24 +595,16 @@ void _lf_initialize_start_tag(environment_t* env) { #endif LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", start_time); - // Call wait_until if federated. This is required because the startup procedure + // Wait until the start time. This is required for federates because the startup procedure // in lf_synchronize_with_other_federates() can decide on a new start_time that is // larger than the current physical time. - // Therefore, if --fast was not specified, wait until physical time matches - // or exceeds the start time. Microstep is ignored. // This wait_until() is deliberately called after most precursor operations // for tag (0,0) are performed (e.g., injecting startup reactions, etc.). // This has two benefits: First, the startup overheads will reduce // the required waiting time. Second, this call releases the mutex lock and allows // other threads (specifically, federate threads that handle incoming p2p messages - // from other federates) to hold the lock and possibly raise a tag barrier. This is - // especially useful if an STA is set properly because the federate will get - // a chance to process incoming messages while utilizing the STA. - - // Here we wait until the start time and also release the environment mutex. - // this means that the other worker threads will be allowed to start. We need - // this to avoid potential deadlock in federated startup. - while (!wait_until(start_tag, &env->event_q_changed)) { + // from other federates) to hold the lock and possibly raise a tag barrier. + while (!wait_until(start_time, &env->event_q_changed)) { }; LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + lf_fed_STA_offset); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be close to the STA offset.", diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 5805b26ac..2f5463165 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -84,15 +84,13 @@ void lf_synchronize_with_other_federates(void); /** * @brief Wait until physical time matches or exceeds the time of the specified tag. * - * If -fast is given, there will be no wait. For federated programs with decentralized coordination, - * this function will add the STA offset to the wait time unless all network input ports are known - * up to, but not necessarily including, the specified tag. + * If -fast is given, there will be no wait. * * If an event is put on the event queue during the wait, then the wait is * interrupted and this function returns false. It also returns false if the * timeout time is reached before the wait has completed. Note this this could - * return true even if the a new event was placed on the queue if that event - * time matches or exceeds the specified time. + * return true even if the a new event was placed on the queue. This will occur + * if that event time matches or exceeds the specified time. * * The mutex lock associated with the condition argument is assumed to be held by * the calling thread. This mutex is released while waiting. If the wait time is @@ -100,7 +98,7 @@ void lf_synchronize_with_other_federates(void); * immediately returns true and the mutex is not released. * * @param env Environment within which we are executing. - * @param tag The tag with the time to wait until physical time matches it. + * @param wait_until_time The time to wait until physical time matches it. * @param condition A condition variable that can interrupt the wait. The mutex * associated with this condition variable will be released during the wait. * @@ -109,7 +107,7 @@ void lf_synchronize_with_other_federates(void); * the stop time, if one was specified. Return true if the full wait time * was reached. */ -bool wait_until(tag_t tag, lf_cond_t* condition); +bool wait_until(instant_t wait_until_time, lf_cond_t* condition); tag_t get_next_event_tag(environment_t* env); tag_t send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply); From 1c4063dd6deceeb2cb26cbf2ad64597209ad8dc8 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sun, 4 Aug 2024 09:58:40 -0400 Subject: [PATCH 07/12] Format --- core/federated/federate.c | 3 +-- core/threaded/reactor_threaded.c | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index df4670919..ada4da6ce 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1120,8 +1120,7 @@ static void* update_ports_from_staa_offsets(void* args) { // We need to add the lf_fed_STA_offset to the wait time and guard against overflow. interval_t wait_time = lf_time_add(staa_elem->STAA, lf_fed_STA_offset); instant_t wait_until_time = lf_time_add(env->current_tag.time, wait_time); - LF_PRINT_DEBUG("**** (update thread) wait_until_time: " PRINTF_TIME, - wait_until_time - lf_time_start()); + LF_PRINT_DEBUG("**** (update thread) wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start()); // The wait_until call will release the env->mutex while it is waiting. // However, it will not release the env->mutex if the wait time is too small. diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index acb50694d..56a53cffa 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -196,8 +196,7 @@ void lf_set_present(lf_port_base_t* port) { bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { if (!fast) { - LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, - wait_until_time - start_time); + LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); if (wait_duration < MIN_SLEEP_DURATION) { From 41e027c3ffdf56739180920364102ebc1c1e08f4 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sun, 4 Aug 2024 11:40:11 -0400 Subject: [PATCH 08/12] Compiler nit --- core/federated/federate.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/federated/federate.c b/core/federated/federate.c index ada4da6ce..f34e3ee45 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1075,6 +1075,7 @@ static int id_of_action(lf_action_base_t* input_port_action) { #endif +#ifdef FEDERATED_DECENTRALIZED /** * @brief Return true if all network input ports are known up to the specified tag. * @param tag The tag. @@ -1090,6 +1091,7 @@ static bool inputs_known_to(tag_t tag) { } return true; } +#endif // FEDERATED_DECENTRALIZED /** * @brief Thread handling setting the known absent status of input ports. From 3814afb041f879973bfb1adf27491edf8c6b1c9b Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 5 Aug 2024 14:15:01 -0400 Subject: [PATCH 09/12] Removed unused condition variable --- core/federated/federate.c | 1 - core/reactor_common.c | 2 -- include/core/federated/federate.h | 5 ----- lingua-franca-ref.txt | 2 +- 4 files changed, 1 insertion(+), 9 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index f34e3ee45..87059a434 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -51,7 +51,6 @@ extern bool _lf_termination_executed; // Global variables references in federate.h lf_mutex_t lf_outbound_socket_mutex; lf_cond_t lf_port_status_changed; -lf_cond_t lf_current_tag_changed; /** * The max level allowed to advance (MLAA) is a variable that tracks how far in the reaction diff --git a/core/reactor_common.c b/core/reactor_common.c index 34f9d68eb..83a1592b9 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -217,8 +217,6 @@ void _lf_start_time_step(environment_t* env) { // Reset absent fields on network ports because // their status is unknown lf_reset_status_fields_on_input_port_triggers(); - // Signal the helper thread to reset its progress since the logical time has changed. - lf_cond_signal(&lf_current_tag_changed); } #endif // FEDERATED } diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index c6372b803..50c59daa1 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -215,11 +215,6 @@ extern lf_mutex_t lf_outbound_socket_mutex; */ extern lf_cond_t lf_port_status_changed; -/** - * Condition variable for blocking on tag advance in - */ -extern lf_cond_t lf_current_tag_changed; - ////////////////////////////////////////////////////////////////////////////////// // Public functions (in alphabetical order) diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..526c8f63c 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +dataflow From 21dee16522bcc9843dd0d18e9bca71bed3081ed4 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 5 Aug 2024 14:53:48 -0400 Subject: [PATCH 10/12] Release mutex in extra wait --- core/federated/federate.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 87059a434..39189b7cd 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1126,8 +1126,9 @@ static void* update_ports_from_staa_offsets(void* args) { // The wait_until call will release the env->mutex while it is waiting. // However, it will not release the env->mutex if the wait time is too small. // At the cost of a small additional delay in deciding a port is absent, - // we require a minimum wait time here. Otherwise, if both the STAA and STA are - // zero, this thread will fail to ever release the environment mutex. + // we require a minimum wait time here. Note that zero-valued STAAs are not + // included, but STA might be zero and the STAA might be very small. + // In this case, this thread will fail to ever release the environment mutex. // This causes chaos. The MIN_SLEEP_DURATION is the smallest amount of time // that wait_until will actually wait. Note that this strategy does not // block progress of any execution that is actually processing events. @@ -1199,9 +1200,12 @@ static void* update_ports_from_staa_offsets(void* args) { // Since max_level_allowed_to_advance will block advancement of time, we cannot follow // through to the next step without deadlocking. Wait some time, then continue. - // The wait is necessary to prevent a busy wait. - // FIXME: This seems wrong because the mutex is held while sleeping! - lf_sleep(2 * MIN_SLEEP_DURATION); + // The wait is necessary to prevent a busy wait, which will only occur if port + // status are always known inside the while loop + // Be sure to use wait_until() instead of sleep() because sleep() will not release the mutex. + instant_t wait_until_time = lf_time_add(env->current_tag.time, 2 * MIN_SLEEP_DURATION); + wait_until(wait_until_time, &lf_port_status_changed); + continue; } From 89b0e3617db1d2730552a418fd35f1395afe114a Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Mon, 5 Aug 2024 18:12:22 -0400 Subject: [PATCH 11/12] Comments only --- core/federated/federate.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 39189b7cd..75c4cc04c 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1126,8 +1126,8 @@ static void* update_ports_from_staa_offsets(void* args) { // The wait_until call will release the env->mutex while it is waiting. // However, it will not release the env->mutex if the wait time is too small. // At the cost of a small additional delay in deciding a port is absent, - // we require a minimum wait time here. Note that zero-valued STAAs are not - // included, but STA might be zero and the STAA might be very small. + // we require a minimum wait time here. Note that zero-valued STAAs are + // included, and STA might be zero or very small. // In this case, this thread will fail to ever release the environment mutex. // This causes chaos. The MIN_SLEEP_DURATION is the smallest amount of time // that wait_until will actually wait. Note that this strategy does not From 7db4930deaa0e4100b017b1d274e24aecba11e0d Mon Sep 17 00:00:00 2001 From: Marten Lohstroh Date: Thu, 8 Aug 2024 21:43:12 -0700 Subject: [PATCH 12/12] Update lingua-franca-ref.txt --- lingua-franca-ref.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 526c8f63c..1f7391f92 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -dataflow +master