Skip to content

Commit

Permalink
Merge pull request #365 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Nov 12, 2024
2 parents 8b9558b + fa74062 commit 32a23cf
Show file tree
Hide file tree
Showing 15 changed files with 322 additions and 39 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ RUN ln -fs /opt/idds/config/idds/supervisord_idds.ini /etc/supervisord.d/idds.in
RUN ln -fs /opt/idds/config/idds/supervisord_httpd.ini /etc/supervisord.d/httpd.ini
# RUN ln -fs /opt/idds/config/idds/supervisord_syslog-ng.ini /etc/supervisord.d/syslog-ng.ini
RUN ln -fs /opt/idds/config/idds/supervisord_logrotate.ini /etc/supervisord.d/logrotate.ini
RUN ln -fs /opt/idds/config/idds/supervisord_healthmonitor.ini /etc/supervisord.d/healthmonitor.ini
RUN ln -fs /opt/idds/config/idds/logrotate_idds /etc/logrotate.d/idds

# for syslog-ng
Expand Down
27 changes: 23 additions & 4 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_log_dir():
return "/var/log/idds"


def setup_logging(name, stream=None, loglevel=None):
def setup_logging(name, stream=None, log_file=None, loglevel=None):
"""
Setup logging
"""
Expand All @@ -79,14 +79,33 @@ def setup_logging(name, stream=None, loglevel=None):
loglevel = loglevel.upper()
loglevel = getattr(logging, loglevel)

if stream is None:
if log_file is not None:
if not log_file.startswith("/"):
logdir = None
if config_has_section('common') and config_has_option('common', 'logdir'):
logdir = config_get('common', 'logdir')
if not logdir:
logdir = '/var/log/idds'
log_file = os.path.join(logdir, log_file)

if log_file:
logging.basicConfig(filename=log_file,
level=loglevel,
format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
elif stream is None:
if os.environ.get('IDDS_LOG_FILE', None):
idds_log_file = os.environ.get('IDDS_LOG_FILE', None)
logging.basicConfig(filename=idds_log_file,
level=loglevel,
format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
elif config_has_section('common') and config_has_option('common', 'logdir'):
logging.basicConfig(filename=os.path.join(config_get('common', 'logdir'), name),
elif ((config_has_section('common') and config_has_option('common', 'logdir') and config_has_option('common', 'logfile')) or log_file):
if log_file:
log_filename = log_file
else:
log_filename = config_get('common', 'logfile')
if not log_filename.startswith("/"):
log_filename = os.path.join(config_get('common', 'logdir'), log_filename)
logging.basicConfig(filename=log_filename,
level=loglevel,
format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
else:
Expand Down
4 changes: 4 additions & 0 deletions main/config_default/healthmonitor_daemon
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
# while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status -d /etc/logrotate.d/idds; sleep 86400; done

while true; do python /opt/idds/config/idds/idds_health_check.py; sleep 600; done
223 changes: 223 additions & 0 deletions main/config_default/idds_health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
#!/usr/bin/python

"""
check iDDS health
"""

import json
import os
import re
import subprocess
import time


def check_command(command, check_string):
print("Checking command : {0}".format(command))
print("For string : {0}".format(check_string))

tmp_array = command.split()
output = (
subprocess.Popen(tmp_array, stdout=subprocess.PIPE)
.communicate()[0]
.decode("ascii")
)

if re.search(check_string, output):
print("Found the string, return 100")
return 100
else:
print("String not found, return 0")
return 0


def is_logrotate_running():
# get the count of logrotate processes - if >=1 then logrotate is running
output = (
subprocess.Popen(
"ps -eo pgid,args | grep logrotate | grep -v grep | wc -l",
stdout=subprocess.PIPE,
shell=True,
)
.communicate()[0]
.decode("ascii")
)

try:
cleaned_output = output.strip()
n_logrotate_processes = int(cleaned_output)
except ValueError:
print(
"The string has an unexpected format and couldn't be converted to an integer."
)

# logrotate process found
if n_logrotate_processes >= 1:
print("Logrotate is running")
return True

return False


def is_restarting():
# get the count of logrotate processes - if >=1 then logrotate is running
output = (
subprocess.Popen(
"ps -eo pgid,args | grep restart|grep http | grep -v grep | wc -l",
stdout=subprocess.PIPE,
shell=True,
)
.communicate()[0]
.decode("ascii")
)

try:
cleaned_output = output.strip()
n_restarting_processes = int(cleaned_output)
except ValueError:
print(
"The string has an unexpected format and couldn't be converted to an integer."
)

# logrotate process found
if n_restarting_processes >= 1:
print("http is restarting")
return True

return False


def http_availability(host):
# check the http
avail = 0
if os.environ.get('X509_USER_PROXY', None):
curl = "curl -i -k --cert $X509_USER_PROXY --key $X509_USER_PROXY --cacert $X509_USER_PROXY https://%s:8443/idds/ping" % host
avail = check_command(curl, '"Status": "OK"')
print("http check availability (with proxy): %s" % avail)
elif os.environ.get('PANDA_AUTH', None) and os.environ.get('PANDA_AUTH_VO', None) and os.environ.get('PANDA_AUTH_ID_TOKEN', None):
curl = "curl -i -k -H \"X-IDDS-Auth-Type: ${PANDA_AUTH}\" -H \"X-IDDS-Auth-VO: ${PANDA_AUTH_VO}\" -H \"X-Idds-Auth-Token: ${PANDA_AUTH_ID_TOKEN}\" https://%s:8443/idds/ping" % host
avail = check_command(curl, '"Status": "OK"')
print("http check availability (with oidc token): %s" % avail)
if not avail or avail == 0:
curl = "curl -i -k https://%s:8443/idds/ping" % host
avail = check_command(curl, 'IDDSException')
print("http check availability (without proxy): %s" % avail)

if not avail or avail == 0:
logrotate_running = is_logrotate_running()
restarting = is_restarting()
if logrotate_running and restarting:
print("log rotation is running and http is restarting")
return 1
return avail


def process_availability():
# check the http
process_avail = 0
output = (
subprocess.Popen(
"ps -eo pgid,args | grep 'idds/agents/main.py' | grep -v grep | uniq",
stdout=subprocess.PIPE,
shell=True,
)
.communicate()[0]
.decode("ascii")
)
count = 0
for line in output.split("\n"):
line = line.strip()
if line == "":
continue
count += 1
if count >= 1:
process_avail = 100

print("agent process check availability: %s" % process_avail)
return process_avail


def heartbeat_availability(log_location):
avail = 100
hang_workers = 0
heartbeat_file = os.path.join(log_location, 'idds_availability')
if not os.path.exists(heartbeat_file):
avail = 0
print("idds_heartbeat at %s not exist, avail: %s" % (heartbeat_file, avail))
return avail, hang_workers

mod_time = os.path.getmtime(heartbeat_file)
print("idds_heartbeat updated at %s (currently is %s, %s seconds ago)" % (mod_time, time.time(), time.time() - mod_time))
if mod_time < time.time() - 1800:
avail = 0
return avail, hang_workers

try:
with open(heartbeat_file, 'r') as f:
d = json.load(f)
for agent in d:
info = d[agent]
num_hang_workers = info['num_hang_workers']
num_active_workers = info['num_active_workers']
if num_active_workers > 0 and num_hang_workers > 0:
hang_workers += num_hang_workers
agent_avail = int(num_hang_workers * 100 / num_active_workers)
if agent_avail < avail:
avail = agent_avail
print("iDDS agent %s has % hang workers" % num_hang_workers)
except Exception as ex:
print("Failed to parse idds_heartbeat: %s" % str(ex))
avail = 50

return avail, hang_workers


def idds_availability(host, log_location):
infos = {}
http_avail = http_availability(host)
print(f"http avail: {http_avail}")

process_avail = process_availability()
print(f"agent daemon avail: {process_avail}")

heartbeat_avail, hang_workers = heartbeat_availability(log_location)
print(f"heartbeat avail: {heartbeat_avail}, hang workers: {hang_workers}")
infos['num_hang_workers'] = hang_workers

if not http_avail:
availability = 0
avail_info = "iDDS http rest service is not running"
elif not process_avail:
availability = 50
avail_info = "iDDS agents are not running"
else:
if not heartbeat_avail:
availability = 50
avail_info = "iDDS agents are running. However heartbeat file is not found (or not renewed)"
elif heartbeat_avail < 100:
availability = heartbeat_avail
avail_info = "iDDS agents are running. However there are hanging workers"
else:
availability = heartbeat_avail
avail_info = "iDDS is OK"

print("availability: %s, avail_info: %s, infos: %s" % (availability, avail_info, infos))

return availability, avail_info, infos


def main():
host = 'localhost'
log_location = '/var/log/idds'
avail, avail_info, infos = idds_availability(host, log_location)

health_file = os.path.join(log_location, 'idds_health')
if avail >= 100:
with open(health_file, 'w') as f:
f.write('OK')
else:
if os.path.exists(health_file):
os.remove(health_file)


if __name__ == '__main__':
main()
4 changes: 3 additions & 1 deletion main/config_default/logrotate_daemon
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/bin/bash
# while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status -d /etc/logrotate.d/idds; sleep 86400; done

while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds >> /var/log/idds/logrotate.log 2>&1; sleep 3600; done
# while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds >> /var/log/idds/logrotate.log 2>&1; sleep 3600; done

while true; do /usr/sbin/logrotate -s /var/log/idds/logrotate.status /etc/logrotate.d/idds; sleep 86400; done
17 changes: 17 additions & 0 deletions main/config_default/supervisord_healthmonitor.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[program:health-monitor]
# command=/usr/sbin/logrotate -s /var/log/idds/logrotate.status -d /etc/logrotate.d/idds
command=/opt/idds/config/idds/healthmonitor_daemon
# process_name=%(process_num)02d
# user=atlpan
childlogdir=/var/log/idds
stdout_logfile=/var/log/idds/%(program_name)s-stdout.log
stderr_logfile=/var/log/idds/%(program_name)s-stderr.log
stdout_logfile_maxbytes=2GB
stderr_logfile_maxbytes=2GB
stdout_logfile_backups=1
stderr_logfile_backups=1
redirect_stderr=false
autorestart=true
stopsignal=TERM
stopasgroup=true
exitcodes=1
3 changes: 2 additions & 1 deletion main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ def update_processing(self, processing, processing_model, use_bulk_update_mappin
new_contents_ext=processing.get('new_contents_ext', None),
update_contents_ext=processing.get('update_contents_ext', None),
new_input_dependency_contents=processing.get('new_input_dependency_contents', None),
use_bulk_update_mappings=use_bulk_update_mappings)
use_bulk_update_mappings=use_bulk_update_mappings,
message_bulk_size=self.message_bulk_size)
except exceptions.DatabaseException as ex:
if 'ORA-00060' in str(ex):
self.logger.warn(log_prefix + "update_processing (cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/core/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def add_messages(messages, bulk_size=1000, session=None):
@transactional_session
def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None,
source=None, request_id=None, workload_id=None, transform_id=None,
processing_id=None, use_poll_period=False, retries=None, delay=60,
processing_id=None, use_poll_period=False, retries=None, delay=None,
min_request_id=None, fetching_id=None, internal_id=None,
record_fetched=False, record_fetched_status=MessageStatus.Fetched,
session=None):
Expand Down
21 changes: 5 additions & 16 deletions main/lib/idds/rest/v1/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@
# http://www.apache.org/licenses/LICENSE-2.0OA
#
# Authors:
# - Wen Guan, <wen.guan@cern.ch>, 2019 - 2023
# - Wen Guan, <wen.guan@cern.ch>, 2019 - 2024

"""----------------------
Web service app
----------------------"""

import logging
import sys

import flask
from flask import Flask, Response

from idds.common import exceptions
# from idds.common.authentication import authenticate_x509, authenticate_oidc, authenticate_is_super_user
from idds.common.config import (config_has_section, config_has_option, config_get)
from idds.common.constants import HTTP_STATUS_CODE
from idds.common.utils import get_rest_debug
# from idds.common.utils import get_rest_debug, setup_logging, get_logger
from idds.core.authentication import authenticate_x509, authenticate_oidc, authenticate_is_super_user
# from idds.common.utils import get_rest_url_prefix
from idds.rest.v1 import requests
Expand All @@ -39,7 +39,6 @@

class LoggingMiddleware(object):
def __init__(self, app, logger, url_map):
import logging
self._app = app
self._logger = logger
self._url_map = url_map
Expand Down Expand Up @@ -155,20 +154,10 @@ def after_request(response):
return response


def setup_logging(loglevel=None):
if loglevel is None:
if config_has_section('common') and config_has_option('common', 'loglevel'):
loglevel = getattr(logging, config_get('common', 'loglevel').upper())
else:
loglevel = logging.INFO

logging.basicConfig(stream=sys.stdout, level=loglevel,
format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')


def create_app(auth_type=None):

setup_logging()
# setup_logging(name='idds_app', log_file="idds_rest.log")
# get_logger(name='idds_app', filename='idds_rest.log')

# url_prefix = get_rest_url_prefix()
application = Flask(__name__)
Expand Down
Loading

0 comments on commit 32a23cf

Please sign in to comment.