Skip to content

Commit

Permalink
Merge branch 'main' into transient-fed
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadliaJerad committed Aug 7, 2024
2 parents b0b0ea6 + 6ef9154 commit 3408e35
Show file tree
Hide file tree
Showing 25 changed files with 375 additions and 47 deletions.
20 changes: 18 additions & 2 deletions .github/workflows/build-rti.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,32 @@ on:
workflow_call:

jobs:
run:
native-build:
strategy:
matrix:
platform: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.platform }}

steps:
- name: Check out reactor-c repository
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Build the RTI with AUTH=OFF
run: .github/scripts/build-rti.sh -DAUTH=OFF
- name: Build the RTI with AUTH=ON
run: .github/scripts/build-rti.sh -DAUTH=ON

docker-build:
runs-on: ubuntu-latest
steps:
- name: Check out reactor-c repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build Docker image
uses: docker/build-push-action@v6
with:
file: ./core/federated/RTI/rti.Dockerfile
context: .
platforms: linux/amd64, linux/arm64, linux/arm/v7, linux/riscv64
push: false
tags: lflang/rti:latest
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:

concurrency:
group: ci-${{ github.ref }}-${{ github.event_path }}
cancel-in-progress: ${{ github.ref != 'refs/heads/master' }}
cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}

jobs:
check-labels:
Expand Down Expand Up @@ -97,4 +97,4 @@ jobs:
compiler-ref: ${{ needs.fetch-lf.outputs.ref }}
scheduler: ADAPTIVE
all-platforms: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'mac') || contains( github.event.pull_request.labels.*.name, 'windows') }}
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'schedulers') }}
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'schedulers') }}
26 changes: 14 additions & 12 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ include(${LF_ROOT}/core/lf_utils.cmake)
list(APPEND GENERAL_SOURCES tag.c clock.c port.c mixed_radix.c reactor_common.c lf_token.c environment.c)

# Add tracing support if requested
if (DEFINED LF_TRACE)
if(DEFINED LF_TRACE)
message(STATUS "Including sources specific to tracing.")
list(APPEND GENERAL_SOURCES tracepoint.c)
endif()
Expand All @@ -16,7 +16,7 @@ endif()
list(APPEND REACTORC_SOURCES ${GENERAL_SOURCES})

# Add sources for either threaded or single-threaded runtime
if (DEFINED FEDERATED)
if(DEFINED FEDERATED)
include(federated/CMakeLists.txt)
include(federated/network/CMakeLists.txt)
endif()
Expand All @@ -35,14 +35,14 @@ endif()

# Add sources for the local RTI if we are using scheduling enclaves
if(DEFINED LF_ENCLAVES)
include(federated/RTI/local_rti.cmake)
include(federated/RTI/local_rti.cmake)
endif()

# Include sources from subdirectories
include(utils/CMakeLists.txt)

if (DEFINED MODAL_REACTORS)
include(modal_models/CMakeLists.txt)
if(DEFINED MODAL_REACTORS)
include(modal_models/CMakeLists.txt)
endif()

# Print sources used for compilation
Expand All @@ -53,7 +53,7 @@ add_library(reactor-c)
target_sources(reactor-c PRIVATE ${REACTORC_SOURCES})
lf_enable_compiler_warnings(reactor-c)

if (DEFINED LF_TRACE)
if(DEFINED LF_TRACE)
include(${LF_ROOT}/trace/api/CMakeLists.txt)
target_link_libraries(reactor-c PUBLIC lf::trace-api)
# If the user specified an external trace plugin. Find it and link with it
Expand Down Expand Up @@ -106,18 +106,19 @@ target_include_directories(reactor-c PUBLIC ../include/core/threaded)
target_include_directories(reactor-c PUBLIC ../include/core/utils)
target_include_directories(reactor-c PUBLIC federated/RTI/)

if (APPLE)
SET(CMAKE_C_ARCHIVE_CREATE "<CMAKE_AR> Scr <TARGET> <LINK_FLAGS> <OBJECTS>")
if(APPLE)
SET(CMAKE_C_ARCHIVE_CREATE "<CMAKE_AR> Scr <TARGET> <LINK_FLAGS> <OBJECTS>")
SET(CMAKE_CXX_ARCHIVE_CREATE "<CMAKE_AR> Scr <TARGET> <LINK_FLAGS> <OBJECTS>")
SET(CMAKE_C_ARCHIVE_FINISH "<CMAKE_RANLIB> -no_warning_for_no_symbols -c <TARGET>")
SET(CMAKE_C_ARCHIVE_FINISH "<CMAKE_RANLIB> -no_warning_for_no_symbols -c <TARGET>")
SET(CMAKE_CXX_ARCHIVE_FINISH "<CMAKE_RANLIB> -no_warning_for_no_symbols -c <TARGET>")
endif()

# Link with OpenSSL library
if(DEFINED FEDERATED_AUTHENTICATED)
if (APPLE)
if(APPLE)
set(OPENSSL_ROOT_DIR /usr/local/opt/openssl)
endif()

find_package(OpenSSL REQUIRED)
target_link_libraries(reactor-c PUBLIC OpenSSL::SSL)
endif()
Expand All @@ -130,10 +131,11 @@ if(DEFINED FEDERATED)
endif()

# Unless specified otherwise initial event queue and reaction queue to size 10
if (NOT DEFINED INITIAL_EVENT_QUEUE_SIZE)
if(NOT DEFINED INITIAL_EVENT_QUEUE_SIZE)
set(INITIAL_EVENT_QUEUE_SIZE 10)
endif()
if (NOT DEFINED INITIAL_REACT_QUEUE_SIZE)

if(NOT DEFINED INITIAL_REACT_QUEUE_SIZE)
set(INITIAL_REACT_QUEUE_SIZE 10)
endif()

Expand Down
5 changes: 4 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED_DECENTRALIZED
#if defined(FEDERATED_CENTRALIZED)
env->need_to_send_LTC = false;
(void)num_is_present_fields;
#elif defined(FEDERATED_DECENTRALIZED)
if (num_is_present_fields > 0) {
env->_lf_intended_tag_fields = (tag_t**)calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT_NON_NULL(env->_lf_intended_tag_fields);
Expand Down
17 changes: 13 additions & 4 deletions core/federated/RTI/rti.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Docker file for building the image of the rti
FROM alpine:latest
ARG BASEIMAGE=alpine:latest
FROM ${BASEIMAGE} as builder

Check warning on line 2 in core/federated/RTI/rti.Dockerfile

View workflow job for this annotation

GitHub Actions / build-rti / docker-build

The 'as' keyword should match the case of the 'from' keyword

FromAsCasing: 'as' and 'FROM' keywords' casing do not match More info: https://docs.docker.com/go/dockerfile/rule/from-as-casing/
COPY . /lingua-franca
WORKDIR /lingua-franca/core/federated/RTI
RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
Expand All @@ -9,5 +9,14 @@ RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
make && \
make install

# Use ENTRYPOINT not CMD so that command-line arguments go through
ENTRYPOINT ["RTI"]
WORKDIR /lingua-franca

# application stage
FROM ${BASEIMAGE} as app

Check warning on line 15 in core/federated/RTI/rti.Dockerfile

View workflow job for this annotation

GitHub Actions / build-rti / docker-build

The 'as' keyword should match the case of the 'from' keyword

FromAsCasing: 'as' and 'FROM' keywords' casing do not match More info: https://docs.docker.com/go/dockerfile/rule/from-as-casing/
LABEL maintainer="lf-lang"
LABEL source="https://github.com/lf-lang/reactor-c/tree/main/core/federated/RTI"
COPY --from=builder /usr/local/bin/RTI /usr/local/bin/RTI

WORKDIR /lingua-franca

ENTRYPOINT ["/usr/local/bin/RTI"]
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void _logical_tag_complete(scheduling_node_t* enclave, tag_t completed) {

enclave->completed = completed;

LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag complete (LTC) " PRINTF_TAG ".", enclave->id,
LF_PRINT_LOG("RTI received from federate/enclave %d the latest tag confirmed (LTC) " PRINTF_TAG ".", enclave->id,
enclave->completed.time - start_time, enclave->completed.microstep);

// Check downstream scheduling_nodes to see whether they should now be granted a TAG.
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag);
* If M is equal to the NET of the federate, then return PTAG(M).
*
* This should be called whenever an immediately upstream federate sends to
* the RTI an LTC (latest tag complete), or when a transitive upstream
* the RTI an LTC (latest tag confirmed), or when a transitive upstream
* federate sends a NET (Next Event Tag) message.
* It is also called when an upstream federate resigns from the federation.
*
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
LF_MUTEX_UNLOCK(&rti_mutex);
}

void handle_latest_tag_complete(federate_info_t* fed) {
void handle_latest_tag_confirmed(federate_info_t* fed) {
unsigned char buffer[sizeof(int64_t) + sizeof(uint32_t)];
read_from_socket_fail_on_error(&fed->socket, sizeof(int64_t) + sizeof(uint32_t), buffer, NULL,
"RTI failed to read the content of the logical tag complete from federate %d.",
Expand Down Expand Up @@ -1485,8 +1485,8 @@ void* federate_info_thread_TCP(void* fed) {
case MSG_TYPE_NEXT_EVENT_TAG:
handle_next_event_tag(my_fed);
break;
case MSG_TYPE_LATEST_TAG_COMPLETE:
handle_latest_tag_complete(my_fed);
case MSG_TYPE_LATEST_TAG_CONFIRMED:
handle_latest_tag_confirmed(my_fed);
break;
case MSG_TYPE_STOP_REQUEST:
handle_stop_request_message(my_fed); // FIXME: Reviewed until here.
Expand Down
6 changes: 3 additions & 3 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,14 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char
void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer);

/**
* Handle a latest tag complete (LTC) message. @see
* MSG_TYPE_LATEST_TAG_COMPLETE in rti.h.
* Handle a latest tag confirmed (LTC) message. @see
* MSG_TYPE_LATEST_TAG_CONFIRMED in rti.h.
*
* This function assumes the caller does not hold the mutex.
*
* @param fed The federate that has completed a logical tag.
*/
void handle_latest_tag_complete(federate_info_t* fed);
void handle_latest_tag_confirmed(federate_info_t* fed);

/**
* Handle a next event tag (NET) message. @see MSG_TYPE_NEXT_EVENT_TAG in rti.h.
Expand Down
21 changes: 14 additions & 7 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ static void send_time(unsigned char type, instant_t time) {
/**
* Send a tag to the RTI.
* This function acquires the lf_outbound_socket_mutex.
* @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_COMPLETE).
* @param type The message type (MSG_TYPE_NEXT_EVENT_TAG or MSG_TYPE_LATEST_TAG_CONFIRMED).
* @param tag The tag.
*/
static void send_tag(unsigned char type, tag_t tag) {
Expand Down Expand Up @@ -1280,7 +1280,7 @@ static void handle_provisional_tag_advance_grant() {
// TAG. In either case, we know that at the PTAG tag, all outputs
// have either been sent or are absent, so we can send an LTC.
// Send an LTC to indicate absent outputs.
lf_latest_tag_complete(PTAG);
lf_latest_tag_confirmed(PTAG);
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
Expand Down Expand Up @@ -2226,14 +2226,21 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
return NULL;
}

void lf_latest_tag_complete(tag_t tag_to_send) {
int compare_with_last_tag = lf_tag_compare(_fed.last_sent_LTC, tag_to_send);
if (compare_with_last_tag >= 0) {
void lf_latest_tag_confirmed(tag_t tag_to_send) {
environment_t* env;
if (lf_tag_compare(_fed.last_sent_LTC, tag_to_send) >= 0) {
return; // Already sent this or later tag.
}
_lf_get_environments(&env);
if (!env->need_to_send_LTC) {
LF_PRINT_LOG("Skip sending Latest Tag Confirmed (LTC) to the RTI because there was no tagged message with the "
"tag " PRINTF_TAG " that this federate has received.",
tag_to_send.time - start_time, tag_to_send.microstep);
return;
}
LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
LF_PRINT_LOG("Sending Latest Tag Confirmed (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
send_tag(MSG_TYPE_LATEST_TAG_CONFIRMED, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
}

Expand Down
4 changes: 4 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ void _lf_start_time_step(environment_t* env) {
}
#endif // FEDERATED_DECENTRALIZED

#ifdef FEDERATED_CENTRALIZED
env->need_to_send_LTC = false;
#endif // FEDERATED_CENTRALIZED

// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
Expand Down
9 changes: 9 additions & 0 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,15 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index),
current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->deadline);

#ifdef FEDERATED_CENTRALIZED
if (current_reaction_to_execute->is_an_input_reaction) {
// This federate has received a tagged message with the current tag and
// must send LTC at the current tag to confirm that the federate has successfully
// received and processed tagged messages with the current tag.
env->need_to_send_LTC = true;
}
#endif // FEDERATED_CENTRALIZED

bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute);

if (!violation) {
Expand Down
16 changes: 14 additions & 2 deletions core/threaded/watchdog.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ void watchdog_wait(watchdog_t* watchdog) {
instant_t physical_time = lf_time_physical();
while (watchdog->expiration != NEVER && physical_time < watchdog->expiration && !watchdog->terminate) {
// Wait for expiration, or a signal to stop or terminate.
LF_PRINT_DEBUG("Watchdog %p sleeps until " PRINTF_TIME, (void*)watchdog, watchdog->expiration);
lf_clock_cond_timedwait(&watchdog->cond, watchdog->expiration);
physical_time = lf_time_physical();
LF_PRINT_DEBUG("Watchdog %p woke up at " PRINTF_TIME " expires at " PRINTF_TIME, (void*)watchdog, physical_time,
watchdog->expiration);
}
LF_PRINT_DEBUG("Watchdog %p returns with expired=%d terminated=%d", (void*)watchdog,
physical_time >= watchdog->expiration, watchdog->terminate);
}

/**
Expand Down Expand Up @@ -94,24 +99,31 @@ static void* watchdog_thread_main(void* arg) {

// Step 1: Wait for a timeout to start watching for.
if (watchdog->expiration == NEVER) {
LF_PRINT_DEBUG("Watchdog %p waiting on cond var to be started", (void*)watchdog);
// Watchdog has been stopped.
// Let the runtime know that we are in an inactive/stopped state.
watchdog->active = false;
// Wait here until the watchdog is started and we can enter the active state.
LF_COND_WAIT(&watchdog->cond);
LF_PRINT_DEBUG("Watchdog %p woke up from cond var", (void*)watchdog);
continue;
} else {
// Watchdog has been started.
LF_PRINT_DEBUG("Watchdog %p started", (void*)watchdog);
watchdog_wait(watchdog);

// At this point we have returned from the watchdog wait. But it could
// be that it was to terminate the watchdog.
if (watchdog->terminate)
if (watchdog->terminate) {
LF_PRINT_DEBUG("Watchdog %p was terminated", (void*)watchdog);
break;
}

// It could also be that the watchdog was stopped
if (watchdog->expiration == NEVER)
if (watchdog->expiration == NEVER) {
LF_PRINT_DEBUG("Watchdog %p was stopped", (void*)watchdog);
continue;
}

// If we reach here, the watchdog actually timed out. Handle it.
LF_PRINT_DEBUG("Watchdog %p timed out", (void*)watchdog);
Expand Down
1 change: 1 addition & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ typedef struct environment_t {
#if defined(FEDERATED)
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
bool need_to_send_LTC;
#endif // FEDERATED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t* enclave_info;
Expand Down
8 changes: 4 additions & 4 deletions include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ typedef struct federate_instance_t {
bool received_stop_request_from_rti;

/**
* A record of the most recently sent LTC (latest tag complete) message.
* A record of the most recently sent LTC (latest tag confirmed) message.
* In some situations, federates can send logical_tag_complete for
* the same tag twice or more in-a-row to the RTI. For example, when
* _lf_next() returns without advancing tag. To prevent overwhelming
* the RTI with extra messages, record the last sent logical tag
* complete message and check against it in lf_latest_tag_complete().
* complete message and check against it in lf_latest_tag_confirmed().
*
* @note Here, the underlying assumption is that the TCP stack will
* deliver the Logical TAG Complete message to the RTI eventually
Expand Down Expand Up @@ -297,7 +297,7 @@ void lf_enqueue_port_absent_reactions(environment_t* env);
void* lf_handle_p2p_connections_from_federates(void*);

/**
* @brief Send a latest tag complete (LTC) signal to the RTI.
* @brief Send a latest tag confirmed (LTC) signal to the RTI.
*
* This avoids the send if an equal or later LTC has previously been sent.
*
Expand All @@ -306,7 +306,7 @@ void* lf_handle_p2p_connections_from_federates(void*);
*
* @param tag_to_send The tag to send.
*/
void lf_latest_tag_complete(tag_t);
void lf_latest_tag_confirmed(tag_t);

/**
* @brief Parse the address of the RTI and store them into the global federation_metadata struct.
Expand Down
Loading

0 comments on commit 3408e35

Please sign in to comment.