Enable reload of Mirror/Aviso configuration ECFLOW-1986
marcosbento authored Nov 14, 2024
2 parents 27d1cdc + 349c1ac commit 3c97cfe
Showing 11 changed files with 133 additions and 43 deletions.
5 changes: 4 additions & 1 deletion docs/client_api/api/alter.rst
@@ -20,7 +20,7 @@ alter
For change:
[ variable | clock_type | clock_gain | clock_date | clock_sync | event | meter | label |
trigger | complete | repeat | limit_max | limit_value | defstatus | late | time |
today, aviso, mirror ]
today | aviso | mirror ]
*NOTE* If the clock is changed, then the suite will need to be re-queued in order for
the change to take effect fully.
For add:
@@ -47,6 +47,9 @@ alter
* for mirror, "--listener '{ \"event\": \"mars\", \"request\": { \"class\": "od" } }'
--url http://aviso/ --schema /path/to/schema --polling 60"
For both aviso and mirror, the special value "reload" can be used to force reloading the configuration.
n.b. This is typically useful after updating variables used to configure these kinds of attributes.
Usage:
ecflow_client --alter=add variable GLOBAL "value" / # add server variable
25 changes: 19 additions & 6 deletions docs/glossary.rst
@@ -82,9 +82,15 @@
the user.

Each aviso attribute implies that a background thread is spawned whenever
the associated :term:`node` is (re)queued. This background thread is
responsible for polling the Aviso server, and periodically processing the
latest notifications.
the associated :term:`node` is (re)queued. This independent background thread,
responsible for polling the Aviso server and periodically processing the latest notifications,
uses the configuration available when the associated task is queued.

.. note::

If any variables providing the configuration are updated, the Aviso configuration
can be reloaded (without unqueuing the Task) by issuing an Alter change command with
the value :code:`reload` to the relevant Aviso attribute.

The authentication credentials file is expected to be in JSON format, following the `ECMWF Web API <https://www.ecmwf.int/en/computing/software/ecmwf-web-api>`_:

@@ -1424,9 +1430,16 @@
(empty string), which effectively disables Authentication

Each mirror attribute implies that a background thread is spawned whenever
the ecFlow server is :term:`running<server states>`. This background thread is
responsible for polling the remote ecFlow server, and periodically
synchronise node status.
the ecFlow server is :term:`running<server states>` (i.e. when the server is shut down or halted, the
thread is terminated and the mirroring process is completely stopped).
This independent background thread, responsible for polling the remote ecFlow server and periodically
synchronising node status, uses the configuration available when the server is restarted.

.. note::

If any variables providing the configuration are updated, the Mirror configuration can be
reloaded (without restarting the Server) by issuing an Alter change command with the value
:code:`reload` to the relevant attributes.

The authentication credentials file is expected to be in JSON, according to the following format:

5 changes: 4 additions & 1 deletion libs/base/src/ecflow/base/cts/user/AlterCmd.cpp
@@ -613,7 +613,7 @@ const char* AlterCmd::desc() {
" For change:\n"
" [ variable | clock_type | clock_gain | clock_date | clock_sync | event | meter | label |\n"
" trigger | complete | repeat | limit_max | limit_value | defstatus | late | time |\n"
" today, aviso, mirror ]\n"
" today | aviso | mirror ]\n"
" *NOTE* If the clock is changed, then the suite will need to be re-queued in order for\n"
" the change to take effect fully.\n"
" For add:\n"
@@ -640,6 +640,9 @@ const char* AlterCmd::desc() {
" * for mirror, \"--listener '{ \\\"event\\\": \\\"mars\\\", \\\"request\\\": { \\\"class\\\": \"od\" } }'\n"
" --url http://aviso/ --schema /path/to/schema --polling 60\"\n"
"\n"
"For both aviso and mirror, the special value \"reload\" can be used to force reloading the configuration.\n"
" n.b. This is typically useful after updating variables used to configure these kind of attributes.\n"
"\n"
"Usage:\n\n"
" ecflow_client --alter=add variable GLOBAL \"value\" / # add server variable\n"
" ecflow_client --alter=add variable FRED \"value\" /path/to/node # add node variable\n"
61 changes: 37 additions & 24 deletions libs/core/src/ecflow/core/Log.cpp
@@ -35,8 +35,9 @@ void Log::create(const std::string& filename) {
}

void Log::destroy() {
if (instance_)
if (instance_) {
instance_->flush();
}

delete instance_;
instance_ = nullptr;
@@ -52,11 +53,9 @@ void Log::create_logimpl() {
}

bool Log::log(Log::LogType lt, const std::string& message) {
create_logimpl();
std::lock_guard lock(mx_);

// if (!logImpl_->log_open_error().empty()) {
// cerr << "Log::log: " << message << "\n";
// }
create_logimpl();

if (!logImpl_->log(lt, message)) {
// handle write failure and Get the failure reason. This will delete logImpl_ & recreate
@@ -70,11 +69,9 @@ bool Log::log(Log::LogType lt, const std::string& message) {
}

bool Log::log_no_newline(Log::LogType lt, const std::string& message) {
create_logimpl();
std::lock_guard lock(mx_);

// if (!logImpl_->log_open_error().empty()) {
// cerr << "Log::log_no_newline : " << message << "\n";
// }
create_logimpl();

if (!logImpl_->log_no_newline(lt, message)) {
// handle write failure and Get the failure reason. This will delete logImpl_ & recreate
@@ -88,11 +85,9 @@ bool Log::log_no_newline(Log::LogType lt, const std::string& message) {
}

bool Log::append(const std::string& message) {
create_logimpl();
std::lock_guard lock(mx_);

// if (!logImpl_->log_open_error().empty()) {
// cerr << "Log::append : " << message << "\n";
// }
create_logimpl();

if (!logImpl_->append(message)) {
// handle write failure and Get the failure reason. This will delete logImpl_ & recreate
@@ -106,26 +101,36 @@ bool Log::append(const std::string& message) {
}

void Log::cache_time_stamp() {
std::lock_guard lock(mx_);

create_logimpl();
logImpl_->create_time_stamp();
}

const std::string& Log::get_cached_time_stamp() const {
std::lock_guard lock(mx_);

return (logImpl_) ? logImpl_->get_cached_time_stamp() : Str::EMPTY();
}

void Log::flush() {
std::lock_guard lock(mx_);

// will close ofstream and force data to be written to disk.
// Forcing writing to physical medium can't be guaranteed though!
logImpl_.reset();
}

void Log::flush_only() {
if (logImpl_)
std::lock_guard lock(mx_);

if (logImpl_) {
logImpl_->flush();
}
}

void Log::clear() {
std::lock_guard lock(mx_);
flush();

// Open and truncate the file.
@@ -136,6 +141,8 @@ void Log::clear() {
}

void Log::new_path(const std::string& the_new_path) {
std::lock_guard lock(mx_);

check_new_path(the_new_path);

// flush and close log file
@@ -174,6 +181,8 @@ void Log::check_new_path(const std::string& new_path) {
}

std::string Log::path() const {
std::lock_guard lock(mx_);

if (!fileName_.empty() && fileName_[0] == '/') {
// Path is absolute return as is
return fileName_;
@@ -185,6 +194,8 @@ std::string Log::path() const {
}

std::string Log::contents(int get_last_n_lines) {
std::lock_guard lock(mx_);

if (get_last_n_lines == 0) {
return string();
}
@@ -200,6 +211,8 @@ std::string Log::contents(int get_last_n_lines) {
}

std::string Log::handle_write_failure() {
std::lock_guard lock(mx_);

std::string msg = logImpl_->log_open_error();
if (msg.empty()) {
msg += "\nFailed to write to log file: ";
@@ -213,22 +226,20 @@ std::string Log::handle_write_failure() {
logImpl_.reset();
create_logimpl();

if (logImpl_->log_open_error().empty())
if (logImpl_->log_open_error().empty()) {
msg += "\nAttempting to close/reopen log file.";
else
}
else {
msg += "\nAttempting to close/reopen log file did not work!";
}

if (LogToCout::ok())
if (LogToCout::ok()) {
Indentor::indent(cout) << msg << '\n';
}
return msg;
}

bool log(Log::LogType lt, const std::string& message) {
// For debug of simulator enable this
// if (LogToCout::ok()) {
// Indentor::indent(cout) << message << '\n';
// }

if (Log::instance()) {
return Log::instance()->log(lt, message);
}
@@ -367,8 +378,9 @@ bool LogImpl::do_log(Log::LogType lt, const std::string& message, bool newline)

// XXX:[HH:MM:SS D.M.YYYY] chd:fullname [+additional information]
// XXX:[HH:MM:SS D.M.YYYY] --<user_cmd> [+additional information]
if (time_stamp_.empty() || lt == Log::ERR || lt == Log::WAR || lt == Log::DBG)
if (time_stamp_.empty() || lt == Log::ERR || lt == Log::WAR || lt == Log::DBG) {
create_time_stamp();
}

// re-use memory allocated to log_type_and_time_stamp_
log_type_and_time_stamp_.clear();
@@ -377,8 +389,9 @@ bool LogImpl::do_log(Log::LogType lt, const std::string& message, bool newline)

if (message.find("\n") == std::string::npos) {
file_ << log_type_and_time_stamp_ << message;
if (newline)
if (newline) {
file_ << '\n';
}
}
else {
// If message has \n then split into multiple lines
3 changes: 3 additions & 0 deletions libs/core/src/ecflow/core/Log.hpp
@@ -35,6 +35,7 @@

#include <fstream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

@@ -121,6 +122,8 @@ class Log {
std::unique_ptr<LogImpl> logImpl_;
std::string fileName_;
std::string log_error_;

mutable std::recursive_mutex mx_;
};

// Flush log on destruction
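
The choice of `std::recursive_mutex` over `std::mutex` matters here because some locked members call other locked members on the same thread (for example, `Log::clear()` takes the lock and then calls `flush()`, which takes it again). The following is a minimal sketch of that pattern, not the ecFlow implementation: `MiniLog` and its members are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical stand-in for a logger; this is NOT the ecFlow Log class.
class MiniLog {
public:
    void append(const std::string& line) {
        std::lock_guard<std::recursive_mutex> lock(mx_);
        buffer_ += line;
        buffer_ += '\n';
    }

    void flush() {
        std::lock_guard<std::recursive_mutex> lock(mx_);
        flushed_ += buffer_;
        buffer_.clear();
    }

    // clear() holds the lock and then calls flush(), which locks again on the
    // same thread. With a plain std::mutex this would be undefined behaviour
    // (in practice usually a deadlock); std::recursive_mutex permits it.
    void clear() {
        std::lock_guard<std::recursive_mutex> lock(mx_);
        flush();
        flushed_.clear();
    }

    // Returned by value, so no reference to guarded data outlives the lock.
    std::string flushed() const {
        std::lock_guard<std::recursive_mutex> lock(mx_);
        return flushed_;
    }

private:
    mutable std::recursive_mutex mx_; // mutable: const members must lock too
    std::string buffer_;
    std::string flushed_;
};
```

In this sketch, calling `append("a")` followed by `flush()` leaves `flushed()` equal to `"a\n"`, and a subsequent `clear()` empties it without deadlocking. An alternative design would keep a plain `std::mutex` and split each operation into a locking public function and a non-locking private helper; the recursive mutex trades a little overhead for a smaller change.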
8 changes: 8 additions & 0 deletions libs/node/src/ecflow/node/AvisoAttr.cpp
@@ -93,6 +93,14 @@ void AvisoAttr::reset() {
}
}

void AvisoAttr::reload() {
if (controller_) {
state_change_no_ = Ecf::incr_state_change_no();
finish();
start();
}
}

bool AvisoAttr::isFree() const {

if (controller_ == nullptr) {
13 changes: 13 additions & 0 deletions libs/node/src/ecflow/node/AvisoAttr.hpp
@@ -56,6 +56,8 @@ class AvisoAttr {
static constexpr const char* default_polling = "%ECF_AVISO_POLLING%";
static constexpr const char* default_auth = "%ECF_AVISO_AUTH%";

static constexpr const char* reload_option_value = "reload";

static bool is_valid_name(const std::string& name);

/**
@@ -98,8 +100,19 @@ class AvisoAttr {

bool why(std::string& theReasonWhy) const;

/**
* Initialises the Aviso procedure, which effectively starts the background polling mechanism.
* Typically, called when traversing the tree -- does nothing if Aviso service is already set up.
*/
void reset();

/**
* Restarts the Aviso procedure, which effectively stops before restarting the background polling mechanism.
* Typically, called explicitly via Alter command -- forces the reinitialisation of the Aviso service,
* guaranteeing that parameters, given as ECF variables, are reevaluated.
*/
void reload();

[[nodiscard]] bool isFree() const;

void start() const;
16 changes: 13 additions & 3 deletions libs/node/src/ecflow/node/MirrorAttr.cpp
@@ -74,7 +74,16 @@ void MirrorAttr::reset() {
start_controller();
}

void MirrorAttr::reload() {
if (controller_) {
state_change_no_ = Ecf::incr_state_change_no();
stop_controller();
start_controller();
}
}

void MirrorAttr::finish() {
state_change_no_ = Ecf::incr_state_change_no();
stop_controller();
}
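
The `reload()` members in `AvisoAttr` and `MirrorAttr` share one shape: if a controller is running, bump the state change number, stop the background poller, and start it again so that the configuration is resolved afresh. The following is a minimal sketch of that shape under stated assumptions: `Variables`, `Poller`, `ReloadableAttr` and the `POLLING` key are hypothetical names, and the worker thread only sleeps where a real controller would poll a remote service.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical variable store standing in for ECF variables.
using Variables = std::map<std::string, std::string>;

// Background worker that wakes up once per polling interval until destroyed.
class Poller {
public:
    explicit Poller(std::chrono::milliseconds interval)
        : interval_(interval), worker_([this] { run(); }) {}

    ~Poller() {
        {
            std::lock_guard<std::mutex> lock(mx_);
            stop_ = true;
        }
        cv_.notify_all(); // wake the worker so join() returns promptly
        worker_.join();
    }

    std::chrono::milliseconds interval() const { return interval_; }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mx_);
        while (!stop_) {
            // A real controller would poll the remote service here.
            cv_.wait_for(lock, interval_, [this] { return stop_; });
        }
    }

    const std::chrono::milliseconds interval_;
    std::mutex mx_;
    std::condition_variable cv_;
    bool stop_ = false;
    std::thread worker_; // declared last: started after all other members exist
};

class ReloadableAttr {
public:
    explicit ReloadableAttr(const Variables& vars) : vars_(vars) {}

    // Resolves the configuration once; does nothing if already running.
    void start() {
        if (!poller_) {
            const auto seconds = std::stoul(vars_.at("POLLING"));
            poller_ = std::make_unique<Poller>(std::chrono::seconds(seconds));
        }
    }

    void finish() { poller_.reset(); }

    // Stop and restart, so variables changed since start() are re-read.
    void reload() {
        if (poller_) {
            ++state_change_no_;
            finish();
            start();
        }
    }

    std::chrono::milliseconds polling() const {
        return poller_ ? poller_->interval() : std::chrono::milliseconds(0);
    }
    unsigned state_change_no() const { return state_change_no_; }

private:
    const Variables& vars_;
    std::unique_ptr<Poller> poller_;
    unsigned state_change_no_ = 0;
};
```

In this sketch, changing `POLLING` in the variable map after `start()` has no effect on the running poller; the new value is picked up only once `reload()` is called, which mirrors the behaviour described in the glossary notes.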

@@ -190,7 +199,7 @@ std::string MirrorAttr::resolve_cfg(const std::string& value,
}

void MirrorAttr::start_controller() {
if (controller_ == nullptr) {
if (!controller_) {

// Resolve variables in configuration
// In the case of the 'remote_host', we have to resolve the configuration
@@ -217,7 +226,8 @@ void MirrorAttr::start_controller() {

SLOG(D,
"MirrorAttr: start polling Mirror attribute '" << absolute_name() << "', from " << remote_path_ << " @ "
<< remote_host << ':' << remote_port << ")");
<< remote_host << ':' << remote_port << ") using polling: "
<< polling << " s");

std::uint32_t polling_value;
try {
Expand Down Expand Up @@ -247,7 +257,7 @@ void MirrorAttr::start_controller() {
}

void MirrorAttr::stop_controller() {
if (controller_ != nullptr) {
if (controller_) {
SLOG(D,
"MirrorAttr: finishing polling for Mirror attribute \"" << parent_->absNodePath() << ":" << name_
<< "\", from host: " << remote_host_
9 changes: 9 additions & 0 deletions libs/node/src/ecflow/node/MirrorAttr.hpp
@@ -59,6 +59,8 @@ class MirrorAttr {
static constexpr const char* fallback_polling = "120";
static constexpr const char* fallback_remote_auth = "";

static constexpr const char* reload_option_value = "reload";

static bool is_valid_name(const std::string& name);

/**
@@ -104,8 +106,15 @@ class MirrorAttr {

/**
* Initialises the Mirror procedure, which effectively starts the background polling mechanism.
* Typically, called when traversing the tree -- does nothing if Mirror service is already set up.
*/
void reset();
/**
* Restarts the Mirror procedure, which effectively stops before restarting the background polling mechanism.
* Typically, called explicitly via Alter command -- forces the reinitialisation of the Mirror service,
* guaranteeing that parameters, given as ECF variables, are reevaluated.
*/
void reload();
void finish();

/**