diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index ba1661a19..de0864555 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -91,6 +91,7 @@ static void send_failed_signal(federate_info_t* fed) { /** * @brief Function to run upon termination. + * * This function will be invoked both after main() returns and when a signal * that results in terminating the process, such as SIGINT. In the former * case, it should do nothing. In the latter case, it will send a MSG_TYPE_FAILED diff --git a/core/federated/RTI/rti_local.c b/core/federated/RTI/rti_local.c index 538a5b2be..f69f21dc5 100644 --- a/core/federated/RTI/rti_local.c +++ b/core/federated/RTI/rti_local.c @@ -53,9 +53,9 @@ void initialize_local_rti(environment_t *envs, int num_envs) { rti_local->base.scheduling_nodes[i] = (scheduling_node_t *) enclave_info; // Encode the connection topology into the enclave_info object. - enclave_info->base.num_downstream = _lf_get_downstream_of(i, &enclave_info->base.downstream); - enclave_info->base.num_upstream = _lf_get_upstream_of(i, &enclave_info->base.upstream); - _lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay); + enclave_info->base.num_downstream = lf_get_downstream_of(i, &enclave_info->base.downstream); + enclave_info->base.num_upstream = lf_get_upstream_of(i, &enclave_info->base.upstream); + lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay); enclave_info->base.state = GRANTED; } diff --git a/core/federated/RTI/rti_local.h b/core/federated/RTI/rti_local.h index 8960ad1b1..30c255c42 100644 --- a/core/federated/RTI/rti_local.h +++ b/core/federated/RTI/rti_local.h @@ -1,3 +1,19 @@ +/** + * @file + * @author Erling Jellum (erling.r.jellum@ntnu.no) + * @author Edward A. Lee (eal@berkeley.edu) + * @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn) + * @author Soroush Bateni (soroush@utdallas.edu) + * @copyright (c) 2020-2024, The University of California at Berkeley + * License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md) + * + * @brief This file declares functions used to implement scheduling enclaves. + * + * A scheduling enclave is portion of the runtime system that maintains its own event + * and reaction queues and has its own scheduler. It uses a local runtime infrastructure (RTI) + * to coordinate the advancement of tags across enclaves. + */ + #ifndef RTI_LOCAL_H #define RTI_LOCAL_H @@ -8,9 +24,9 @@ #include "rti_common.h" /** - * @brief Structure holding information about each enclave in the program - * The first field is the generic scheduling_node_info struct + * @brief Structure holding information about each enclave in the program. * + * The first field is the generic scheduling_node_info struct */ typedef struct enclave_info_t { scheduling_node_t base; @@ -21,7 +37,6 @@ typedef struct enclave_info_t { /** * @brief Structure holding information about the local RTI - * */ typedef struct { rti_common_t base; @@ -29,6 +44,8 @@ typedef struct { /** * @brief Dynamically create and initialize the local RTI. + * @param envs Array of environments. + * @param num_envs Number of environments. */ void initialize_local_rti(environment_t* envs, int num_envs); @@ -39,13 +56,15 @@ void free_local_rti(); /** * @brief Initialize the enclave object. - * - * @param enclave + * @param enclave The enclave object to initialize. + * @param idx The index of the enclave. + * @param env The environment of the enclave. */ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *env); /** * @brief Notify the local RTI of a next event tag (NET). + * * This function call may block. A call to this function serves two purposes. * 1) It is a promise that, unless receiving events from other enclaves, this * enclave will not produce any event until the next_event_tag (NET) argument. @@ -65,8 +84,9 @@ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *en tag_t rti_next_event_tag_locked(enclave_info_t* enclave, tag_t next_event_tag); /** - * @brief This function informs the local RTI that `enclave` has completed tag - * `completed`. This will update the data structures and can release other + * @brief Inform the local RTI that `enclave` has completed tag `completed`. + * + * This will update the data structures and can release other * enclaves waiting on a TAG. * * This assumes the caller is holding the environment mutex of the source enclave. @@ -77,16 +97,55 @@ tag_t rti_next_event_tag_locked(enclave_info_t* enclave, tag_t next_event_tag); void rti_logical_tag_complete_locked(enclave_info_t* enclave, tag_t completed); /** - * @brief This function is called after scheduling an event onto the event queue - * of another enclave. The source enclave must call this function to potentially update + * @brief Notify the local RTI to update the next event tag (NET) of a target enclave. + * + * This function is called after scheduling an event onto the event queue of another enclave. + * The source enclave must call this function to potentially update * the NET of the target enclave. * * This assumes the caller is holding the environment mutex of the target enclave. * - * @param target The enclave of which we want to update the NET of - * @param net The proposed next event tag + * @param src The enclave that has scheduled an event. + * @param target The enclave of which we want to update the NET of. + * @param net The proposed next event tag. */ void rti_update_other_net_locked(enclave_info_t* src, enclave_info_t* target, tag_t net); +/** + * @brief Get the array of ids of enclaves directly upstream of the specified enclave. + * + * This updates the specified result pointer to point to a statically allocated array of IDs + * and returns the length of the array. The implementation is code-generated. + * + * @param enclave_id The enclave for which to report upstream IDs. + * @param result The pointer to dereference and update to point to the resulting array. + * @return The number of direct upstream enclaves. + */ +int lf_get_upstream_of(int enclave_id, int** result); + +/** + * @brief Get the array of ids of enclaves directly downstream of the specified enclave. + * + * This updates the specified result pointer to point to a statically allocated array of IDs + * and returns the length of the array. The implementation is code-generated. + * + * @param enclave_id The enclave for which to report downstream IDs. + * @param result The pointer to dereference and update to point to the resulting array. + * @return The number of direct downstream enclaves. + */ +int lf_get_downstream_of(int enclave_id, int** result); + +/** + * @brief Retrieve the delays on the connections to direct upstream enclaves. + * + * This updates the result pointer to point to a statically allocated array of delays. + * The implementation is code-generated. + * + * @param enclave_id The enclave for which to search for upstream delays. + * @param result The pointer to dereference and update to point to the resulting array. + * @return int The number of direct upstream enclaves. + */ +int lf_get_upstream_delay_of(int enclave_id, interval_t** result); + #endif // LF_ENCLAVES #endif // RTI_LOCAL_H diff --git a/core/federated/federate.c b/core/federated/federate.c index b96cbcb7e..290433851 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -33,6 +33,7 @@ #include "reactor.h" #include "reactor_common.h" #include "reactor_threaded.h" +#include "api/schedule.h" #include "scheduler.h" #include "trace.h" @@ -371,7 +372,7 @@ static trigger_handle_t schedule_message_received_from_network_locked( LF_PRINT_LOG("Calling schedule with 0 delay and intended tag " PRINTF_TAG ".", trigger->intended_tag.time - start_time, trigger->intended_tag.microstep); - return_value = _lf_schedule(env, trigger, extra_delay, token); + return_value = lf_schedule_trigger(env, trigger, extra_delay, token); #endif } else { // In case the message is in the future, call @@ -501,7 +502,7 @@ static int handle_message(int* socket, int fed_id) { LF_PRINT_LOG("Message received by federate: %s. Length: %zu.", message_contents, length); LF_PRINT_DEBUG("Calling schedule for message received on a physical connection."); - _lf_schedule_value(action, 0, message_contents, length); + lf_schedule_value(action, 0, message_contents, length); return 0; } @@ -1122,7 +1123,7 @@ static void* update_ports_from_staa_offsets(void* args) { staa_t* staa_elem = staa_lst[i]; // The staa_elem is adjusted in the code generator to have subtracted the delay on the connection. // The list is sorted in increasing order of adjusted STAA offsets. - // The wait_until function automatically adds the _lf_fed_STA_offset to the wait time. + // The wait_until function automatically adds the lf_fed_STA_offset to the wait time. interval_t wait_until_time = env->current_tag.time + staa_elem->STAA; LF_PRINT_DEBUG("**** (update thread) original wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start()); @@ -1136,7 +1137,7 @@ static void* update_ports_from_staa_offsets(void* args) { // block progress of any execution that is actually processing events. // It only slightly delays the decision that an event is absent, and only // if the STAA and STA are extremely small. - if (_lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) { + if (lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) { wait_until_time += 5 * MIN_SLEEP_DURATION; } while (a_port_is_unknown(staa_elem)) { @@ -1363,7 +1364,7 @@ static void handle_stop_granted_message() { received_stop_tag.microstep++; } - _lf_set_stop_tag(&env[i], received_stop_tag); + lf_set_stop_tag(&env[i], received_stop_tag); LF_PRINT_DEBUG("Setting the stop tag to " PRINTF_TAG ".", env[i].stop_tag.time - start_time, env[i].stop_tag.microstep); @@ -1662,7 +1663,7 @@ static bool bounded_NET(tag_t* tag) { * generates an empty implementation. * @param env The environment of the federate */ -void terminate_execution(environment_t* env) { +void lf_terminate_execution(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); // For an abnormal termination (e.g. a SIGINT), we need to send a @@ -2668,7 +2669,7 @@ int lf_send_tagged_message(environment_t* env, // tag of the outgoing message. tag_t current_message_intended_tag = lf_delay_tag(env->current_tag, additional_delay); - if (_lf_is_tag_after_stop_tag(env, current_message_intended_tag)) { + if (lf_is_tag_after_stop_tag(env, current_message_intended_tag)) { // Message tag is past the timeout time (the stop time) so it should not be sent. LF_PRINT_LOG("Dropping message because it will be after the timeout time."); return -1; diff --git a/core/lf_token.c b/core/lf_token.c index 8e4b5ae43..8da5d9c01 100644 --- a/core/lf_token.c +++ b/core/lf_token.c @@ -1,33 +1,9 @@ /** * @file * @author Edward A. Lee (eal@berkeley.edu) - * - * @section LICENSE - * Copyright (c) 2022, The University of California at Berkeley. - * - * Redistribution and use in source and binary forms, with or without modification, - * are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * @section DESCRIPTION - * - * Functions supporting token types. See lf_token.h for docs. + * * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Functions supporting token types. See lf_token.h for docs. */ #if !defined NDEBUG @@ -47,7 +23,7 @@ int _lf_count_token_allocations; #include "lf_types.h" #include "hashset/hashset_itr.h" #include "util.h" -#include "reactor_common.h" // Enter/exit critical sections +#include "platform.h" // Enter/exit critical sections #include "port.h" // Defines lf_port_base_t. lf_token_t* _lf_tokens_allocated_in_reactions = NULL; diff --git a/core/modal_models/modes.c b/core/modal_models/modes.c index 91d58b7df..eda031755 100644 --- a/core/modal_models/modes.c +++ b/core/modal_models/modes.c @@ -42,6 +42,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "platform.h" #include "lf_types.h" #include "modes.h" +#include "reactor.h" #include "reactor_common.h" // Bit masks for the internally used flags on modes @@ -390,7 +391,7 @@ void _lf_process_mode_changes( for (int j = 0; j < timer_triggers_size; j++) { trigger_t* timer = timer_triggers[j]; if (timer->period == 0 && timer->mode == state->next_mode) { - _lf_schedule(env, timer, timer->offset, NULL); + lf_schedule_trigger(env, timer, timer->offset, NULL); } } } @@ -407,7 +408,7 @@ void _lf_process_mode_changes( LF_PRINT_DEBUG("Modes: Re-enqueuing reset timer."); // Reschedule the timer with no additional delay. // This will take care of super dense time when offset is 0. - _lf_schedule(env, timer, event->trigger->offset, NULL); + lf_schedule_trigger(env, timer, event->trigger->offset, NULL); } // No further processing; drops all events upon reset (timer event was recreated by schedule and original can be removed here) } else if (state->next_mode != state->current_mode && event->trigger != NULL) { // History transition to a different mode @@ -430,11 +431,11 @@ void _lf_process_mode_changes( event_t* tmp = e->next; e = tmp->next; // A fresh event was created by schedule, hence, recycle old one - _lf_recycle_event(env, tmp); + lf_recycle_event(env, tmp); } } // A fresh event was created by schedule, hence, recycle old one - _lf_recycle_event(env, event); + lf_recycle_event(env, event); // Remove suspended event and continue suspended_event = _lf_remove_suspended_event(suspended_event); @@ -541,7 +542,7 @@ void _lf_process_mode_changes( void _lf_terminate_modal_reactors(environment_t* env) { _lf_suspended_event_t* suspended_event = _lf_suspended_events_head; while(suspended_event != NULL) { - _lf_recycle_event(env, suspended_event->event); + lf_recycle_event(env, suspended_event->event); _lf_suspended_event_t* next = suspended_event->next; free(suspended_event); suspended_event = next; diff --git a/core/platform/lf_zephyr_support.c b/core/platform/lf_zephyr_support.c index d9bbef80b..b132cd9f7 100644 --- a/core/platform/lf_zephyr_support.c +++ b/core/platform/lf_zephyr_support.c @@ -35,7 +35,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "lf_zephyr_support.h" #include "lf_zephyr_board_support.h" #include "platform.h" -#include "reactor_common.h" +#include "reactor.h" #include "utils/util.h" #include "tag.h" diff --git a/core/reactor.c b/core/reactor.c index da10beb2b..9e2092dcd 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -1,39 +1,14 @@ -#if defined(LF_SINGLE_THREADED) -/* Runtime infrastructure for the non-threaded version of the C target of Lingua Franca. */ - -/************* -Copyright (c) 2019, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * @brief Runtime implementation for the non-threaded version of the - * C target of Lingua Franca. + * @brief Runtime implementation for the single-threaded version of the C target of Lingua Franca. * * @author{Edward A. Lee } * @author{Marten Lohstroh } * @author{Soroush Bateni } * @author{Erling Jellum } */ + +#if defined(LF_SINGLE_THREADED) + #include #include @@ -51,6 +26,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // Global variable defined in tag.c: extern instant_t start_time; +// Defined in reactor_common.c: +extern bool fast; +extern bool keepalive_specified; + void lf_set_present(lf_port_base_t* port) { if (!port->source_reactor) return; environment_t *env = port->source_reactor->environment; @@ -92,6 +71,7 @@ int wait_until(environment_t* env, instant_t wakeup_time) { return 0; } +#ifndef NDEBUG void lf_print_snapshot(environment_t* env) { if(LOG_LEVEL > LOG_LEVEL_LOG) { LF_PRINT_DEBUG(">>> START Snapshot"); @@ -99,17 +79,12 @@ void lf_print_snapshot(environment_t* env) { LF_PRINT_DEBUG(">>> END Snapshot"); } } +#else // NDEBUG +void lf_print_snapshot(environment_t* env) { + // Do nothing. +} +#endif // NDEBUG -/** - * Trigger 'reaction'. - * - * @param env Environment in which we are executing - * @param reaction The reaction. - * @param worker_number The ID of the worker that is making this call. 0 should be - * used if there is only one worker (e.g., when the program is using the - * single-threaded C runtime). -1 is used for an anonymous call in a context where a - * worker number does not make sense (e.g., the caller is not a worker thread). - */ void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number) { assert(env != GLOBAL_ENVIRONMENT); @@ -240,7 +215,7 @@ int next(environment_t* env) { if (event == NULL) { // No event in the queue. if (!keepalive_specified) { - _lf_set_stop_tag( env, + lf_set_stop_tag( env, (tag_t){.time=env->current_tag.time, .microstep=env->current_tag.microstep+1} ); } @@ -254,7 +229,7 @@ int next(environment_t* env) { } } - if (_lf_is_tag_after_stop_tag(env, next_tag)) { + if (lf_is_tag_after_stop_tag(env, next_tag)) { // Cannot process events after the stop tag. next_tag = env->stop_tag; } @@ -304,7 +279,7 @@ void lf_request_stop(void) { tag_t new_stop_tag; new_stop_tag.time = env->current_tag.time; new_stop_tag.microstep = env->current_tag.microstep + 1; - _lf_set_stop_tag(env, new_stop_tag); + lf_set_stop_tag(env, new_stop_tag); } /** @@ -329,7 +304,7 @@ bool _lf_is_blocked_by_executing_reaction(void) { */ int lf_reactor_c_main(int argc, const char* argv[]) { // Invoke the function that optionally provides default command-line options. - _lf_set_default_command_line_options(); + lf_set_default_command_line_options(); _lf_initialize_clock(); LF_PRINT_DEBUG("Processing command line arguments."); @@ -348,7 +323,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { signal(SIGINT, exit); #endif // Create and initialize the environment - _lf_create_environments(); // code-generated function + lf_create_environments(); // code-generated function environment_t *env; int num_environments = _lf_get_environments(&env); LF_ASSERT(num_environments == 1, diff --git a/core/reactor_common.c b/core/reactor_common.c index 50d3c6422..866e64b79 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -1,40 +1,14 @@ -/* Runtime infrastructure for the C target of Lingua Franca. */ - -/************* -Copyright (c) 2019, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * Runtime infrastructure for the C target of Lingua Franca. - * This file contains resources that are shared by the threaded and - * non-threaded versions of the C runtime. - * - * @author{Edward A. Lee } - * @author{Marten Lohstroh } - * @author{Mehrdad Niknami } - * @author{Soroush Bateni } - * @author{Erling Rennemo Jellum } + * @file + * @author Edward A. Lee + * @author Marten Lohstroh + * @author Soroush Bateni + * @author Mehrdad Niknami + * @author Alexander Schulz-Rosengarten + * @author Erling Rennemo Jellum + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Runtime infrastructure common to the threaded and single-threaded versions of the C runtime. */ #include #include @@ -42,7 +16,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include "platform.h" -#include "lf_types.h" +#include "api/schedule.h" #ifdef MODAL_REACTORS #include "modes.h" #endif @@ -52,14 +26,13 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "port.h" #include "pqueue.h" #include "reactor.h" -#include "reactor_common.h" -#include "tag.h" #include "trace.h" #include "util.h" #include "vector.h" #include "hashset/hashset.h" #include "hashset/hashset_itr.h" #include "environment.h" +#include "reactor_common.h" #if !defined(LF_SINGLE_THREADED) #include "watchdog.h" @@ -73,6 +46,19 @@ extern instant_t start_time; extern int _lf_count_payload_allocations; #endif +#ifdef FEDERATED_DECENTRALIZED + +/** + * @brief Global STA (safe to advance) offset uniformly applied to advancement of each + * time step in federated execution. + * + * This can be retrieved in user code by calling lf_get_stp_offset() and adjusted by + * calling lf_set_stp_offset(interval_t offset). + */ +interval_t lf_fed_STA_offset = 0LL; + +#endif // FEDERATED_DECENTRALIZED + /** * Indicator of whether to wait for physical time to match logical time. * By default, execution will wait. The command-line argument -fast will @@ -100,26 +86,7 @@ instant_t duration = -1LL; /** Indicator of whether the keepalive command-line option was given. */ bool keepalive_specified = false; -/** - * Global STP offset uniformly applied to advancement of each - * time step in federated execution. This can be retrieved in - * user code by calling lf_get_stp_offset() and adjusted by - * calling lf_set_stp_offset(interval_t offset). - */ -interval_t _lf_fed_STA_offset = 0LL; - -/** - * Allocate memory using calloc (so the allocated memory is zeroed out) - * and record the allocated memory on the specified self struct so that - * it will be freed when calling {@link free_reactor(self_base_t)}. - * @param count The number of items of size 'size' to accomodate. - * @param size The size of each item. - * @param head Pointer to the head of a list on which to record - * the allocation, or NULL to not record it. - * @return A pointer to the allocated memory. - */ -void* _lf_allocate( - size_t count, size_t size, struct allocation_record_t** head) { +void* lf_allocate(size_t count, size_t size, struct allocation_record_t** head) { void *mem = calloc(count, size); if (mem == NULL) lf_print_error_and_exit("Out of memory!"); if (head != NULL) { @@ -140,25 +107,11 @@ void* _lf_allocate( */ struct allocation_record_t* _lf_reactors_to_free = NULL; -/** - * Allocate memory for a new runtime instance of a reactor. - * This records the reactor on the list of reactors to be freed at - * termination of the program. If you plan to free the reactor before - * termination of the program, use calloc instead (which this uses). - * @param size The size of the self struct, obtained with sizeof(). - */ -void* _lf_new_reactor(size_t size) { - return _lf_allocate(1, size, &_lf_reactors_to_free); +self_base_t* lf_new_reactor(size_t size) { + return (self_base_t*)lf_allocate(1, size, &_lf_reactors_to_free); } -/** - * Free memory on the specified allocation record, e.g. allocated by - * {@link _lf_allocate(size_t, size_t, allocation_record_t**)}. - * Mark the list empty by setting `*head` to NULL. - * @param head Pointer to the head of a list on which to record - * the allocation, or NULL to not record it. - */ -void _lf_free(struct allocation_record_t** head) { +void lf_free(struct allocation_record_t** head) { if (head == NULL) return; struct allocation_record_t* record = *head; while (record != NULL) { @@ -172,24 +125,15 @@ void _lf_free(struct allocation_record_t** head) { *head = NULL; } -/** - * Free memory recorded on the allocations list of the specified reactor - * and then free the specified self struct. - * @param self The self struct of the reactor. - */ -void _lf_free_reactor(self_base_t *self) { - _lf_free(&self->allocations); +void lf_free_reactor(self_base_t *self) { + lf_free(&self->allocations); free(self); } -/** - * Free all the reactors that are allocated with - * {@link #_lf_new_reactor(size_t)}. - */ -void _lf_free_all_reactors(void) { +void lf_free_all_reactors(void) { struct allocation_record_t* head = _lf_reactors_to_free; while (head != NULL) { - _lf_free_reactor((self_base_t*)head->allocated); + lf_free_reactor((self_base_t*)head->allocated); struct allocation_record_t* tmp = head->next; free(head); head = tmp; @@ -197,65 +141,27 @@ void _lf_free_all_reactors(void) { _lf_reactors_to_free = NULL; } -/** - * Set the stop tag. - * - * This function will always choose the minimum - * of the provided tag and stop_tag - * - * @note In threaded programs, the mutex must be locked before - * calling this function. - */ -void _lf_set_stop_tag(environment_t* env, tag_t tag) { +void lf_set_stop_tag(environment_t* env, tag_t tag) { assert(env != GLOBAL_ENVIRONMENT); if (lf_tag_compare(tag, env->stop_tag) < 0) { env->stop_tag = tag; } } -///////////////////////////// -// The following functions are in scope for all reactors: +#ifdef FEDERATED_DECENTRALIZED -/** - * Return the global STP offset on advancement of logical - * time for federated execution. - */ interval_t lf_get_stp_offset() { - return _lf_fed_STA_offset; + return lf_fed_STA_offset; } -/** - * Set the global STP offset on advancement of logical - * time for federated execution. - * - * @param offset A positive time value to be applied - * as the STP offset. - */ void lf_set_stp_offset(interval_t offset) { if (offset > 0LL) { - _lf_fed_STA_offset = offset; + lf_fed_STA_offset = offset; } } +#endif // FEDERATED_DECENTRALIZED -/** - * Trigger 'reaction'. - * - * @param env Environment in which we are executing. - * @param reaction The reaction. - * @param worker_number The ID of the worker that is making this call. 0 should be - * used if there is only one worker (e.g., when the program is using the - * single-threaded C runtime). -1 is used for an anonymous call in a context where a - * worker number does not make sense (e.g., the caller is not a worker thread). - */ -void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number); - -/** - * Use tables to reset is_present fields to false, - * set intended_tag fields in federated execution - * to the current_tag, and decrement reference - * counts between time steps and at the end of execution. - */ void _lf_start_time_step(environment_t *env) { assert(env != GLOBAL_ENVIRONMENT); if (!env->execution_started) { @@ -313,23 +219,11 @@ void _lf_start_time_step(environment_t *env) { #endif // FEDERATED } -/** - * A helper function that returns true if the provided tag is after stop tag. - * - * @param env Environment in which we are executing. - * @param tag The tag to check against stop tag - */ -bool _lf_is_tag_after_stop_tag(environment_t* env, tag_t tag) { +bool lf_is_tag_after_stop_tag(environment_t* env, tag_t tag) { assert(env != GLOBAL_ENVIRONMENT); return (lf_tag_compare(tag, env->stop_tag) > 0); } -/** - * Pop all events from event_q with timestamp equal to current_tag.time, extract all - * the reactions triggered by these events, and stick them into the reaction - * queue. - * @param env Environment in which we are executing. - */ void _lf_pop_events(environment_t *env) { assert(env != GLOBAL_ENVIRONMENT); #ifdef MODAL_REACTORS @@ -346,7 +240,7 @@ void _lf_pop_events(environment_t *env) { LF_PRINT_DEBUG("Putting event from the event queue for the next microstep."); pqueue_insert(env->next_q, event->next); } - _lf_recycle_event(env, event); + lf_recycle_event(env, event); // Peek at the next event in the event queue. event = (event_t*)pqueue_peek(env->event_q); continue; @@ -416,7 +310,7 @@ void _lf_pop_events(environment_t *env) { // If the trigger is a periodic timer, create a new event for its next execution. if (event->trigger->is_timer && event->trigger->period > 0LL) { // Reschedule the trigger. - _lf_schedule(env, event->trigger, event->trigger->period, NULL); + lf_schedule_trigger(env, event->trigger, event->trigger->period, NULL); } // Copy the token pointer into the trigger struct so that the @@ -439,7 +333,7 @@ void _lf_pop_events(environment_t *env) { pqueue_insert(env->next_q, event->next); } - _lf_recycle_event(env, event); + lf_recycle_event(env, event); // Peek at the next event in the event queue. event = (event_t*)pqueue_peek(env->event_q); @@ -454,12 +348,7 @@ void _lf_pop_events(environment_t *env) { } } -/** - * Get a new event. If there is a recycled event available, use that. - * If not, allocate a new one. In either case, all fields will be zero'ed out. - * @param env Environment in which we are executing. - */ -static event_t* _lf_get_new_event(environment_t* env) { +event_t* lf_get_new_event(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); // Recycle event_t structs, if possible. event_t* e = (event_t*)pqueue_pop(env->recycle_q); @@ -473,14 +362,6 @@ static event_t* _lf_get_new_event(environment_t* env) { return e; } -/** - * Initialize the given timer. - * If this timer has a zero offset, enqueue the reactions it triggers. - * If this timer is to trigger reactions at a _future_ tag as well, - * schedule it accordingly. - * @param env Environment in which we are executing. - * @param timer The timer to initialize. - */ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { assert(env != GLOBAL_ENVIRONMENT); interval_t delay = 0; @@ -491,7 +372,7 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { // FIXME: The following check might not be working as // intended // && (timer->offset != 0 || timer->period != 0)) { - event_t* e = _lf_get_new_event(env); + event_t* e = lf_get_new_event(env); e->trigger = timer; e->time = lf_time_logical(env) + timer->offset; _lf_add_suspended_event(e); @@ -516,7 +397,7 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { // Get an event_t struct to put on the event queue. // Recycle event_t structs, if possible. - event_t* e = _lf_get_new_event(env); + event_t* e = lf_get_new_event(env); e->trigger = timer; e->time = lf_time_logical(env) + delay; // NOTE: No lock is being held. Assuming this only happens at startup. @@ -524,10 +405,6 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { tracepoint_schedule(env->trace, timer, delay); // Trace even though schedule is not called. } -/** - * @brief Initialize all the timers in the environment - * @param env Environment in which we are executing. - */ void _lf_initialize_timers(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); for (int i = 0; i < env->timer_triggers_size; i++) { @@ -539,15 +416,11 @@ void _lf_initialize_timers(environment_t* env) { // To avoid runtime memory allocations for timer-driven programs // the recycle queue is initialized with a single event. if (env->timer_triggers_size > 0) { - event_t *e = _lf_get_new_event(env); - _lf_recycle_event(env, e); + event_t *e = lf_get_new_event(env); + lf_recycle_event(env, e); } } -/** - * @brief Trigger all the startup reactions in our environment - * @param env Environment in which we are executing. - */ void _lf_trigger_startup_reactions(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); for (int i = 0; i < env->startup_reactions_size; i++) { @@ -571,10 +444,6 @@ void _lf_trigger_startup_reactions(environment_t* env) { #endif } -/** - * @brief Trigger all the shutdown reactions in our environment - * @param env Environment in which we are executing. - */ void _lf_trigger_shutdown_reactions(environment_t *env) { assert(env != GLOBAL_ENVIRONMENT); for (int i = 0; i < env->shutdown_reactions_size; i++) { @@ -593,13 +462,7 @@ void _lf_trigger_shutdown_reactions(environment_t *env) { #endif } -/** - * Recycle the given event. - * Zero it out and pushed it onto the recycle queue. - * @param env Environment in which we are executing. - * @param e The event to recycle. - */ -void _lf_recycle_event(environment_t* env, event_t* e) { +void lf_recycle_event(environment_t* env, event_t* e) { assert(env != GLOBAL_ENVIRONMENT); e->time = 0LL; e->trigger = NULL; @@ -613,17 +476,8 @@ void _lf_recycle_event(environment_t* env, event_t* e) { pqueue_insert(env->recycle_q, e); } -/** - * Create dummy events to be used as spacers in the event queue. - * @param env Environment in which we are executing. - * @param trigger The eventual event to be triggered. - * @param time The logical time of that event. - * @param next The event to place after the dummy events. - * @param offset The number of dummy events to insert. - * @return A pointer to the first dummy event. - */ event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next, microstep_t offset) { - event_t* first_dummy = _lf_get_new_event(env); + event_t* first_dummy = lf_get_new_event(env); event_t* dummy = first_dummy; dummy->time = time; dummy->is_dummy = true; @@ -633,7 +487,7 @@ event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant dummy->next = next; break; } - dummy->next = _lf_get_new_event(env); + dummy->next = lf_get_new_event(env); dummy = dummy->next; dummy->time = time; dummy->is_dummy = true; @@ -643,13 +497,7 @@ event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant return first_dummy; } -/** - * Replace the token on the specified event with the specified - * token and free the old token. - * @param event The event. - * @param token The token. - */ -static void _lf_replace_token(event_t* event, lf_token_t* token) { +void lf_replace_token(event_t* event, lf_token_t* token) { if (event->token != token) { // Free the existing token, if any. _lf_done_using(event->token); @@ -658,34 +506,6 @@ static void _lf_replace_token(event_t* event, lf_token_t* token) { event->token = token; } -/** - * Schedule events at a specific tag (time, microstep), provided - * that the tag is in the future relative to the current tag (or the - * environment has not started executing). The input time values are absolute. - * - * If there is an event found at the requested tag, the payload - * is replaced and 0 is returned. - * - * Note that this function is an internal API that must - * be called with tags that are in order for a given - * trigger. This means that the following order is illegal: - * _lf_schedule_at_tag(trigger1, bigger_tag, ...); - * _lf_schedule_at_tag(trigger1, smaller_tag, ...); - * where bigger_tag > smaller_tag. This function is primarily - * used for network communication (which is assumed to be - * in order). - * - * This function assumes the caller holds the mutex lock. - * - * @param env Environment in which we are executing. - * @param trigger The trigger to be invoked at a later logical time. - * @param tag Logical tag of the event - * @param token The token wrapping the payload or NULL for no payload. - * - * @return A positive trigger handle for success, 0 if no new event was scheduled - * (instead, the payload was updated), or -1 for error (the tag is equal to or less - * than the current tag). - */ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag_t tag, lf_token_t* token) { assert(env != GLOBAL_ENVIRONMENT); tag_t current_logical_tag = env->current_tag; @@ -706,13 +526,13 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag } // Do not schedule events if the tag is after the stop tag - if (_lf_is_tag_after_stop_tag(env, tag)) { + if (lf_is_tag_after_stop_tag(env, tag)) { lf_print_warning("_lf_schedule_at_tag: event time is past the timeout. Discarding event."); _lf_done_using(token); return -1; } - event_t* e = _lf_get_new_event(env); + event_t* e = lf_get_new_event(env); // Set the event time e->time = tag.time; @@ -744,23 +564,23 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag if (found->token != token) { _lf_done_using(token); } - _lf_recycle_event(env, e); + lf_recycle_event(env, e); return(0); break; case replace: // Replace the payload of the event at the head with our // current payload. - _lf_replace_token(found, token); - _lf_recycle_event(env, e); + lf_replace_token(found, token); + lf_recycle_event(env, e); return 0; break; default: // Adding a microstep to the original // intended tag. - if (_lf_is_tag_after_stop_tag(env, (tag_t) {.time=found->time,.microstep=1})) { + if (lf_is_tag_after_stop_tag(env, (tag_t) {.time=found->time,.microstep=1})) { // Scheduling e will incur a microstep after the stop tag, // which is illegal. - _lf_recycle_event(env, e); + lf_recycle_event(env, e); return 0; } if (found->next != NULL) { @@ -812,23 +632,23 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag if (found->next->token != token) { _lf_done_using(token); } - _lf_recycle_event(env, e); + lf_recycle_event(env, e); return 0; break; case replace: // Replace the payload of the event at the head with our // current payload. - _lf_replace_token(found->next, token); - _lf_recycle_event(env, e); + lf_replace_token(found->next, token); + lf_recycle_event(env, e); return 0; break; default: // Adding a microstep to the original // intended tag. - if (_lf_is_tag_after_stop_tag(env, (tag_t){.time=found->time,.microstep=microstep_of_found+1})) { + if (lf_is_tag_after_stop_tag(env, (tag_t){.time=found->time,.microstep=microstep_of_found+1})) { // Scheduling e will incur a microstep at timeout, // which is illegal. - _lf_recycle_event(env, e); + lf_recycle_event(env, e); return 0; } if (found->next->next != NULL) { @@ -864,277 +684,6 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag return return_value; } -/** - * Schedule the specified trigger at env->current_tag.time plus the offset of the - * specified trigger plus the delay. See schedule_token() in reactor.h for details. - * This is the internal implementation shared by both the threaded - * and non-threaded versions. - * - * The value is required to be either - * NULL or a pointer to a token wrapping the payload. The token carries - * a reference count, and when the reference count decrements to 0, - * the will be freed. Hence, it is essential that the payload be in - * memory allocated using malloc. - * - * There are several conditions under which this function will not - * actually put an event on the event queue and decrement the reference count - * of the token (if there is one), which could result in the payload being - * freed. In all cases, this function returns 0. Otherwise, - * it returns a handle to the scheduled trigger, which is an integer - * greater than 0. - * - * The first condition is that a stop has been requested and the trigger - * offset plus the extra delay is greater than zero. - * The second condition is that the trigger offset plus the extra delay - * is greater that the requested stop time (timeout). - * A third condition is that the trigger argument is null. - * Also, an event might not be scheduled if the trigger is an action - * with a `min_spacing` parameter. See the documentation. - * - * @param env Environment in which we are executing. - * @param trigger The trigger to be invoked at a later logical time. - * @param extra_delay The logical time delay, which gets added to the - * trigger's minimum delay, if it has one. If this number is negative, - * then zero is used instead. - * @param token The token wrapping the payload or NULL for no payload. - * @return A handle to the event, or 0 if no new event was scheduled, or -1 for error. - */ -trigger_handle_t _lf_schedule(environment_t *env, trigger_t* trigger, interval_t extra_delay, lf_token_t* token) { - assert(env != GLOBAL_ENVIRONMENT); - if (_lf_is_tag_after_stop_tag(env, env->current_tag)) { - // If schedule is called after stop_tag - // This is a critical condition. - _lf_done_using(token); - lf_print_warning("lf_schedule() called after stop tag."); - return 0; - } - - if (extra_delay < 0LL) { - lf_print_warning("schedule called with a negative extra_delay " PRINTF_TIME ". Replacing with zero.", extra_delay); - extra_delay = 0LL; - } - - LF_PRINT_DEBUG("_lf_schedule: scheduling trigger %p with delay " PRINTF_TIME " and token %p.", - trigger, extra_delay, token); - - // Increment the reference count of the token. - if (token != NULL) { - token->ref_count++; - LF_PRINT_DEBUG("_lf_schedule: Incremented ref_count of %p to %zu.", - token, token->ref_count); - } - - // The trigger argument could be null, meaning that nothing is triggered. - // Doing this after incrementing the reference count ensures that the - // payload will be freed, if there is one. - if (trigger == NULL) { - _lf_done_using(token); - return 0; - } - - // Compute the tag (the logical timestamp for the future event). - // We first do this assuming it is logical action and then, if it is a - // physical action, modify it if physical time exceeds the result. - interval_t delay = extra_delay; - // Add the offset if this is not a timer because, in that case, - // it is the minimum delay. - if (!trigger->is_timer) { - delay += trigger->offset; - } - tag_t intended_tag = (tag_t){.time = env->current_tag.time + delay, .microstep = 0}; - - LF_PRINT_DEBUG("_lf_schedule: env->current_tag.time = " PRINTF_TIME ". Total logical delay = " PRINTF_TIME "", - env->current_tag.time, delay); - interval_t min_spacing = trigger->period; - - event_t* e = _lf_get_new_event(env); - - // Initialize the next pointer. - e->next = NULL; - - // Set the payload. - e->token = token; - - // Make sure the event points to this trigger so when it is - // dequeued, it will trigger this trigger. - e->trigger = trigger; - - // If the trigger is physical, then we need to check whether - // physical time is larger than the intended time and, if so, - // modify the intended time. - if (trigger->is_physical) { - // Get the current physical time and assign it as the intended time. - intended_tag.time = lf_time_physical() + delay; - } else { - // FIXME: We need to verify that we are executing within a reaction? - // See reactor_threaded. - // If a logical action is scheduled asynchronously (which should never be - // done) the computed tag can be smaller than the current tag, in which case - // it needs to be adjusted. - // FIXME: This can go away once: - // - we have eliminated the possibility to have a negative additional delay; and - // - we detect the asynchronous use of logical actions - #ifndef NDEBUG - if (intended_tag.time < env->current_tag.time) { - lf_print_warning("Attempting to schedule an event earlier than current time by " PRINTF_TIME " nsec! " - "Revising to the current time " PRINTF_TIME ".", - env->current_tag.time - intended_tag.time, env->current_tag.time); - intended_tag.time = env->current_tag.time; - } - #endif - } - -#ifdef FEDERATED_DECENTRALIZED - // Event inherits the original intended_tag of the trigger - // set by the network stack (or the default, which is (NEVER,0)) - e->intended_tag = trigger->intended_tag; -#endif - - // Check for conflicts (a queued event with the same trigger and time). - if (min_spacing <= 0) { - // No minimum spacing defined. - e->time = intended_tag.time; - event_t* found = (event_t *)pqueue_find_equal_same_priority(env->event_q, e); - // Check for conflicts. Let events pile up in super dense time. - if (found != NULL) { - intended_tag.microstep++; - // Skip to the last node in the linked list. - while(found->next != NULL) { - found = found->next; - intended_tag.microstep++; - } - if (_lf_is_tag_after_stop_tag(env, intended_tag)) { - LF_PRINT_DEBUG("Attempt to schedule an event after stop_tag was rejected."); - // Scheduling an event will incur a microstep - // after the stop tag. - _lf_recycle_event(env, e); - return 0; - } - // Hook the event into the list. - found->next = e; - trigger->last_tag = intended_tag; - return(0); // FIXME: return value - } - // If there are not conflicts, schedule as usual. If intended time is - // equal to the current logical time, the event will effectively be - // scheduled at the next microstep. - } else if (!trigger->is_timer && trigger->last_tag.time != NEVER) { - // There is a min_spacing and there exists a previously - // scheduled event. It determines the - // earliest time at which the new event can be scheduled. - // Check to see whether the event is too early. - instant_t earliest_time = trigger->last_tag.time + min_spacing; - LF_PRINT_DEBUG("There is a previously scheduled event; earliest possible time " - "with min spacing: " PRINTF_TIME, - earliest_time); - // If the event is early, see which policy applies. - if (earliest_time > intended_tag.time) { - LF_PRINT_DEBUG("Event is early."); - switch(trigger->policy) { - case drop: - LF_PRINT_DEBUG("Policy is drop. Dropping the event."); - // Recycle the new event and decrement the - // reference count of the token. - _lf_done_using(token); - _lf_recycle_event(env, e); - return(0); - case replace: - LF_PRINT_DEBUG("Policy is replace. Replacing the previous event."); - // If the event with the previous time is still on the event - // queue, then replace the token. To find this event, we have - // to construct a dummy event_t struct. - event_t* dummy = _lf_get_new_event(env); - dummy->next = NULL; - dummy->trigger = trigger; - dummy->time = trigger->last_tag.time; - event_t* found = (event_t *)pqueue_find_equal_same_priority(env->event_q, dummy); - - if (found != NULL) { - // Recycle the existing token and the new event - // and update the token of the existing event. - _lf_replace_token(found, token); - _lf_recycle_event(env, e); - _lf_recycle_event(env, dummy); - // Leave the last_tag the same. - return(0); - } - _lf_recycle_event(env, dummy); - - // If the preceding event _has_ been handled, then adjust - // the tag to defer the event. - intended_tag = (tag_t){.time = earliest_time, .microstep = 0}; - break; - default: - // Default policy is defer - intended_tag = (tag_t){.time = earliest_time, .microstep = 0}; - break; - } - } - } - - // Check if the intended time is in the future - // This is a sanity check for the logic above - // FIXME: This is a development assertion and might - // not be necessary for end-user LF programs - #ifndef NDEBUG - if (intended_tag.time < env->current_tag.time) { - lf_print_error("Attempting to schedule an event earlier than current time by " PRINTF_TIME " nsec! " - "Revising to the current time " PRINTF_TIME ".", - env->current_tag.time - intended_tag.time, env->current_tag.time); - intended_tag.time = env->current_tag.time; - } - #endif - - // Set the tag of the event. - e->time = intended_tag.time; - - // Do not schedule events if if the event time is past the stop time - // (current microsteps are checked earlier). - LF_PRINT_DEBUG("Comparing event with elapsed time " PRINTF_TIME " against stop time " PRINTF_TIME ".", e->time - start_time, env->stop_tag.time - start_time); - if (e->time > env->stop_tag.time) { - LF_PRINT_DEBUG("_lf_schedule: event time is past the timeout. Discarding event."); - _lf_done_using(token); - _lf_recycle_event(env, e); - return(0); - } - - // Store the time in order to check the min spacing - // between this and any following event. - trigger->last_tag = intended_tag; - - // Queue the event. - // NOTE: There is no need for an explicit microstep because - // when this is called, all events at the current tag - // (time and microstep) have been pulled from the queue, - // and any new events added at this tag will go into the reaction_q - // rather than the event_q, so anything put in the event_q with this - // same time will automatically be executed at the next microstep. - LF_PRINT_LOG("Inserting event in the event queue with elapsed time " PRINTF_TIME ".", - e->time - start_time); - pqueue_insert(env->event_q, e); - - tracepoint_schedule(env->trace, trigger, e->time - env->current_tag.time); - - // FIXME: make a record of handle and implement unschedule. - // NOTE: Rather than wrapping around to get a negative number, - // we reset the handle on the assumption that much earlier - // handles are irrelevant. - trigger_handle_t return_value = env->_lf_handle++; - if (env->_lf_handle < 0) { - env->_lf_handle = 1; - } - return return_value; -} - -/** - * Insert reactions triggered by trigger to the reaction queue... - * - * @param env Environment in which we are executing. - * @param trigger The trigger - * @param token The token wrapping the payload or NULL for no payload. - * @return 1 if successful, or 0 if no new reaction was scheduled because the function - * was called incorrectly. - */ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* trigger, lf_token_t* token) { assert(env != GLOBAL_ENVIRONMENT); // The trigger argument could be null, meaning that nothing is triggered. @@ -1211,67 +760,6 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* return 1; } -/** - * Schedule the specified trigger at env->current_tag.time plus the offset of the - * specified trigger plus the delay. - * See reactor.h for documentation. - */ -trigger_handle_t _lf_schedule_token(lf_action_base_t* action, interval_t extra_delay, lf_token_t* token) { - environment_t* env = action->parent->environment; - - LF_CRITICAL_SECTION_ENTER(env); - int return_value = _lf_schedule(env, action->trigger, extra_delay, token); - // Notify the main thread in case it is waiting for physical time to elapse. - lf_notify_of_event(env); - LF_CRITICAL_SECTION_EXIT(env); - return return_value; -} - -/** - * Schedule an action to occur with the specified value and time offset - * with a copy of the specified value. - * See reactor.h for documentation. - */ -trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset, void* value, size_t length) { - if (value == NULL) { - return _lf_schedule_token(action, offset, NULL); - } - environment_t* env = action->parent->environment; - token_template_t* template = (token_template_t*)action; - if (action == NULL || template->type.element_size <= 0) { - lf_print_error("schedule: Invalid element size."); - return -1; - } - LF_CRITICAL_SECTION_ENTER(env); - // Initialize token with an array size of length and a reference count of 0. - lf_token_t* token = _lf_initialize_token(template, length); - // Copy the value into the newly allocated memory. - memcpy(token->value, value, template->type.element_size * length); - // The schedule function will increment the reference count. - trigger_handle_t result = _lf_schedule(env, action->trigger, offset, token); - // Notify the main thread in case it is waiting for physical time to elapse. - lf_notify_of_event(env); - LF_CRITICAL_SECTION_EXIT(env); - return result; -} - - -/** - * Variant of schedule_token that creates a token to carry the specified value. - * See reactor.h for documentation. - */ -trigger_handle_t _lf_schedule_value(lf_action_base_t* action, interval_t extra_delay, void* value, size_t length) { - token_template_t* template = (token_template_t*)action; - environment_t* env = action->parent->environment; - LF_CRITICAL_SECTION_ENTER(env); - lf_token_t* token = _lf_initialize_token_with_value(template, value, length); - int return_value = _lf_schedule(env, action->trigger, extra_delay, token); - // Notify the main thread in case it is waiting for physical time to elapse. - lf_notify_of_event(env); - LF_CRITICAL_SECTION_EXIT(env); - return return_value; -} - void _lf_advance_logical_time(environment_t *env, instant_t next_time) { assert(env != GLOBAL_ENVIRONMENT); @@ -1305,26 +793,6 @@ void _lf_advance_logical_time(environment_t *env, instant_t next_time) { next_time - start_time, env->current_tag.microstep, lf_time_physical_elapsed()); } -/** - * Variant of schedule_value when the value is an integer. - * See reactor.h for documentation. - * @param action Pointer to an action on the self struct. - */ -trigger_handle_t _lf_schedule_int(lf_action_base_t* action, interval_t extra_delay, int value) { - token_template_t* template = (token_template_t*)action; - - // NOTE: This doesn't acquire the mutex lock in the multithreaded version - // until schedule_value is called. This should be OK because the element_size - // does not change dynamically. - if (template->type.element_size != sizeof(int)) { - lf_print_error("Action type is not an integer. element_size is %zu", template->type.element_size); - return -1; - } - int* container = (int*)malloc(sizeof(int)); - *container = value; - return _lf_schedule_value(action, extra_delay, container, 1); -} - /** * Invoke the given reaction @@ -1703,10 +1171,6 @@ int process_args(int argc, const char* argv[]) { return 1; } -/** - * Initialize global variables and start tracing before calling the - * `_lf_initialize_trigger_objects` function - */ void initialize_global(void) { #if !defined NDEBUG _lf_count_payload_allocations = 0; @@ -1750,7 +1214,7 @@ void termination(void) { int num_envs = _lf_get_environments(&env); // Invoke the code generated termination function. It terminates the federated related services. // It should only be called for the top-level environment, which, by convention, is the first environment. - terminate_execution(env); + lf_terminate_execution(env); // In order to free tokens, we perform the same actions we would have for a new time step. for (int i = 0; i < num_envs; i++) { @@ -1823,7 +1287,7 @@ void termination(void) { } } #endif - _lf_free_all_reactors(); + lf_free_all_reactors(); // Free up memory associated with environment. // Do this last so that printed warnings don't access freed memory. diff --git a/core/tag.c b/core/tag.c index 1b7b9feda..e236dd766 100644 --- a/core/tag.c +++ b/core/tag.c @@ -38,12 +38,6 @@ typedef enum _lf_time_type { // Global variables declared in tag.h: instant_t start_time = NEVER; -//////////////// Global variables not declared in tag.h (must be declared extern if used elsewhere): - - -//////////////// Functions not declared in tag.h (local use only) - - //////////////// Functions declared in tag.h tag_t lf_tag(void *env) { diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index ef507a565..5d6e89c2a 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -1,34 +1,11 @@ -/* Runtime infrastructure for the threaded version of the C target of Lingua Franca. */ - -/************* -Copyright (c) 2019, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - -/** Runtime infrastructure for the threaded version of the C target of Lingua Franca. - * - * @author{Edward A. Lee } - * @author{Marten Lohstroh } - * @author{Soroush Bateni } +/** + * @file + * @author Edward A. Lee (eal@berkeley.edu) + * @author{Marten Lohstroh } + * @author{Soroush Bateni } + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Runtime infrastructure for the threaded version of the C target of Lingua Franca. */ #if !defined LF_SINGLE_THREADED #ifndef NUMBER_OF_WORKERS @@ -42,13 +19,14 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "lf_types.h" #include "platform.h" -#include "reactor_common.h" #include "reactor_threaded.h" #include "reactor.h" #include "scheduler.h" #include "tag.h" #include "environment.h" #include "rti_local.h" +#include "reactor_common.h" +#include "watchdog.h" #ifdef FEDERATED #include "federate.h" @@ -69,14 +47,13 @@ extern instant_t start_time; */ lf_mutex_t global_mutex; - void _lf_increment_tag_barrier_locked(environment_t *env, tag_t future_tag) { assert(env != GLOBAL_ENVIRONMENT); // Check if future_tag is after stop tag. // This will only occur when a federate receives a timed message with // a tag that is after the stop tag - if (_lf_is_tag_after_stop_tag(env, future_tag)) { + if (lf_is_tag_after_stop_tag(env, future_tag)) { lf_print_warning("Attempting to raise a barrier after the stop tag."); future_tag = env->stop_tag; } @@ -170,7 +147,7 @@ int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag) { if (env->barrier.requestors == 0) return 0; // Do not wait for tags after the stop tag - if (_lf_is_tag_after_stop_tag(env, proposed_tag)) { + if (lf_is_tag_after_stop_tag(env, proposed_tag)) { proposed_tag = env->stop_tag; } // Do not wait forever @@ -190,7 +167,7 @@ int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag) { lf_cond_wait(&env->global_tag_barrier_requestors_reached_zero); // The stop tag may have changed during the wait. - if (_lf_is_tag_after_stop_tag(env, proposed_tag)) { + if (lf_is_tag_after_stop_tag(env, proposed_tag)) { proposed_tag = env->stop_tag; } } @@ -255,12 +232,12 @@ bool wait_until(environment_t* env, instant_t logical_time, lf_cond_t* condition #ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized // Apply the STA to the logical time // Prevent an overflow - if (start_time != logical_time && wait_until_time < FOREVER - _lf_fed_STA_offset) { + if (start_time != logical_time && wait_until_time < FOREVER - lf_fed_STA_offset) { // If wait_time is not forever LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", - _lf_fed_STA_offset, + lf_fed_STA_offset, wait_until_time - start_time); - wait_until_time += _lf_fed_STA_offset; + wait_until_time += lf_fed_STA_offset; } #endif if (!fast) { @@ -329,7 +306,7 @@ tag_t get_next_event_tag(environment_t *env) { // If a timeout tag was given, adjust the next_tag from the // event tag to that timeout tag. - if (_lf_is_tag_after_stop_tag(env, next_tag)) { + if (lf_is_tag_after_stop_tag(env, next_tag)) { next_tag = env->stop_tag; } LF_PRINT_LOG("Earliest event on the event queue (or stop time if empty) is " PRINTF_TAG ". Event queue has size %zu.", @@ -409,7 +386,7 @@ void _lf_next_locked(environment_t *env) { // to advance to FOREVER. I.e. all upstream enclaves have terminated and sent // an LTC for FOREVER. We can, in this case, terminate the current enclave. if(!keepalive_specified && lf_tag_compare(next_tag, FOREVER_TAG) == 0) { - _lf_set_stop_tag(env, (tag_t){.time=env->current_tag.time,.microstep=env->current_tag.microstep+1}); + lf_set_stop_tag(env, (tag_t){.time=env->current_tag.time,.microstep=env->current_tag.microstep+1}); next_tag = get_next_event_tag(env); } #elif defined FEDERATED_CENTRALIZED @@ -444,7 +421,7 @@ void _lf_next_locked(environment_t *env) { // keepalive is not set so we should stop. // Note that federated programs with decentralized coordination always have // keepalive = true - _lf_set_stop_tag(env, (tag_t){.time=env->current_tag.time,.microstep=env->current_tag.microstep+1}); + lf_set_stop_tag(env, (tag_t){.time=env->current_tag.time,.microstep=env->current_tag.microstep+1}); // Stop tag has changed. Need to check next_tag again. next_tag = get_next_event_tag(env); @@ -462,7 +439,7 @@ void _lf_next_locked(environment_t *env) { next_tag = get_next_event_tag(env); // If this (possibly new) next tag is past the stop time, return. - if (_lf_is_tag_after_stop_tag(env, next_tag)) { + if (lf_is_tag_after_stop_tag(env, next_tag)) { return; } } @@ -471,7 +448,7 @@ void _lf_next_locked(environment_t *env) { next_tag = get_next_event_tag(env); // If this (possibly new) next tag is past the stop time, return. - if (_lf_is_tag_after_stop_tag(env, next_tag)) { // lf_tag_compare(tag, stop_tag) > 0 + if (lf_is_tag_after_stop_tag(env, next_tag)) { // lf_tag_compare(tag, stop_tag) > 0 return; } @@ -578,7 +555,7 @@ void lf_request_stop(void) { // Iterate over environments to set their stop tag and release their barrier. for (int i = 0; i < num_environments; i++) { LF_MUTEX_LOCK(&env[i].mutex); - _lf_set_stop_tag(&env[i], (tag_t) {.time = max_current_tag.time, .microstep = max_current_tag.microstep+1}); + lf_set_stop_tag(&env[i], (tag_t) {.time = max_current_tag.time, .microstep = max_current_tag.microstep+1}); // Release the barrier on tag advancement. _lf_decrement_tag_barrier_locked(&env[i]); @@ -591,16 +568,6 @@ void lf_request_stop(void) { #endif } -/** - * Trigger 'reaction'. - * - * @param env Environment within which we are executing. - * @param reaction The reaction. - * @param worker_number The ID of the worker that is making this call. 0 should be - * used if there is only one worker (e.g., when the program is using the - * single-threaded C runtime). -1 is used for an anonymous call in a context where a - * worker number does not make sense (e.g., the caller is not a worker thread). - */ void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number) { assert(env != GLOBAL_ENVIRONMENT); @@ -654,10 +621,20 @@ void _lf_initialize_start_tag(environment_t *env) { _lf_initialize_timers(env); + env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + +#if defined FEDERATED_DECENTRALIZED // If we have a non-zero STA offset, then we need to allow messages to arrive // prior to the start time. To avoid spurious STP violations, we temporarily // set the current time back by the STA offset. - env->current_tag = (tag_t){.time = start_time - _lf_fed_STA_offset, .microstep = 0u}; + env->current_tag.time -= lf_fed_STA_offset; + LF_PRINT_LOG("Waiting for start time " PRINTF_TIME " plus STA " PRINTF_TIME ".", + start_time, lf_fed_STA_offset); +#else + instant_t lf_fed_STA_offset = 0; + LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", + start_time); +#endif // Call wait_until if federated. This is required because the startup procedure // in lf_synchronize_with_other_federates() can decide on a new start_time that is @@ -672,13 +649,12 @@ void _lf_initialize_start_tag(environment_t *env) { // from other federates) to hold the lock and possibly raise a tag barrier. This is // especially useful if an STA is set properly because the federate will get // a chance to process incoming messages while utilizing the STA. - LF_PRINT_LOG("Waiting for start time " PRINTF_TIME " plus STA " PRINTF_TIME ".", - start_time, _lf_fed_STA_offset); + // Here we wait until the start time and also release the environment mutex. // this means that the other worker threads will be allowed to start. We need // this to avoid potential deadlock in federated startup. - while(!wait_until(env, start_time + _lf_fed_STA_offset, &env->event_q_changed)) {}; - LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + _lf_fed_STA_offset); + while(!wait_until(env, start_time + lf_fed_STA_offset, &env->event_q_changed)) {}; + LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + lf_fed_STA_offset); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be close to the STA offset.", lf_time_physical() - start_time); @@ -1006,11 +982,7 @@ void* worker(void* arg) { return NULL; } -/** - * If DEBUG logging is enabled, prints the status of the event queue, - * the reaction queue, and the executing queue. - * @param env Environment within which we are executing. - */ +#ifndef NDEBUG void lf_print_snapshot(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); @@ -1025,6 +997,11 @@ void lf_print_snapshot(environment_t* env) { LF_PRINT_DEBUG(">>> END Snapshot"); } } +#else // NDEBUG +void lf_print_snapshot(environment_t* env) { + // Do nothing. +} +#endif // NDEBUG // Start threads in the thread pool. void start_threads(environment_t* env) { @@ -1077,7 +1054,7 @@ void determine_number_of_workers(void) { */ int lf_reactor_c_main(int argc, const char* argv[]) { // Invoke the function that optionally provides default command-line options. - _lf_set_default_command_line_options(); + lf_set_default_command_line_options(); // Parse command line arguments. Sets global variables like duration, fast, number_of_workers. if (!(process_args(default_argc, default_argv) @@ -1117,7 +1094,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { ctime(&physical_time_timespec.tv_sec), physical_time_timespec.tv_nsec); // Create and initialize the environments for each enclave - _lf_create_environments(); + lf_create_environments(); // Initialize the one global mutex LF_MUTEX_INIT(&global_mutex); diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 38d96abbd..b66d88478 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -1,42 +1,17 @@ -/* Global Earliest Deadline First (GEDF) non-preemptive scheduler for the -threaded runtime of the C target of Lingua Franca. */ - -/************* -Copyright (c) 2022, The University of Texas at Dallas. -Copyright (c) 2022, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * Global Earliest Deadline First (GEDF) non-preemptive scheduler for the - * threaded runtime of the C target of Lingua Franca. - * + * @file * @author{Soroush Bateni } * @author{Edward A. Lee } * @author{Marten Lohstroh } + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Global Earliest Deadline First (GEDF) non-preemptive scheduler for the + * threaded runtime of the C target of Lingua Franca. */ #include "lf_types.h" + #if SCHEDULER == SCHED_GEDF_NP + #ifndef NUMBER_OF_WORKERS #define NUMBER_OF_WORKERS 1 #endif // NUMBER_OF_WORKERS @@ -362,4 +337,4 @@ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reacti reaction->name, LF_LEVEL(reaction->index)); _lf_sched_insert_reaction(scheduler, reaction); } -#endif +#endif // SCHEDULER == SCHED_GEDF_NP diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index b6c6b4051..fee5775f4 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -1,43 +1,16 @@ -#if !defined(LF_SINGLE_THREADED) -/* Non-preemptive scheduler for the threaded runtime of the C target of Lingua -Franca. */ - -/************* -Copyright (c) 2022, The University of Texas at Dallas. Copyright (c) 2022, The -University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * Non-preemptive scheduler for the threaded runtime of the C target of Lingua - * Franca. - * + * @file * @author{Soroush Bateni } * @author{Edward A. Lee } * @author{Marten Lohstroh } + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Non-preemptive scheduler for the threaded runtime of the C target of Lingua Franca. */ #include "lf_types.h" + #if SCHEDULER == SCHED_NP || !defined(SCHEDULER) + #ifndef NUMBER_OF_WORKERS #define NUMBER_OF_WORKERS 1 #endif // NUMBER_OF_WORKERS @@ -442,5 +415,4 @@ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reacti reaction->name, LF_LEVEL(reaction->index)); _lf_sched_insert_reaction(scheduler, reaction); } -#endif -#endif +#endif // SCHEDULER == SCHED_NP || !defined(SCHEDULER) diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index fba1eae20..35921d642 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -1,34 +1,14 @@ -/************* -Copyright (c) 2022, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * This is a non-priority-driven scheduler. See scheduler.h for documentation. + * @file * @author{Peter Donovan } + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief This is a non-priority-driven scheduler. See scheduler.h for documentation. */ #include "lf_types.h" + #if defined SCHEDULER && SCHEDULER == SCHED_ADAPTIVE + #ifndef NUMBER_OF_WORKERS #define NUMBER_OF_WORKERS 1 #endif // NUMBER_OF_WORKERS diff --git a/core/threaded/scheduler_instance.c b/core/threaded/scheduler_instance.c index c09d95d7a..a8aac7ff6 100644 --- a/core/threaded/scheduler_instance.c +++ b/core/threaded/scheduler_instance.c @@ -1,6 +1,7 @@ #include #include "scheduler_instance.h" -#include "reactor_common.h" +#include "environment.h" +#include "reactor.h" #include "lf_types.h" #include "util.h" diff --git a/include/api/reaction_macros_undef.h b/include/api/reaction_macros_undef.h index 0df89046f..833d91edc 100644 --- a/include/api/reaction_macros_undef.h +++ b/include/api/reaction_macros_undef.h @@ -25,4 +25,5 @@ #undef lf_tag #undef lf_time_logical #undef lf_time_logical_elapsed + #endif // REACTION_MACROS_H diff --git a/include/api/schedule.h b/include/api/schedule.h index cafbf8143..d8d0c7ec5 100644 --- a/include/api/schedule.h +++ b/include/api/schedule.h @@ -2,47 +2,57 @@ * @file * @author Edward A. Lee * @author Hou Seng (Steven) Wong + * @author Soroush Bateni * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * - * This file is a translation layer that implements Lingua Franca - * APIs which interact with the internal _lf_SET and _lf_schedule APIs. + * @brief API functions for scheduling actions. + * + * Most of these functions take a `void*` pointer to an action, which will be internally cast to + * a `lf_action_base_t*` pointer. The cast could be done by macros in reaction_macros.h, but unlike + * the macros defined there, it is common for `lf_schedule` functions to be invoked outside of reaction + * bodies. This means that users writing code in separate library files are responsible for ensuring that + * the `void*` pointer is indeed a valid `lf_action_base_t*` pointer before passing it to `lf_schedule`. + * The compiler will not check this. */ #ifndef API_H #define API_H -#include // Included for backwards compatibility so that users do not need to explicitly include this #include "lf_types.h" #include "tag.h" /** - * Define a macro to suppress warnings about unused variables. - * Apparently, it is sufficient to just cast to void. - */ -#define SUPPRESS_UNUSED_WARNING(x) (void)(x) - -////////////////////////////////////////////////////////////// -///////////// Schedule Functions - -/** - * Schedule an action to occur with the specified value and time offset - * with no payload (no value conveyed). + * @brief Schedule an action to occur with the specified time offset with no payload (no value conveyed). + * + * The later tag will depend on whether the action is logical or physical. If it is logical, + * the time of the event will be the current logical time of the environment associated with + * the action plus the minimum delay of the action plus the extra delay. If that time is equal + * to the current time, then the tag will be one microstep beyond the current tag. + * If the action is physical, the time will be the current physical time plus the extra delay, + * and the microstep will be zero. + * * See lf_schedule_token(), which this uses, for details. * - * @param action Pointer to an action on the self struct. - * @param offset The time offset over and above that in the action. + * @param action The action to be triggered (a pointer to an `lf_action_base_t`). + * @param offset The time offset over and above the minimum delay of the action. * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. */ trigger_handle_t lf_schedule(void* action, interval_t offset); /** - * Schedule the specified action with an integer value at a later logical - * time that depends on whether the action is logical or physical and - * what its parameter values are. This wraps a copy of the integer value - * in a token. See lf_schedule_token() for more details. + * @brief Schedule the specified action with an integer value at a later logical time. + * + * The later tag will depend on whether the action is logical or physical. If it is logical, + * the time of the event will be the current logical time of the environment associated with + * the action plus the minimum delay of the action plus the extra delay. If that time is equal + * to the current time, then the tag will be one microstep beyond the current tag. + * If the action is physical, the time will be the current physical time plus the extra delay, + * and the microstep will be zero. + * + * This wraps a copy of the integer value in a token. See lf_schedule_token() for more details. * - * @param action The action to be triggered. + * @param action The action to be triggered (a pointer to an `lf_action_base_t`). * @param extra_delay Extra offset of the event release above that in the action. * @param value The value to send. * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. @@ -50,22 +60,27 @@ trigger_handle_t lf_schedule(void* action, interval_t offset); trigger_handle_t lf_schedule_int(void* action, interval_t extra_delay, int value); /** - * Schedule the specified action with the specified token as a payload. - * This will trigger an event at a later logical time that depends - * on whether the action is logical or physical and what its parameter - * values are. - * - * logical action: A logical action has an offset (default is zero) - * and a minimum interarrival time (MIT), which also defaults to zero. - * The logical time at which this scheduled event will trigger is - * the current time plus the offset plus the delay argument given to + * @brief Schedule the specified action at a later tag with the specified token as a payload. + * + * The later tag will depend on whether the action is logical or physical. If it is logical, + * the time of the event will be the current logical time of the environment associated with + * the action plus the minimum delay of the action plus the extra delay. If that time is equal + * to the current time, then the tag will be one microstep beyond the current tag. + * If the action is physical, the time will be the current physical time plus the extra delay, + * and the microstep will be zero. + * + * For a logical action: + * + * A logical action has a minimum delay (default is zero) and a minimum spacing, which also + * defaults to zero. The logical time at which this scheduled event will trigger is the current time + * of the environment associated with the action plus the offset plus the delay argument given to * this function. If, however, that time is not greater than a prior - * triggering of this logical action by at least the MIT, then the + * triggering of this logical action by at least the minimum spacing, then the * one of two things can happen depending on the policy specified * for the action. If the action's policy is DROP (default), then the * action is simply dropped and the memory pointed to by value argument * is freed. If the policy is DEFER, then the time will be increased - * to equal the time of the most recent triggering plus the MIT. + * to equal the time of the most recent triggering plus the minimum spacing. * * For the above, "current time" means the logical time of the * reaction that is calling this function. Logical actions should @@ -91,7 +106,7 @@ trigger_handle_t lf_schedule_int(void* action, interval_t extra_delay, int value * properties or on the command line. * The third condition is that the trigger argument is null. * - * @param action The action to be triggered. + * @param action The action to be triggered (a pointer to an `lf_action_base_t`). * @param extra_delay Extra offset of the event release above that in the action. * @param token The token to carry the payload or null for no payload. * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. @@ -99,31 +114,42 @@ trigger_handle_t lf_schedule_int(void* action, interval_t extra_delay, int value trigger_handle_t lf_schedule_token(void* action, interval_t extra_delay, lf_token_t* token); /** - * Schedule an action to occur with the specified value and time offset with a - * copy of the specified value. If the value is non-null, then it will be copied + * @brief Schedule an action to occur with the specified value and time offset with a + * copy of the specified value. + * + * If the value is non-null, then it will be copied * into newly allocated memory under the assumption that its size is given in * the trigger's token object's element_size field multiplied by the specified * length. * + * The later tag will depend on whether the action is logical or physical. If it is logical, + * the time of the event will be the current logical time of the environment associated with + * the action plus the minimum delay of the action plus the extra delay. If that time is equal + * to the current time, then the tag will be one microstep beyond the current tag. + * If the action is physical, the time will be the current physical time plus the extra delay, + * and the microstep will be zero. + * * See lf_schedule_token(), which this uses, for details. * - * @param action Pointer to an action on a self struct. + * @param action The action to be triggered (a pointer to an `lf_action_base_t`). * @param offset The time offset over and above that in the action. * @param value A pointer to the value to copy. * @param length The length, if an array, 1 if a scalar, and 0 if value is NULL. * @return A handle to the event, or 0 if no event was scheduled, or -1 for * error. */ -trigger_handle_t lf_schedule_copy(void* action, interval_t offset, void* value, int length); +trigger_handle_t lf_schedule_copy( + void* action, interval_t offset, void* value, size_t length); /** - * Variant of lf_schedule_token that creates a token to carry the specified value. + * @brief Variant of lf_schedule_token that creates a token to carry the specified value. + * * The value is required to be malloc'd memory with a size equal to the * element_size of the specified action times the length parameter. * * See lf_schedule_token(), which this uses, for details. * - * @param action The action to be triggered. + * @param action The action to be triggered (a pointer to an `lf_action_base_t`). * @param extra_delay Extra offset of the event release above that in the * action. * @param value Dynamically allocated memory containing the value to send. @@ -135,10 +161,53 @@ trigger_handle_t lf_schedule_copy(void* action, interval_t offset, void* value, trigger_handle_t lf_schedule_value(void* action, interval_t extra_delay, void* value, int length); /** - * Check the deadline of the currently executing reaction against the - * current physical time. If the deadline has passed, invoke the deadline + * @brief Schedule the specified trigger to execute in the specified environment with given delay and token. + * + * This is the most flexible version of the schedule functions and is used in the implementation + * of many of the others. End users would rarely use it. + * + * This will schedule the specified trigger at env->current_tag.time plus the offset of the + * specified trigger plus the delay. The value is required to be either + * NULL or a pointer to a token wrapping the payload. The token carries + * a reference count, and when the reference count decrements to 0, + * the will be freed. Hence, it is essential that the payload be in + * memory allocated using malloc. + * + * There are several conditions under which this function will not + * actually put an event on the event queue and decrement the reference count + * of the token (if there is one), which could result in the payload being + * freed. In all cases, this function returns 0. Otherwise, + * it returns a handle to the scheduled trigger, which is an integer + * greater than 0. + * + * The first condition is that a stop has been requested and the trigger + * offset plus the extra delay is greater than zero. + * The second condition is that the trigger offset plus the extra delay + * is greater that the requested stop time (timeout). + * A third condition is that the trigger argument is null. + * Also, an event might not be scheduled if the trigger is an action + * with a `min_spacing` parameter. See the documentation. + + * @param env The environment in which to schedule the event. + * @param trigger The action or timer to be triggered. + * @param delay Offset of the event release. + * @param token The token payload. + * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. + */ +trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, interval_t delay, lf_token_t* token); + +/** + * @brief Check the deadline of the currently executing reaction against the + * current physical time. + * + * If the deadline has passed, invoke the deadline * handler (if invoke_deadline_handler parameter is set true) and return true. * Otherwise, return false. + * + * This function is intended to be used within a reaction that has been invoked without a deadline + * violation, but that wishes to check whether the deadline gets violated _during_ the execution of + * the reaction. This can be used, for example, to implement a timeout mechanism that bounds the + * execution time of a reaction, for example to realize an "anytime" computation. * * @param self The self struct of the reactor. * @param invoke_deadline_handler When this is set true, also invoke deadline diff --git a/include/core/lf_types.h b/include/core/lf_types.h index eb626658e..023172545 100644 --- a/include/core/lf_types.h +++ b/include/core/lf_types.h @@ -4,33 +4,9 @@ * @author Marten Lohstroh (marten@berkeley.edu) * @author Chris Gill (cdgill@wustl.edu) * @author Mehrdad Niknami (mniknami@berkeley.edu) - * - * @section LICENSE - * Copyright (c) 2019, The University of California at Berkeley. - * - * Redistribution and use in source and binary forms, with or without modification, - * are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, - * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * @section DESCRIPTION - * - * Type definitions that are widely used across different parts of the runtime. + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Type definitions that are widely used across different parts of the runtime. * * IMPORTANT: Many of the structs defined here require matching layouts * and, if changed, will require changes in the code generator. @@ -284,16 +260,19 @@ typedef struct allocation_record_t { struct allocation_record_t *next; } allocation_record_t; - typedef struct environment_t environment_t; + /** + * @brief The base type for all reactor self structs. + * * The first element of every self struct defined in generated code * will be a pointer to an allocation record, which is either NULL * or the head of a NULL-terminated linked list of allocation records. - * Casting the self struct to this type enables access to this list - * by the function {@link _lf_free_reactor(self_base_t*)}. To allocate memory - * for the reactor that will be freed by that function, allocate the - * memory using {@link _lf_allocate(size_t,size_t,self_base_t*)}. + * This list is used to free memory that has been dynamically allocated. + * This struct also provides a pointer to the currently executing reaction, + * to the environment in which the reaction is executing, and to the mutex + * that is used to protect the reactor. If modal models are being used, + * it also records the current mode. */ typedef struct self_base_t { struct allocation_record_t *allocations; diff --git a/include/core/platform.h b/include/core/platform.h index d28a90832..5c05ed1f2 100644 --- a/include/core/platform.h +++ b/include/core/platform.h @@ -1,36 +1,13 @@ -/* Platform API support for the C target of Lingua Franca. */ - -/************* -Copyright (c) 2021, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - /** - * Platform API support for the C target of Lingua Franca. + * @file + * @author{Soroush Bateni } + * @brief Platform API support for the C target of Lingua Franca. + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * * This file detects the platform on which the C compiler is being run * (e.g. Windows, Linux, Mac) and conditionally includes platform-specific * files that define core datatypes and function signatures for Lingua Franca. - * - * @author{Soroush Bateni } */ #ifndef PLATFORM_H diff --git a/include/core/reactor.h b/include/core/reactor.h index dead5d7a6..14e1bdc40 100644 --- a/include/core/reactor.h +++ b/include/core/reactor.h @@ -8,16 +8,11 @@ * License: BSD 2-clause * @brief Definitions for the C target of Lingua Franca shared by threaded and unthreaded versions. * - * This header file defines the functions and macros that programmers use - * in the body of reactions for reading and writing inputs and outputs and - * scheduling future events. The LF compiler does not parse that C code. - * This fact strongly affects the design. - * - * The intent of the C target for Lingua Franca not to provide a safe - * programming environment (The C++ and TypeScript targets are better - * choices for that), but rather to find the lowest possible overhead - * implementation of Lingua Franca. The API herein can easily be misused, - * leading to memory leaks, nondeterminism, or program crashes. + * This header file defines functions that programmers use in the body of reactions for reading and + * writing inputs and outputs and scheduling future events. Other functions that might be useful to + * application programmers are also defined here. + * + * Many of these functions have macro wrappers defined in reaction_macros.h. */ #ifndef REACTOR_H @@ -31,87 +26,59 @@ #include "trace.h" #include "util.h" -////////////////////// Constants ////////////////////// - /** - * @brief Macro giving the minimum amount of time to sleep to wait for physical time to reach a logical time. - * - * Unless the "fast" option is given, an LF program will wait until - * physical time matches logical time before handling an event with - * a given logical time. The amount of time is less than this given - * threshold, then no wait will occur. The purpose of this is - * to prevent unnecessary delays caused by simply setting up and - * performing the wait. + * @brief Macro to suppress warnings about unused variables. */ -#define MIN_SLEEP_DURATION USEC(10) +#define SUPPRESS_UNUSED_WARNING(x) (void)(x) -/// \cond INTERNAL // Doxygen conditional. +////////////////////// Function Declarations ////////////////////// /** - * @brief Mark the given port's is_present field as true. - * @param port A pointer to the port struct as an `lf_port_base_t*`. + * @brief Return true if the provided tag is after stop tag. + * @param env Environment in which we are executing. + * @param tag The tag to check against stop tag */ -void lf_set_present(lf_port_base_t* port); - -/** - * @brief Forward declaration for the executable preamble; - * @param env Environment in which to execute to preamble - * - */ -void _lf_executable_preamble(environment_t* env); - -/// \endcond // INTERNAL - -////////////////////// Macros for reading and writing ports ////////////////////// -// NOTE: Ports passed to these macros can be cast to: -// lf_port_base_t: which has the field bool is_present (and more); -// token_template_t: which has a lf_token_t* token field; or -// token_type_t: Which has element_size, destructor, and copy_constructor fields. +bool lf_is_tag_after_stop_tag(environment_t* env, tag_t tag); /** - * Macro for extracting the deadline from the index of a reaction. - * The reaction queue is sorted according to this index, and the - * use of the deadline here results in an earliest deadline first - * (EDF) scheduling poicy. + * @brief Mark the given port's is_present field as true. + * @param port A pointer to the port struct as an `lf_port_base_t*`. */ -#define DEADLINE(index) (index & 0x7FFFFFFFFFFF0000) +void lf_set_present(lf_port_base_t* port); /** - * Macro for determining whether two reactions are in the - * same chain (one depends on the other). This is conservative. - * If it returns false, then they are surely not in the same chain, - * but if it returns true, they may be in the same chain. - * This is in reactor_threaded.c to execute reactions in parallel - * on multiple cores even if their levels are different. + * @brief Set the stop tag if it is less than the stop tag of the specified environment. + * @note In threaded programs, the environment's mutex must be locked before calling this function. */ -#define OVERLAPPING(chain1, chain2) ((chain1 & chain2) != 0) +void lf_set_stop_tag(environment_t* env, tag_t tag); -// ======== Function Declarations ======== // +#ifdef FEDERATED_DECENTRALIZED /** - * Return the global STP offset on advancement of logical - * time for federated execution. + * @brief Return the global STP offset on advancement of logical time for federated execution. */ interval_t lf_get_stp_offset(void); /** - * Set the global STP offset on advancement of logical - * time for federated execution. - * - * @param offset A positive time value to be applied - * as the STP offset. + * @brief Set the global STP offset on advancement of logical time for federated execution. + * @param offset A positive time value to be applied as the STP offset. */ void lf_set_stp_offset(interval_t offset); +#endif // FEDERATED_DECENTRALIZED + /** - * Print a snapshot of the priority queues used during execution - * (for debugging). + * @brief Print a snapshot of the priority queues used during execution (for debugging). + * + * This function implementation will be empty if the NDEBUG macro is defined; that macro + * is normally defined for release builds. * @param env The environment in which we are executing. */ void lf_print_snapshot(environment_t* env); /** - * Request a stop to execution as soon as possible. + * @brief Request a stop to execution as soon as possible. + * * In a non-federated execution with only a single enclave, this will occur * one microstep later than the current tag. In a federated execution or when * there is more than one enclave, it will likely occur at a later tag determined @@ -120,245 +87,46 @@ void lf_print_snapshot(environment_t* env); void lf_request_stop(void); /** - * Allocate zeroed-out memory and record the allocated memory on - * the specified list so that it will be freed when calling - * {@link _lf_free(allocation_record_t**)}. + * @brief Allocate memory and record on the specified allocation record (a self struct). + * + * This will allocate memory using calloc (so the allocated memory is zeroed out) + * and record the allocated memory on the specified self struct so that + * it will be freed when calling {@link free_reactor(self_base_t)}. + * * @param count The number of items of size 'size' to accomodate. * @param size The size of each item. * @param head Pointer to the head of a list on which to record - * the allocation, or NULL to not record it. + * the allocation, or NULL to not record it (an `allocation_record_t**`), + * @return A pointer to the allocated memory. */ -void* _lf_allocate( - size_t count, size_t size, struct allocation_record_t** head); +void* lf_allocate(size_t count, size_t size, struct allocation_record_t** head); /** - * Free memory allocated using - * {@link _lf_allocate(size_t, size_t, allocation_record_t**)} - * and mark the list empty by setting `*head` to NULL. - * @param head Pointer to the head of a list on which to record - * the allocation, or NULL to not record it. - */ -void _lf_free(struct allocation_record_t** head); - -/** - * Allocate memory for a new runtime instance of a reactor. + * @brief Allocate memory for a new runtime instance of a reactor. + * * This records the reactor on the list of reactors to be freed at * termination of the program. If you plan to free the reactor before * termination of the program, use - * {@link _lf_allocate(size_t, size_t, allocation_record_t**)} + * {@link lf_allocate(size_t, size_t, allocation_record_t**)} * with a null last argument instead. - * @param size The size of the self struct, obtained with sizeof(). - */ -void* _lf_new_reactor(size_t size); - -/** - * Free all the reactors that are allocated with - * {@link #_lf_new_reactor(size_t)}. - */ -void _lf_free_all_reactors(void); - -/** - * Free memory recorded on the allocations list of the specified reactor. - * @param self The self struct of the reactor. - */ -void _lf_free_reactor(self_base_t *self); - -/** - * Generated function that optionally sets default command-line options. - */ -void _lf_set_default_command_line_options(void); - -/** - * Generated function that resets outputs to be absent at the - * start of a new time step. - * @param env The environment in which we are executing - */ -void _lf_start_time_step(environment_t *env); - -/** - * Generated function that produces a table containing all triggers - * (i.e., inputs, timers, and actions). - */ -void _lf_initialize_trigger_objects(); - -/** - * Pop all events from event_q with timestamp equal to current_time, extract all - * the reactions triggered by these events, and stick them into the reaction - * queue. - * @param env The environment in which we are executing - */ -void _lf_pop_events(environment_t *env); - - - -/** - * Internal version of the lf_schedule() function, used by generated - * _lf_start_timers() function. - * @param env The environment in which we are executing - * @param trigger The action or timer to be triggered. - * @param delay Offset of the event release. - * @param token The token payload. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ -trigger_handle_t _lf_schedule(environment_t* env, trigger_t* trigger, interval_t delay, lf_token_t* token); - - -/** - * Function to initialize mutexes for watchdogs - */ -void _lf_initialize_watchdogs(environment_t *env); - -/** Terminates all watchdogs inside the environment. */ -void _lf_watchdog_terminate_all(environment_t *env); - -/** - * @brief Get the array of ids of enclaves directly upstream of the specified enclave. - * This updates the specified result pointer to point to a statically allocated array of IDs - * and returns the length of the array. The implementation is code-generated. * - * @param enclave_id The enclave for which to report upstream IDs. - * @param result The pointer to dereference and update to point to the resulting array. - * @return The number of direct upstream enclaves. + * @param size The size of the self struct, obtained with sizeof(). */ -int _lf_get_upstream_of(int enclave_id, int** result); +self_base_t* lf_new_reactor(size_t size); /** - * @brief Get the array of ids of enclaves directly downstream of the specified enclave. - * This updates the specified result pointer to point to a statically allocated array of IDs - * and returns the length of the array. The implementation is code-generated. - * - * @param enclave_id The enclave for which to report downstream IDs. - * @param result The pointer to dereference and update to point to the resulting array. - * @return The number of direct downstream enclaves. + * @brief Free all the reactors that are allocated with {@link #lf_new_reactor(size_t)}. */ -int _lf_get_downstream_of(int enclave_id, int** result); +void lf_free_all_reactors(void); /** - * @brief Retrive the delays on the connections to direct upstream enclaves. - * This updates the result pointer to point to a statically allocated array of delays. - * The implementation is code-generated. + * @brief Free the specified reactor. * - * @param enclave_id The enclave for which to search for upstream delays. - * @param result The pointer to dereference and update to point to the resulting array. - * @return int The number of direct upstream enclaves. - */ -int _lf_get_upstream_delay_of(int enclave_id, interval_t** result); - -/** - * Function (to be code generated) to terminate execution. - * This will be invoked after all shutdown actions have completed. - * @param env The environment in which we are executing - */ -void terminate_execution(environment_t* env); - -void termination(); - -/** - * Schedule the specified action with an integer value at a later logical - * time that depends on whether the action is logical or physical and - * what its parameter values are. This wraps a copy of the integer value - * in a token. See schedule_token() for more details. - * @param action The action to be triggered. - * @param extra_delay Extra offset of the event release above that in the action. - * @param value The value to send. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ -trigger_handle_t _lf_schedule_int(lf_action_base_t* action, interval_t extra_delay, int value); - -/** - * Create a dummy event to be used as a spacer in the event queue. - */ -event_t* _lf_create_dummy_event(trigger_t* trigger, instant_t time, event_t* next, unsigned int offset); - -/** - * Schedule the specified action with the specified token as a payload. - * This will trigger an event at a later logical time that depends - * on whether the action is logical or physical and what its parameter - * values are. - * - * logical action: A logical action has an offset (default is zero) - * and a minimum interarrival time (MIT), which also defaults to zero. - * The logical time at which this scheduled event will trigger is - * the current time plus the offset plus the delay argument given to - * this function. If, however, that time is not greater than a prior - * triggering of this logical action by at least the MIT, then the - * one of two things can happen depending on the policy specified - * for the action. If the action's policy is DROP (default), then the - * action is simply dropped and the memory pointed to by value argument - * is freed. If the policy is DEFER, then the time will be increased - * to equal the time of the most recent triggering plus the MIT. - * - * For the above, "current time" means the logical time of the - * reaction that is calling this function. Logical actions should - * always be scheduled within a reaction invocation, never asynchronously - * from the outside. FIXME: This needs to be checked. - * - * physical action: A physical action has all the same parameters - * as a logical action, but its timestamp will be the larger of the - * current physical time and the time it would be assigned if it - * were a logical action. - * - * There are three conditions under which this function will not - * actually put an event on the event queue and decrement the reference count - * of the token (if there is one), which could result in the payload being - * freed. In all three cases, this function returns 0. Otherwise, - * it returns a handle to the scheduled trigger, which is an integer - * greater than 0. - * - * The first condition is that stop() has been called and the time offset - * of this event is greater than zero. - * The second condition is that the logical time of the event - * is greater that the stop time (timeout) that is specified in the target - * properties or on the command line. - * The third condition is that the trigger argument is null. - * - * @param action The action to be triggered. - * @param extra_delay Extra offset of the event release above that in the action. - * @param token The token to carry the payload or null for no payload. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ -trigger_handle_t _lf_schedule_token(lf_action_base_t* action, interval_t extra_delay, lf_token_t* token); - -/** - * Variant of schedule_token that creates a token to carry the specified value. - * The value is required to be malloc'd memory with a size equal to the - * element_size of the specifies action times the length parameter. - * See _lf_schedule_token() for details. - * @param action The action to be triggered. - * @param extra_delay Extra offset of the event release above that in the action. - * @param value Dynamically allocated memory containing the value to send. - * @param length The length of the array, if it is an array, or 1 for a - * scalar and 0 for no payload. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ -trigger_handle_t _lf_schedule_value(lf_action_base_t* action, interval_t extra_delay, void* value, size_t length); - -/** - * Schedule an action to occur with the specified value and time offset - * with a copy of the specified value. If the value is non-null, - * then it will be copied into newly allocated memory under the assumption - * that its size is given in the trigger's token object's element_size field - * multiplied by the specified length. - * See _lf_schedule_token(), which this uses, for details. - * @param action Pointer to an action on a self struct. - * @param offset The time offset over and above that in the action. - * @param value A pointer to the value to copy. - * @param length The length, if an array, 1 if a scalar, and 0 if value is NULL. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ -trigger_handle_t _lf_schedule_copy(lf_action_base_t* action, interval_t offset, void* value, size_t length); - -/** - * @brief Will create and initialize the required number of environments for the program - * @note Will be code generated by the compiler + * This will free the memory recorded on the allocations list of the specified reactor + * and then free the specified self struct. + * @param self The self struct of the reactor. */ -void _lf_create_environments(); - - -/** - * These functions must be implemented by both threaded and single-threaded - * runtime. Should be routed to appropriate API calls in platform.h -*/ +void lf_free_reactor(self_base_t *self); #endif /* REACTOR_H */ /** @} */ diff --git a/include/core/reactor_common.h b/include/core/reactor_common.h index 47b336b61..97ec976b8 100644 --- a/include/core/reactor_common.h +++ b/include/core/reactor_common.h @@ -1,3 +1,22 @@ +/** + * @file + * @author Edward A. Lee (eal@berkeley.edu) + * @author Marten Lohstroh + * @author Soroush Bateni + * @author Mehrdad Niknami + * @author Alexander Schulz-Rosengarten + * @author Erling Rennemo Jellum + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Declarations of functions with implementations in reactor.c and reactor_threaded.c. + * + * The functions declared in this file, as opposed to the ones in reactor.h, are not meant to be + * called by application programmers. They should be viewed as private functions that make up the + * C runtime. In some cases, the implementation of these functions is in reactor_common.c, and in + * other cases, alternative implementations are provided in reactor.c and reactor_threaded.c. + * A third possibility is that the function is code generated. + */ + #ifndef REACTOR_COMMON_H #define REACTOR_COMMON_H @@ -10,36 +29,170 @@ #include "modes.h" #include "port.h" -// ******** Global Variables :( ******** // -extern unsigned int _lf_number_of_workers; -extern bool fast; -extern instant_t duration; -extern bool keepalive_specified; -extern interval_t _lf_fed_STA_offset; +////////////////////// Constants & Macros ////////////////////// -/** Flag used to disable cleanup operations on normal termination. */ -extern bool _lf_normal_termination; +/** + * @brief Constant giving the minimum amount of time to sleep to wait + * for physical time to reach a logical time. + * + * Unless the "fast" option is given, an LF program will wait until + * physical time matches logical time before handling an event with + * a given logical time. The amount of time is less than this given + * threshold, then no wait will occur. The purpose of this is + * to prevent unnecessary delays caused by simply setting up and + * performing the wait. + */ +#define MIN_SLEEP_DURATION USEC(10) + +////////////////////// Global Variables ////////////////////// +// The following variables are defined in reactor_common.c and used in reactor.c, +// reactor_threaded.c, modes.c, and by the code generator. +extern bool _lf_normal_termination; +extern unsigned int _lf_number_of_workers; extern int default_argc; extern const char** default_argv; +extern instant_t duration; +extern bool fast; +extern bool keepalive_specified; + +#ifdef FEDERATED_DECENTRALIZED +extern interval_t lf_fed_STA_offset; +#endif +// The following is defined by the code generator. extern struct allocation_record_t* _lf_reactors_to_free; -void* _lf_new_reactor(size_t size); -void _lf_free(struct allocation_record_t** head); -void _lf_free_reactor(self_base_t *self); -void _lf_free_all_reactors(void); -void _lf_set_stop_tag(environment_t* env, tag_t tag); -extern interval_t lf_get_stp_offset(); -void lf_set_stp_offset(interval_t offset); -void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number); + +////////////////////// Functions ////////////////////// + +/** + * @brief Create and initialize the required number of environments for the program. + * @note This function will be code generated by the compiler. + */ +void lf_create_environments(void); + +/** + * @brief Free memory on the specified allocation record (a self struct). + * + * This will mark the allocation record empty by setting `*head` to NULL. + * If the argument is NULL, do nothing. + * + * @param head Pointer to the head of a list on which allocations are recorded. + */ +void lf_free(struct allocation_record_t** head); + +/** + * Get a new event. If there is a recycled event available, use that. + * If not, allocate a new one. In either case, all fields will be zero'ed out. + * @param env Environment in which we are executing. + */ +event_t* lf_get_new_event(environment_t* env); + +/** + * @brief Recycle the given event. + * + * This will zero out the event and push it onto the recycle queue. + * @param env Environment in which we are executing. + * @param e The event to recycle. + */ +void lf_recycle_event(environment_t* env, event_t* e); + +/** + * Replace the token on the specified event with the specified + * token and free the old token. + * @param event The event. + * @param token The token. + */ +void lf_replace_token(event_t* event, lf_token_t* token); + +/** + * @brief Generated function that optionally sets default command-line options. + */ +void lf_set_default_command_line_options(void); + +/** + * @brief Perform whatever is needed to start a time step. + * + * For example, this function resets outputs to be absent at the start of a new time step. + * + * @param env The environment in which we are executing + */ void _lf_start_time_step(environment_t *env); -bool _lf_is_tag_after_stop_tag(environment_t* env, tag_t tag); -void _lf_pop_events(environment_t *env); + +/** + * @brief Function that is called when the program is about to exit. + * + * This function will be invoked after all shutdown actions have completed. + * For non-federated programs, the code generator generates an empty function to implement this. + * For federated programs, the function is implemented in federate.c. + * + * @param env The environment in which we are executing + */ +void lf_terminate_execution(environment_t* env); + +/** + * Generated function that produces a table containing all triggers + * (i.e., inputs, timers, and actions). + */ +void _lf_initialize_trigger_objects(); + +/** + * @brief Perform final wrap-up on exit. + * + * This function will be registered to execute on exit. + * It reports elapsed logical and physical times and reports if any + * memory allocated for tokens has not been freed. + */ +void termination(void); + +/** + * @brief Trigger the specified reaction on the specified worker in the specified environment. + * @param env Environment in which we are executing. + * @param reaction The reaction. + * @param worker_number The ID of the worker that is making this call. 0 should be + * used if there is only one worker (e.g., when the program is using the + * single-threaded C runtime). -1 is used for an anonymous call in a context where a + * worker number does not make sense (e.g., the caller is not a worker thread). + */ +void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number); + +/** + * @brief Initialize the given timer. + * If this timer has a zero offset, enqueue the reactions it triggers. + * If this timer is to trigger reactions at a _future_ tag as well, + * schedule it accordingly. + * @param env Environment in which we are executing. + * @param timer The timer to initialize. + */ void _lf_initialize_timer(environment_t* env, trigger_t* timer); + +/** + * @brief Initialize all the timers in the environment + * @param env Environment in which we are executing. + */ void _lf_initialize_timers(environment_t* env); + +/** + * @brief Trigger all the startup reactions in the specified environment. + * @param env Environment in which we are executing. + */ void _lf_trigger_startup_reactions(environment_t* env); + +/** + * @brief Trigger all the shutdown reactions in the specified environment. + * @param env Environment in which we are executing. + */ void _lf_trigger_shutdown_reactions(environment_t *env); -void _lf_recycle_event(environment_t* env, event_t* e); + +/** + * Create dummy events to be used as spacers in the event queue. + * @param env Environment in which we are executing. + * @param trigger The eventual event to be triggered. + * @param time The logical time of that event. + * @param next The event to place after the dummy events. + * @param offset The number of dummy events to insert. + * @return A pointer to the first dummy event. + */ event_t* _lf_create_dummy_events( environment_t* env, trigger_t* trigger, @@ -47,8 +200,44 @@ event_t* _lf_create_dummy_events( event_t* next, microstep_t offset ); + +/** + * @brief Schedule an event at a specific tag (time, microstep). + * + * If there is an event found at the requested tag, the payload + * is replaced and 0 is returned. + * + * Note that this function is an internal API that must be called with a tag that is in the future + * relative to the current tag (or the environment has not started executing). Also, it must be called + * with tags that are in order for a given trigger. This means that the following order is illegal: + * ``` + * _lf_schedule_at_tag(trigger1, bigger_tag, ...); + * _lf_schedule_at_tag(trigger1, smaller_tag, ...); + * ``` + * where `bigger_tag > smaller_tag`. This function is primarily + * used for network communication (which is assumed to be in order). + * + * This function assumes the caller holds the mutex lock. + * + * @param env Environment in which we are executing. + * @param trigger The trigger to be invoked at a later logical time. + * @param tag Logical tag of the event + * @param token The token wrapping the payload or NULL for no payload. + * + * @return A positive trigger handle for success, 0 if no new event was scheduled + * (instead, the payload was updated), or -1 for error (the tag is equal to or less + * than the current tag). + */ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag_t tag, lf_token_t* token); -trigger_handle_t _lf_schedule(environment_t* env, trigger_t* trigger, interval_t extra_delay, lf_token_t* token); + +/** + * @brief Insert reactions triggered by trigger to the reaction queue. + * @param env Environment in which we are executing. + * @param trigger The trigger. + * @param token The token wrapping the payload or NULL for no payload. + * @return 1 if successful, or 0 if no new reaction was scheduled because the function + * was called incorrectly. + */ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* trigger, lf_token_t* token); /** @@ -59,14 +248,24 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* * @param next_time The time step to advance to. */ void _lf_advance_logical_time(environment_t *env, instant_t next_time); -trigger_handle_t _lf_schedule_int(lf_action_base_t* action, interval_t extra_delay, int value); + +/** + * @brief Pop all events from event_q with tag equal to current tag. + * + * This will extract all the reactions triggered by these events and stick them onto the + * reaction queue. + * + * @param env The environment in which we are executing + */ +void _lf_pop_events(environment_t *env); + void _lf_invoke_reaction(environment_t* env, reaction_t* reaction, int worker); void schedule_output_reactions(environment_t *env, reaction_t* reaction, int worker); int process_args(int argc, const char* argv[]); + +/** + * @brief Initialize global variables and start tracing before calling the `_lf_initialize_trigger_objects` function. + */ void initialize_global(); -void termination(void); -int lf_notify_of_event(environment_t* env); -int lf_critical_section_enter(environment_t* env); -int lf_critical_section_exit(environment_t* env); #endif diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index f0f3d424b..a62e5b7e4 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -1,5 +1,15 @@ +/** + * @file + * @author Edward A. Lee (eal@berkeley.edu) + * @author{Marten Lohstroh } + * @author{Soroush Bateni } + * @copyright (c) 2020-2024, The University of California at Berkeley. + * License: BSD 2-clause + * @brief Runtime infrastructure for the threaded version of the C target of Lingua Franca. + */ #ifndef REACTOR_THREADED_H #define REACTOR_THREADED_H + #include "lf_types.h" /** diff --git a/include/core/threaded/watchdog.h b/include/core/threaded/watchdog.h index fe0066821..0b8d0d12c 100644 --- a/include/core/threaded/watchdog.h +++ b/include/core/threaded/watchdog.h @@ -2,7 +2,7 @@ * @file * @author Benjamin Asch * @author Edward A. Lee - * @copyright (c) 2023, The University of California at Berkeley. + * @copyright (c) 2023-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief Declarations for watchdogs. */ @@ -12,6 +12,7 @@ #include "lf_types.h" #include "environment.h" +#include "platform.h" // For lf_thread_t. #ifdef __cplusplus extern "C" { @@ -25,10 +26,7 @@ extern "C" { typedef void(*watchdog_function_t)(void*); /** Typdef for watchdog_t struct, used to call watchdog handler. */ -typedef struct watchdog_t watchdog_t; - -/** Watchdog struct for handler. */ -struct watchdog_t { +typedef struct watchdog_t { struct self_base_t* base; // The reactor that contains the watchdog. trigger_t* trigger; // The trigger associated with this watchdog. instant_t expiration; // The expiration instant for the watchdog. (Initialized to NEVER) @@ -38,7 +36,7 @@ struct watchdog_t { bool active; // Boolean indicating whether or not thread is active. bool terminate; // Boolean indicating whether termination of the thread has been requested. watchdog_function_t watchdog_function; // The function/handler for the watchdog. -}; +} watchdog_t; /** * @brief Start or restart the watchdog timer. @@ -63,6 +61,21 @@ void lf_watchdog_start(watchdog_t* watchdog, interval_t additional_timeout); */ void lf_watchdog_stop(watchdog_t* watchdog); + +///////////////////// Internal functions ///////////////////// +// The following functions are internal to the runtime and should not be documented by Doxygen. +/// \cond INTERNAL // Doxygen conditional. + +/** + * Function to initialize mutexes for watchdogs + */ +void _lf_initialize_watchdogs(environment_t *env); + +/** Terminates all watchdogs inside the environment. */ +void _lf_watchdog_terminate_all(environment_t *env); + +/// \endcond // INTERNAL + #ifdef __cplusplus } #endif diff --git a/lib/schedule.c b/lib/schedule.c index c837dc2e1..28ac9b120 100644 --- a/lib/schedule.c +++ b/lib/schedule.c @@ -8,131 +8,73 @@ #include "schedule.h" #include "reactor.h" +#include "reactor_common.h" +#include "environment.h" + +#include +#include // Defines memcpy. -/** - * Schedule an action to occur with the specified value and time offset - * with no payload (no value conveyed). - * See schedule_token(), which this uses, for details. - * - * @param action Pointer to an action on the self struct. - * @param offset The time offset over and above that in the action. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ trigger_handle_t lf_schedule(void* action, interval_t offset) { - return _lf_schedule_token((lf_action_base_t*)action, offset, NULL); + return lf_schedule_token((lf_action_base_t*)action, offset, NULL); } -/** - * Schedule the specified action with an integer value at a later logical - * time that depends on whether the action is logical or physical and - * what its parameter values are. This wraps a copy of the integer value - * in a token. See schedule_token() for more details. - * - * @param action The action to be triggered. - * @param extra_delay Extra offset of the event release above that in the action. - * @param value The value to send. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ trigger_handle_t lf_schedule_int(void* action, interval_t extra_delay, int value) { - return _lf_schedule_int((lf_action_base_t*)action, extra_delay, value); + token_template_t* template = (token_template_t*)action; + + // NOTE: This doesn't acquire the mutex lock in the multithreaded version + // until schedule_value is called. This should be OK because the element_size + // does not change dynamically. + if (template->type.element_size != sizeof(int)) { + lf_print_error("Action type is not an integer. element_size is %zu", template->type.element_size); + return -1; + } + int* container = (int*)malloc(sizeof(int)); + *container = value; + return lf_schedule_value(action, extra_delay, container, 1); } -/** - * Schedule the specified action with the specified token as a payload. - * This will trigger an event at a later logical time that depends - * on whether the action is logical or physical and what its parameter - * values are. - * - * logical action: A logical action has an offset (default is zero) - * and a minimum interarrival time (MIT), which also defaults to zero. - * The logical time at which this scheduled event will trigger is - * the current time plus the offset plus the delay argument given to - * this function. If, however, that time is not greater than a prior - * triggering of this logical action by at least the MIT, then the - * one of two things can happen depending on the policy specified - * for the action. If the action's policy is DROP (default), then the - * action is simply dropped and the memory pointed to by value argument - * is freed. If the policy is DEFER, then the time will be increased - * to equal the time of the most recent triggering plus the MIT. - * - * For the above, "current time" means the logical time of the - * reaction that is calling this function. Logical actions should - * always be scheduled within a reaction invocation, never asynchronously - * from the outside. FIXME: This needs to be checked. - * - * physical action: A physical action has all the same parameters - * as a logical action, but its timestamp will be the larger of the - * current physical time and the time it would be assigned if it - * were a logical action. - * - * There are three conditions under which this function will not - * actually put an event on the event queue and decrement the reference count - * of the token (if there is one), which could result in the payload being - * freed. In all three cases, this function returns 0. Otherwise, - * it returns a handle to the scheduled trigger, which is an integer - * greater than 0. - * - * The first condition is that stop() has been called and the time offset - * of this event is greater than zero. - * The second condition is that the logical time of the event - * is greater that the stop time (timeout) that is specified in the target - * properties or on the command line. - * The third condition is that the trigger argument is null. - * - * @param action The action to be triggered. - * @param extra_delay Extra offset of the event release above that in the action. - * @param token The token to carry the payload or null for no payload. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for error. - */ trigger_handle_t lf_schedule_token(void* action, interval_t extra_delay, lf_token_t* token) { - return _lf_schedule_token((lf_action_base_t*)action, extra_delay, token); + environment_t* env = ((lf_action_base_t*)action)->parent->environment; + + LF_CRITICAL_SECTION_ENTER(env); + int return_value = lf_schedule_trigger(env, ((lf_action_base_t*)action)->trigger, extra_delay, token); + // Notify the main thread in case it is waiting for physical time to elapse. + lf_notify_of_event(env); + LF_CRITICAL_SECTION_EXIT(env); + return return_value; } -/** - * Schedule an action to occur with the specified value and time offset with a - * copy of the specified value. If the value is non-null, then it will be copied - * into newly allocated memory under the assumption that its size is given in - * the trigger's token object's element_size field multiplied by the specified - * length. - * - * See schedule_token(), which this uses, for details. - * - * @param action Pointer to an action on a self struct. - * @param offset The time offset over and above that in the action. - * @param value A pointer to the value to copy. - * @param length The length, if an array, 1 if a scalar, and 0 if value is NULL. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for - * error. - */ -trigger_handle_t lf_schedule_copy(void* action, interval_t offset, void* value, int length) { +trigger_handle_t lf_schedule_copy(void* action, interval_t offset, void* value, size_t length) { if (length < 0) { lf_print_error( "schedule_copy():" - " Ignoring request to copy a value with a negative length (%d).", + " Ignoring request to copy a value with a negative length (%zu).", length ); return -1; } - return _lf_schedule_copy((lf_action_base_t*)action, offset, value, (size_t)length); + if (value == NULL) { + return lf_schedule_token(action, offset, NULL); + } + environment_t* env = ((lf_action_base_t*)action)->parent->environment; + token_template_t* template = (token_template_t*)action; + if (action == NULL || template->type.element_size <= 0) { + lf_print_error("schedule: Invalid element size."); + return -1; + } + LF_CRITICAL_SECTION_ENTER(env); + // Initialize token with an array size of length and a reference count of 0. + lf_token_t* token = _lf_initialize_token(template, length); + // Copy the value into the newly allocated memory. + memcpy(token->value, value, template->type.element_size * length); + // The schedule function will increment the reference count. + trigger_handle_t result = lf_schedule_trigger(env, ((lf_action_base_t*)action)->trigger, offset, token); + // Notify the main thread in case it is waiting for physical time to elapse. + lf_notify_of_event(env); + LF_CRITICAL_SECTION_EXIT(env); + return result; } - -/** - * Variant of schedule_token that creates a token to carry the specified value. - * The value is required to be malloc'd memory with a size equal to the - * element_size of the specifies action times the length parameter. - * - * See schedule_token(), which this uses, for details. - * - * @param action The action to be triggered. - * @param extra_delay Extra offset of the event release above that in the - * action. - * @param value Dynamically allocated memory containing the value to send. - * @param length The length of the array, if it is an array, or 1 for a scalar - * and 0 for no payload. - * @return A handle to the event, or 0 if no event was scheduled, or -1 for - * error. - */ trigger_handle_t lf_schedule_value(void* action, interval_t extra_delay, void* value, int length) { if (length < 0) { lf_print_error( @@ -142,7 +84,15 @@ trigger_handle_t lf_schedule_value(void* action, interval_t extra_delay, void* v ); return -1; } - return _lf_schedule_value((lf_action_base_t*)action, extra_delay, value, (size_t)length); + token_template_t* template = (token_template_t*)action; + environment_t* env = ((lf_action_base_t*)action)->parent->environment; + LF_CRITICAL_SECTION_ENTER(env); + lf_token_t* token = _lf_initialize_token_with_value(template, value, length); + int return_value = lf_schedule_trigger(env, ((lf_action_base_t*)action)->trigger, extra_delay, token); + // Notify the main thread in case it is waiting for physical time to elapse. + lf_notify_of_event(env); + LF_CRITICAL_SECTION_EXIT(env); + return return_value; } /** @@ -166,3 +116,230 @@ bool lf_check_deadline(void* self, bool invoke_deadline_handler) { } return false; } + +trigger_handle_t lf_schedule_trigger(environment_t *env, trigger_t* trigger, interval_t extra_delay, lf_token_t* token) { + assert(env != GLOBAL_ENVIRONMENT); + if (lf_is_tag_after_stop_tag(env, env->current_tag)) { + // If schedule is called after stop_tag + // This is a critical condition. + _lf_done_using(token); + lf_print_warning("lf_schedule() called after stop tag."); + return 0; + } + + if (extra_delay < 0LL) { + lf_print_warning("schedule called with a negative extra_delay " PRINTF_TIME ". Replacing with zero.", extra_delay); + extra_delay = 0LL; + } + + LF_PRINT_DEBUG("lf_schedule_trigger: scheduling trigger %p with delay " PRINTF_TIME " and token %p.", + trigger, extra_delay, token); + + // Increment the reference count of the token. + if (token != NULL) { + token->ref_count++; + LF_PRINT_DEBUG("lf_schedule_trigger: Incremented ref_count of %p to %zu.", + token, token->ref_count); + } + + // The trigger argument could be null, meaning that nothing is triggered. + // Doing this after incrementing the reference count ensures that the + // payload will be freed, if there is one. + if (trigger == NULL) { + _lf_done_using(token); + return 0; + } + + // Compute the tag (the logical timestamp for the future event). + // We first do this assuming it is logical action and then, if it is a + // physical action, modify it if physical time exceeds the result. + interval_t delay = extra_delay; + // Add the offset if this is not a timer because, in that case, + // it is the minimum delay. + if (!trigger->is_timer) { + delay += trigger->offset; + } + tag_t intended_tag = (tag_t){.time = env->current_tag.time + delay, .microstep = 0}; + + LF_PRINT_DEBUG("lf_schedule_trigger: env->current_tag.time = " PRINTF_TIME ". Total logical delay = " PRINTF_TIME "", + env->current_tag.time, delay); + interval_t min_spacing = trigger->period; + + event_t* e = lf_get_new_event(env); + + // Initialize the next pointer. + e->next = NULL; + + // Set the payload. + e->token = token; + + // Make sure the event points to this trigger so when it is + // dequeued, it will trigger this trigger. + e->trigger = trigger; + + // If the trigger is physical, then we need to check whether + // physical time is larger than the intended time and, if so, + // modify the intended time. + if (trigger->is_physical) { + // Get the current physical time and assign it as the intended time. + intended_tag.time = lf_time_physical() + delay; + } else { + // FIXME: We need to verify that we are executing within a reaction? + // See reactor_threaded. + // If a logical action is scheduled asynchronously (which should never be + // done) the computed tag can be smaller than the current tag, in which case + // it needs to be adjusted. + // FIXME: This can go away once: + // - we have eliminated the possibility to have a negative additional delay; and + // - we detect the asynchronous use of logical actions + #ifndef NDEBUG + if (intended_tag.time < env->current_tag.time) { + lf_print_warning("Attempting to schedule an event earlier than current time by " PRINTF_TIME " nsec! " + "Revising to the current time " PRINTF_TIME ".", + env->current_tag.time - intended_tag.time, env->current_tag.time); + intended_tag.time = env->current_tag.time; + } + #endif + } + +#ifdef FEDERATED_DECENTRALIZED + // Event inherits the original intended_tag of the trigger + // set by the network stack (or the default, which is (NEVER,0)) + e->intended_tag = trigger->intended_tag; +#endif + + // Check for conflicts (a queued event with the same trigger and time). + if (min_spacing <= 0) { + // No minimum spacing defined. + e->time = intended_tag.time; + event_t* found = (event_t *)pqueue_find_equal_same_priority(env->event_q, e); + // Check for conflicts. Let events pile up in super dense time. + if (found != NULL) { + intended_tag.microstep++; + // Skip to the last node in the linked list. + while(found->next != NULL) { + found = found->next; + intended_tag.microstep++; + } + if (lf_is_tag_after_stop_tag(env, intended_tag)) { + LF_PRINT_DEBUG("Attempt to schedule an event after stop_tag was rejected."); + // Scheduling an event will incur a microstep + // after the stop tag. + lf_recycle_event(env, e); + return 0; + } + // Hook the event into the list. + found->next = e; + trigger->last_tag = intended_tag; + return(0); // FIXME: return value + } + // If there are not conflicts, schedule as usual. If intended time is + // equal to the current logical time, the event will effectively be + // scheduled at the next microstep. + } else if (!trigger->is_timer && trigger->last_tag.time != NEVER) { + // There is a min_spacing and there exists a previously + // scheduled event. It determines the + // earliest time at which the new event can be scheduled. + // Check to see whether the event is too early. + instant_t earliest_time = trigger->last_tag.time + min_spacing; + LF_PRINT_DEBUG("There is a previously scheduled event; earliest possible time " + "with min spacing: " PRINTF_TIME, + earliest_time); + // If the event is early, see which policy applies. + if (earliest_time > intended_tag.time) { + LF_PRINT_DEBUG("Event is early."); + switch(trigger->policy) { + case drop: + LF_PRINT_DEBUG("Policy is drop. Dropping the event."); + // Recycle the new event and decrement the + // reference count of the token. + _lf_done_using(token); + lf_recycle_event(env, e); + return(0); + case replace: + LF_PRINT_DEBUG("Policy is replace. Replacing the previous event."); + // If the event with the previous time is still on the event + // queue, then replace the token. To find this event, we have + // to construct a dummy event_t struct. + event_t* dummy = lf_get_new_event(env); + dummy->next = NULL; + dummy->trigger = trigger; + dummy->time = trigger->last_tag.time; + event_t* found = (event_t *)pqueue_find_equal_same_priority(env->event_q, dummy); + + if (found != NULL) { + // Recycle the existing token and the new event + // and update the token of the existing event. + lf_replace_token(found, token); + lf_recycle_event(env, e); + lf_recycle_event(env, dummy); + // Leave the last_tag the same. + return(0); + } + lf_recycle_event(env, dummy); + + // If the preceding event _has_ been handled, then adjust + // the tag to defer the event. + intended_tag = (tag_t){.time = earliest_time, .microstep = 0}; + break; + default: + // Default policy is defer + intended_tag = (tag_t){.time = earliest_time, .microstep = 0}; + break; + } + } + } + + // Check if the intended time is in the future + // This is a sanity check for the logic above + // FIXME: This is a development assertion and might + // not be necessary for end-user LF programs + #ifndef NDEBUG + if (intended_tag.time < env->current_tag.time) { + lf_print_error("Attempting to schedule an event earlier than current time by " PRINTF_TIME " nsec! " + "Revising to the current time " PRINTF_TIME ".", + env->current_tag.time - intended_tag.time, env->current_tag.time); + intended_tag.time = env->current_tag.time; + } + #endif + + // Set the tag of the event. + e->time = intended_tag.time; + + // Do not schedule events if if the event time is past the stop time + // (current microsteps are checked earlier). + LF_PRINT_DEBUG("Comparing event with elapsed time " PRINTF_TIME " against stop time " PRINTF_TIME ".", e->time - lf_time_start(), env->stop_tag.time - lf_time_start()); + if (e->time > env->stop_tag.time) { + LF_PRINT_DEBUG("lf_schedule_trigger: event time is past the timeout. Discarding event."); + _lf_done_using(token); + lf_recycle_event(env, e); + return(0); + } + + // Store the time in order to check the min spacing + // between this and any following event. + trigger->last_tag = intended_tag; + + // Queue the event. + // NOTE: There is no need for an explicit microstep because + // when this is called, all events at the current tag + // (time and microstep) have been pulled from the queue, + // and any new events added at this tag will go into the reaction_q + // rather than the event_q, so anything put in the event_q with this + // same time will automatically be executed at the next microstep. + LF_PRINT_LOG("Inserting event in the event queue with elapsed time " PRINTF_TIME ".", + e->time - lf_time_start()); + pqueue_insert(env->event_q, e); + + tracepoint_schedule(env->trace, trigger, e->time - env->current_tag.time); + + // FIXME: make a record of handle and implement unschedule. + // NOTE: Rather than wrapping around to get a negative number, + // we reset the handle on the assumption that much earlier + // handles are irrelevant. + trigger_handle_t return_value = env->_lf_handle++; + if (env->_lf_handle < 0) { + env->_lf_handle = 1; + } + return return_value; +} diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..791fce4ce 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +further-cleanup diff --git a/python/include/pythontarget.h b/python/include/pythontarget.h index 9acf2390b..d7f2d1177 100644 --- a/python/include/pythontarget.h +++ b/python/include/pythontarget.h @@ -35,7 +35,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Note for target language developers. This is one way of developing a target language where * the C core runtime is adopted. This file is a translation layer that implements Lingua Franca - * APIs which interact with the internal _lf_SET and _lf_schedule APIs. This file can act as a + * APIs which interact with the lf_set and lf_schedule APIs. This file can act as a * template for future runtime developement for target languages. * For source generation, see xtext/org.icyphy.linguafranca/src/org/icyphy/generator/PythonGenerator.xtend. */ diff --git a/python/lib/pythontarget.c b/python/lib/pythontarget.c index 551f9a71e..c939895ab 100644 --- a/python/lib/pythontarget.c +++ b/python/lib/pythontarget.c @@ -38,10 +38,11 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "python_port.h" #include "python_tag.h" #include "python_time.h" -#include "reactor_common.h" +#include "reactor.h" #include "reactor.h" #include "tag.h" #include "util.h" +#include "environment.h" ////////////// Global variables /////////////// // The global Python object that holds the .py module that the @@ -105,7 +106,7 @@ PyObject* py_schedule(PyObject *self, PyObject *args) { // Pass the token along - _lf_schedule_token(action, offset, t); + lf_schedule_token(action, offset, t); // FIXME: handle is not passed to the Python side @@ -134,7 +135,7 @@ PyObject* py_schedule_copy(PyObject *self, PyObject *args) { exit(1); } - _lf_schedule_copy(action, offset, value, length); + lf_schedule_copy(action, offset, value, length); // FIXME: handle is not passed to the Python side diff --git a/test/src_gen_stub.c b/test/src_gen_stub.c index 9517cb249..ce397822f 100644 --- a/test/src_gen_stub.c +++ b/test/src_gen_stub.c @@ -11,10 +11,10 @@ environment_t _env; -void _lf_initialize_trigger_objects() {} -void terminate_execution() {} -void _lf_set_default_command_line_options() {} -void _lf_initialize_watchdogs() {} +void _lf_initialize_trigger_objects(void) {} +void lf_terminate_execution(void) {} +void lf_set_default_command_line_options(void) {} +void _lf_initialize_watchdogs(environment_t ** envs) {} void logical_tag_complete(tag_t tag_to_send) {} int _lf_get_environments(environment_t ** envs) { *envs = &_env; diff --git a/util/sensor_simulator.c b/util/sensor_simulator.c index 223d9f0ab..f96d1b1f7 100644 --- a/util/sensor_simulator.c +++ b/util/sensor_simulator.c @@ -25,13 +25,13 @@ #define LF_SENSOR_TRIGGER_TABLE_SIZE 96 /** Table of Lingua Franca trigger objects to schedule in response to keypresses. */ -trigger_t* _lf_sensor_trigger_table[LF_SENSOR_TRIGGER_TABLE_SIZE]; +lf_action_base_t* _lf_sensor_trigger_table[LF_SENSOR_TRIGGER_TABLE_SIZE]; /** Trigger for the newline character '\n', which is platform dependent. */ -trigger_t* _lf_sensor_sensor_newline_trigger = NULL; +lf_action_base_t* _lf_sensor_sensor_newline_trigger = NULL; /** Trigger for any key. */ -trigger_t* _lf_sensor_any_key_trigger = NULL; +lf_action_base_t* _lf_sensor_any_key_trigger = NULL; lf_mutex_t _lf_sensor_mutex; lf_cond_t _lf_sensor_simulator_cond_var; @@ -432,19 +432,19 @@ int register_sensor_key(char key, void* action) { if (_lf_sensor_sensor_newline_trigger != NULL) { result = 1; } else { - _lf_sensor_sensor_newline_trigger = action; + _lf_sensor_sensor_newline_trigger = (lf_action_base_t*)action; } } else if (key == '\0') { // Any key trigger. if (_lf_sensor_any_key_trigger != NULL) { result = 1; } else { - _lf_sensor_any_key_trigger = action; + _lf_sensor_any_key_trigger = (lf_action_base_t*)action; } } else if (_lf_sensor_trigger_table[index] != NULL) { result = 1; } else { - _lf_sensor_trigger_table[index] = action; + _lf_sensor_trigger_table[index] = (lf_action_base_t*)action; } LF_MUTEX_UNLOCK(&_lf_sensor_mutex); return result; diff --git a/util/sensor_simulator.h b/util/sensor_simulator.h index 4b18a828e..82749f3d3 100644 --- a/util/sensor_simulator.h +++ b/util/sensor_simulator.h @@ -94,7 +94,7 @@ void show_tick(const char* character); * (error code 3). * @param key The key to register. * @param action The action to trigger when the key is pressed - * (a pointer to a trigger_t struct). + * (a pointer to an lf_action_based_t struct). * @return 0 for success, error code for failure. */ int register_sensor_key(char key, void* action);