Skip to content

Commit

Permalink
Merge pull request #354 from lf-lang/further-cleanup
Browse files Browse the repository at this point in the history
Further cleanup
  • Loading branch information
lhstrh authored Feb 23, 2024
2 parents 188fb3c + 028ba34 commit aaa38d8
Show file tree
Hide file tree
Showing 30 changed files with 977 additions and 1,407 deletions.
1 change: 1 addition & 0 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ static void send_failed_signal(federate_info_t* fed) {

/**
* @brief Function to run upon termination.
*
* This function will be invoked both after main() returns and when a signal
* that results in terminating the process, such as SIGINT. In the former
* case, it should do nothing. In the latter case, it will send a MSG_TYPE_FAILED
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local->base.scheduling_nodes[i] = (scheduling_node_t *) enclave_info;

// Encode the connection topology into the enclave_info object.
enclave_info->base.num_downstream = _lf_get_downstream_of(i, &enclave_info->base.downstream);
enclave_info->base.num_upstream = _lf_get_upstream_of(i, &enclave_info->base.upstream);
_lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay);
enclave_info->base.num_downstream = lf_get_downstream_of(i, &enclave_info->base.downstream);
enclave_info->base.num_upstream = lf_get_upstream_of(i, &enclave_info->base.upstream);
lf_get_upstream_delay_of(i, &enclave_info->base.upstream_delay);

enclave_info->base.state = GRANTED;
}
Expand Down
81 changes: 70 additions & 11 deletions core/federated/RTI/rti_local.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* @file
* @author Erling Jellum (erling.r.jellum@ntnu.no)
* @author Edward A. Lee (eal@berkeley.edu)
* @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
* @author Soroush Bateni (soroush@utdallas.edu)
* @copyright (c) 2020-2024, The University of California at Berkeley
* License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md)
*
* @brief This file declares functions used to implement scheduling enclaves.
*
* A scheduling enclave is portion of the runtime system that maintains its own event
* and reaction queues and has its own scheduler. It uses a local runtime infrastructure (RTI)
* to coordinate the advancement of tags across enclaves.
*/

#ifndef RTI_LOCAL_H
#define RTI_LOCAL_H

Expand All @@ -8,9 +24,9 @@
#include "rti_common.h"

/**
* @brief Structure holding information about each enclave in the program
* The first field is the generic scheduling_node_info struct
* @brief Structure holding information about each enclave in the program.
*
* The first field is the generic scheduling_node_info struct
*/
typedef struct enclave_info_t {
scheduling_node_t base;
Expand All @@ -21,14 +37,15 @@ typedef struct enclave_info_t {

/**
* @brief Structure holding information about the local RTI
*
*/
typedef struct {
rti_common_t base;
} rti_local_t;

/**
* @brief Dynamically create and initialize the local RTI.
* @param envs Array of environments.
* @param num_envs Number of environments.
*/
void initialize_local_rti(environment_t* envs, int num_envs);

Expand All @@ -39,13 +56,15 @@ void free_local_rti();

/**
* @brief Initialize the enclave object.
*
* @param enclave
* @param enclave The enclave object to initialize.
* @param idx The index of the enclave.
* @param env The environment of the enclave.
*/
void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *env);

/**
* @brief Notify the local RTI of a next event tag (NET).
*
* This function call may block. A call to this function serves two purposes.
* 1) It is a promise that, unless receiving events from other enclaves, this
* enclave will not produce any event until the next_event_tag (NET) argument.
Expand All @@ -65,8 +84,9 @@ void initialize_enclave_info(enclave_info_t* enclave, int idx, environment_t *en
tag_t rti_next_event_tag_locked(enclave_info_t* enclave, tag_t next_event_tag);

/**
* @brief This function informs the local RTI that `enclave` has completed tag
* `completed`. This will update the data structures and can release other
* @brief Inform the local RTI that `enclave` has completed tag `completed`.
*
* This will update the data structures and can release other
* enclaves waiting on a TAG.
*
* This assumes the caller is holding the environment mutex of the source enclave.
Expand All @@ -77,16 +97,55 @@ tag_t rti_next_event_tag_locked(enclave_info_t* enclave, tag_t next_event_tag);
void rti_logical_tag_complete_locked(enclave_info_t* enclave, tag_t completed);

/**
* @brief This function is called after scheduling an event onto the event queue
* of another enclave. The source enclave must call this function to potentially update
* @brief Notify the local RTI to update the next event tag (NET) of a target enclave.
*
* This function is called after scheduling an event onto the event queue of another enclave.
* The source enclave must call this function to potentially update
* the NET of the target enclave.
*
* This assumes the caller is holding the environment mutex of the target enclave.
*
* @param target The enclave of which we want to update the NET of
* @param net The proposed next event tag
* @param src The enclave that has scheduled an event.
* @param target The enclave of which we want to update the NET of.
* @param net The proposed next event tag.
*/
void rti_update_other_net_locked(enclave_info_t* src, enclave_info_t* target, tag_t net);

/**
* @brief Get the array of ids of enclaves directly upstream of the specified enclave.
*
* This updates the specified result pointer to point to a statically allocated array of IDs
* and returns the length of the array. The implementation is code-generated.
*
* @param enclave_id The enclave for which to report upstream IDs.
* @param result The pointer to dereference and update to point to the resulting array.
* @return The number of direct upstream enclaves.
*/
int lf_get_upstream_of(int enclave_id, int** result);

/**
* @brief Get the array of ids of enclaves directly downstream of the specified enclave.
*
* This updates the specified result pointer to point to a statically allocated array of IDs
* and returns the length of the array. The implementation is code-generated.
*
* @param enclave_id The enclave for which to report downstream IDs.
* @param result The pointer to dereference and update to point to the resulting array.
* @return The number of direct downstream enclaves.
*/
int lf_get_downstream_of(int enclave_id, int** result);

/**
* @brief Retrieve the delays on the connections to direct upstream enclaves.
*
* This updates the result pointer to point to a statically allocated array of delays.
* The implementation is code-generated.
*
* @param enclave_id The enclave for which to search for upstream delays.
* @param result The pointer to dereference and update to point to the resulting array.
* @return int The number of direct upstream enclaves.
*/
int lf_get_upstream_delay_of(int enclave_id, interval_t** result);

#endif // LF_ENCLAVES
#endif // RTI_LOCAL_H
15 changes: 8 additions & 7 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "reactor.h"
#include "reactor_common.h"
#include "reactor_threaded.h"
#include "api/schedule.h"
#include "scheduler.h"
#include "trace.h"

Expand Down Expand Up @@ -371,7 +372,7 @@ static trigger_handle_t schedule_message_received_from_network_locked(
LF_PRINT_LOG("Calling schedule with 0 delay and intended tag " PRINTF_TAG ".",
trigger->intended_tag.time - start_time,
trigger->intended_tag.microstep);
return_value = _lf_schedule(env, trigger, extra_delay, token);
return_value = lf_schedule_trigger(env, trigger, extra_delay, token);
#endif
} else {
// In case the message is in the future, call
Expand Down Expand Up @@ -501,7 +502,7 @@ static int handle_message(int* socket, int fed_id) {
LF_PRINT_LOG("Message received by federate: %s. Length: %zu.", message_contents, length);

LF_PRINT_DEBUG("Calling schedule for message received on a physical connection.");
_lf_schedule_value(action, 0, message_contents, length);
lf_schedule_value(action, 0, message_contents, length);
return 0;
}

Expand Down Expand Up @@ -1122,7 +1123,7 @@ static void* update_ports_from_staa_offsets(void* args) {
staa_t* staa_elem = staa_lst[i];
// The staa_elem is adjusted in the code generator to have subtracted the delay on the connection.
// The list is sorted in increasing order of adjusted STAA offsets.
// The wait_until function automatically adds the _lf_fed_STA_offset to the wait time.
// The wait_until function automatically adds the lf_fed_STA_offset to the wait time.
interval_t wait_until_time = env->current_tag.time + staa_elem->STAA;
LF_PRINT_DEBUG("**** (update thread) original wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start());

Expand All @@ -1136,7 +1137,7 @@ static void* update_ports_from_staa_offsets(void* args) {
// block progress of any execution that is actually processing events.
// It only slightly delays the decision that an event is absent, and only
// if the STAA and STA are extremely small.
if (_lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) {
if (lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) {
wait_until_time += 5 * MIN_SLEEP_DURATION;
}
while (a_port_is_unknown(staa_elem)) {
Expand Down Expand Up @@ -1363,7 +1364,7 @@ static void handle_stop_granted_message() {
received_stop_tag.microstep++;
}

_lf_set_stop_tag(&env[i], received_stop_tag);
lf_set_stop_tag(&env[i], received_stop_tag);
LF_PRINT_DEBUG("Setting the stop tag to " PRINTF_TAG ".",
env[i].stop_tag.time - start_time,
env[i].stop_tag.microstep);
Expand Down Expand Up @@ -1662,7 +1663,7 @@ static bool bounded_NET(tag_t* tag) {
* generates an empty implementation.
* @param env The environment of the federate
*/
void terminate_execution(environment_t* env) {
void lf_terminate_execution(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

// For an abnormal termination (e.g. a SIGINT), we need to send a
Expand Down Expand Up @@ -2668,7 +2669,7 @@ int lf_send_tagged_message(environment_t* env,
// tag of the outgoing message.
tag_t current_message_intended_tag = lf_delay_tag(env->current_tag, additional_delay);

if (_lf_is_tag_after_stop_tag(env, current_message_intended_tag)) {
if (lf_is_tag_after_stop_tag(env, current_message_intended_tag)) {
// Message tag is past the timeout time (the stop time) so it should not be sent.
LF_PRINT_LOG("Dropping message because it will be after the timeout time.");
return -1;
Expand Down
32 changes: 4 additions & 28 deletions core/lf_token.c
Original file line number Diff line number Diff line change
@@ -1,33 +1,9 @@
/**
* @file
* @author Edward A. Lee (eal@berkeley.edu)
*
* @section LICENSE
* Copyright (c) 2022, The University of California at Berkeley.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* @section DESCRIPTION
*
* Functions supporting token types. See lf_token.h for docs.
* * @copyright (c) 2020-2024, The University of California at Berkeley.
* License: <a href="https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md">BSD 2-clause</a>
* @brief Functions supporting token types. See lf_token.h for docs.
*/

#if !defined NDEBUG
Expand All @@ -47,7 +23,7 @@ int _lf_count_token_allocations;
#include "lf_types.h"
#include "hashset/hashset_itr.h"
#include "util.h"
#include "reactor_common.h" // Enter/exit critical sections
#include "platform.h" // Enter/exit critical sections
#include "port.h" // Defines lf_port_base_t.

lf_token_t* _lf_tokens_allocated_in_reactions = NULL;
Expand Down
11 changes: 6 additions & 5 deletions core/modal_models/modes.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "platform.h"
#include "lf_types.h"
#include "modes.h"
#include "reactor.h"
#include "reactor_common.h"

// Bit masks for the internally used flags on modes
Expand Down Expand Up @@ -390,7 +391,7 @@ void _lf_process_mode_changes(
for (int j = 0; j < timer_triggers_size; j++) {
trigger_t* timer = timer_triggers[j];
if (timer->period == 0 && timer->mode == state->next_mode) {
_lf_schedule(env, timer, timer->offset, NULL);
lf_schedule_trigger(env, timer, timer->offset, NULL);
}
}
}
Expand All @@ -407,7 +408,7 @@ void _lf_process_mode_changes(
LF_PRINT_DEBUG("Modes: Re-enqueuing reset timer.");
// Reschedule the timer with no additional delay.
// This will take care of super dense time when offset is 0.
_lf_schedule(env, timer, event->trigger->offset, NULL);
lf_schedule_trigger(env, timer, event->trigger->offset, NULL);
}
// No further processing; drops all events upon reset (timer event was recreated by schedule and original can be removed here)
} else if (state->next_mode != state->current_mode && event->trigger != NULL) { // History transition to a different mode
Expand All @@ -430,11 +431,11 @@ void _lf_process_mode_changes(
event_t* tmp = e->next;
e = tmp->next;
// A fresh event was created by schedule, hence, recycle old one
_lf_recycle_event(env, tmp);
lf_recycle_event(env, tmp);
}
}
// A fresh event was created by schedule, hence, recycle old one
_lf_recycle_event(env, event);
lf_recycle_event(env, event);

// Remove suspended event and continue
suspended_event = _lf_remove_suspended_event(suspended_event);
Expand Down Expand Up @@ -541,7 +542,7 @@ void _lf_process_mode_changes(
void _lf_terminate_modal_reactors(environment_t* env) {
_lf_suspended_event_t* suspended_event = _lf_suspended_events_head;
while(suspended_event != NULL) {
_lf_recycle_event(env, suspended_event->event);
lf_recycle_event(env, suspended_event->event);
_lf_suspended_event_t* next = suspended_event->next;
free(suspended_event);
suspended_event = next;
Expand Down
2 changes: 1 addition & 1 deletion core/platform/lf_zephyr_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "lf_zephyr_support.h"
#include "lf_zephyr_board_support.h"
#include "platform.h"
#include "reactor_common.h"
#include "reactor.h"
#include "utils/util.h"
#include "tag.h"

Expand Down
Loading

0 comments on commit aaa38d8

Please sign in to comment.