Skip to content

Commit

Permalink
Merge pull request #445 from lf-lang/LTC-optimization
Browse files Browse the repository at this point in the history
Optimization of LTC signals
  • Loading branch information
byeonggiljun authored Aug 1, 2024
2 parents 1fae32c + 1a248a0 commit 543b738
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 24 deletions.
5 changes: 4 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED_DECENTRALIZED
#if defined(FEDERATED_CENTRALIZED)
env->need_to_send_LTC = false;
(void)num_is_present_fields;
#elif defined(FEDERATED_DECENTRALIZED)
if (num_is_present_fields > 0) {
env->_lf_intended_tag_fields = (tag_t**)calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT_NON_NULL(env->_lf_intended_tag_fields);
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {

enclave->completed = completed;

LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag complete (LTC) " PRINTF_TAG ".", enclave->id,
LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag confirmed (LTC) " PRINTF_TAG ".", enclave->id,
enclave->completed.time - start_time, enclave->completed.microstep);

// Check downstream scheduling_nodes to see whether they should now be granted a TAG.
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag);
* If M is equal to the NET of the federate, then return PTAG(M).
*
* This should be called whenever an immediately upstream federate sends to
* the RTI an LTC (latest tag complete), or when a transitive upstream
* the RTI an LTC (latest tag confirmed), or when a transitive upstream
* federate sends a NET (Next Event Tag) message.
* It is also called when an upstream federate resigns from the federation.
*
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
LF_MUTEX_UNLOCK(&rti_mutex);
}

void handle_latest_tag_complete(federate_info_t* fed) {
void handle_latest_tag_confirmed(federate_info_t* fed) {
unsigned char buffer[sizeof(int64_t) + sizeof(uint32_t)];
read_from_socket_fail_on_error(&fed->socket, sizeof(int64_t) + sizeof(uint32_t), buffer, NULL,
"RTI failed to read the content of the logical tag complete from federate %d.",
Expand Down Expand Up @@ -1110,8 +1110,8 @@ void* federate_info_thread_TCP(void* fed) {
case MSG_TYPE_NEXT_EVENT_TAG:
handle_next_event_tag(my_fed);
break;
case MSG_TYPE_LATEST_TAG_COMPLETE:
handle_latest_tag_complete(my_fed);
case MSG_TYPE_LATEST_TAG_CONFIRMED:
handle_latest_tag_confirmed(my_fed);
break;
case MSG_TYPE_STOP_REQUEST:
handle_stop_request_message(my_fed); // FIXME: Reviewed until here.
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char
void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer);

/**
* Handle a latest tag complete (LTC) message. @see
* MSG_TYPE_LATEST_TAG_COMPLETE in rti.h.
* Handle a latest tag confirmed (LTC) message. @see
* MSG_TYPE_LATEST_TAG_CONFIRMED in rti.h.
*
* This function assumes the caller does not hold the mutex.
*
* @param fed The federate that has completed a logical tag.
*/
void handle_latest_tag_complete(federate_info_t* fed);
void handle_latest_tag_confirmed(federate_info_t* fed);

/**
* Handle a next event tag (NET) message. @see MSG_TYPE_NEXT_EVENT_TAG in rti.h.
Expand Down
21 changes: 14 additions & 7 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static void send_time(unsigned char type, instant_t time) {
/**
* Send a tag to the RTI.
* This function acquires the lf_outbound_socket_mutex.
* @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_COMPLETE).
* @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_CONFIRMED).
* @param tag The tag.
*/
static void send_tag(unsigned char type, tag_t tag) {
Expand Down Expand Up @@ -1277,7 +1277,7 @@ static void handle_provisional_tag_advance_grant() {
// TAG. In either case, we know that at the PTAG tag, all outputs
// have either been sent or are absent, so we can send an LTC.
// Send an LTC to indicate absent outputs.
lf_latest_tag_complete(PTAG);
lf_latest_tag_confirmed(PTAG);
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
Expand Down Expand Up @@ -2202,14 +2202,21 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
return NULL;
}

void lf_latest_tag_complete(tag_t tag_to_send) {
int compare_with_last_tag = lf_tag_compare(_fed.last_sent_LTC, tag_to_send);
if (compare_with_last_tag >= 0) {
void lf_latest_tag_confirmed(tag_t tag_to_send) {
environment_t* env;
if (lf_tag_compare(_fed.last_sent_LTC, tag_to_send) >= 0) {
return; // Already sent this or later tag.
}
_lf_get_environments(&env);
if (!env->need_to_send_LTC) {
LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged message with the "
"tag " PRINTF_TAG " that this federate has received.",
tag_to_send.time - start_time, tag_to_send.microstep);
return;
}
LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
LF_PRINT_LOG("Sending Latest Tag Confirmed (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
send_tag(MSG_TYPE_LATEST_TAG_CONFIRMED, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
}

Expand Down
4 changes: 4 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ void _lf_start_time_step(environment_t* env) {
}
#endif // FEDERATED_DECENTRALIZED

#ifdef FEDERATED_CENTRALIZED
env->need_to_send_LTC = false;
#endif // FEDERATED_CENTRALIZED

// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
Expand Down
9 changes: 9 additions & 0 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,15 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index),
current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->deadline);

#ifdef FEDERATED_CENTRALIZED
if (current_reaction_to_execute->is_an_input_reaction) {
// This federate has received a tagged message with the current tag and
// must send LTC at the current tag to confirm that the federate has successfully
// received and processed tagged messages with the current tag.
env->need_to_send_LTC = true;
}
#endif // FEDERATED_CENTRALIZED

bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute);

if (!violation) {
Expand Down
1 change: 1 addition & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ typedef struct environment_t {
#if defined(FEDERATED)
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
bool need_to_send_LTC;
#endif // FEDERATED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t* enclave_info;
Expand Down
8 changes: 4 additions & 4 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ typedef struct federate_instance_t {
bool received_stop_request_from_rti;

/**
* A record of the most recently sent LTC (latest tag complete) message.
* A record of the most recently sent LTC (latest tag confirmed) message.
* In some situations, federates can send logical_tag_complete for
* the same tag twice or more in-a-row to the RTI. For example, when
* _lf_next() returns without advancing tag. To prevent overwhelming
* the RTI with extra messages, record the last sent logical tag
* complete message and check against it in lf_latest_tag_complete().
* complete message and check against it in lf_latest_tag_confirmed().
*
* @note Here, the underlying assumption is that the TCP stack will
* deliver the Logical TAG Complete message to the RTI eventually
Expand Down Expand Up @@ -291,7 +291,7 @@ void lf_enqueue_port_absent_reactions(environment_t* env);
void* lf_handle_p2p_connections_from_federates(void*);

/**
* @brief Send a latest tag complete (LTC) signal to the RTI.
* @brief Send a latest tag confirmed (LTC) signal to the RTI.
*
* This avoids the send if an equal or later LTC has previously been sent.
*
Expand All @@ -300,7 +300,7 @@ void* lf_handle_p2p_connections_from_federates(void*);
*
* @param tag_to_send The tag to send.
*/
void lf_latest_tag_complete(tag_t);
void lf_latest_tag_confirmed(tag_t);

/**
* @brief Parse the address of the RTI and store them into the global federation_metadata struct.
Expand Down
6 changes: 3 additions & 3 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* each federate has a valid event at the start tag (start time, 0) and it will
* inform the RTI of this event.
* Subsequently, at the conclusion of each tag, each federate will send a
* `MSG_TYPE_LATEST_TAG_COMPLETE` followed by a `MSG_TYPE_NEXT_EVENT_TAG` (see
* `MSG_TYPE_LATEST_TAG_CONFIRMED` followed by a `MSG_TYPE_NEXT_EVENT_TAG` (see
* the comment for each message for further explanation). Each federate would
* have to wait for a `MSG_TYPE_TAG_ADVANCE_GRANT` or a
* `MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT` before it can advance to a
Expand Down Expand Up @@ -434,12 +434,12 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT 8

/**
* Byte identifying a latest tag complete (LTC) message sent by a federate
* Byte identifying a latest tag confirmed (LTC) message sent by a federate
* to the RTI.
* The next eight bytes will be the timestep of the completed tag.
* The next four bytes will be the microsteps of the completed tag.
*/
#define MSG_TYPE_LATEST_TAG_COMPLETE 9
#define MSG_TYPE_LATEST_TAG_CONFIRMED 9

/////////// Messages used in lf_request_stop() ///////////////
//// Overview of the algorithm:
Expand Down
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
master
LTC-optimization

0 comments on commit 543b738

Please sign in to comment.