diff --git a/src/gesacmonit.py b/src/gesacmonit.py
index a4cbef5d4776bf9f7903b61e1da5a6f82cc9cb06..af668e6c8dc0876fe08bd309a98c353f01930ee3 100755
--- a/src/gesacmonit.py
+++ b/src/gesacmonit.py
@@ -607,7 +607,7 @@ class GesacMonit(multiprocessing.Process):
     def __init__(self, config, points=None, daemon=False):
         multiprocessing.Process.__init__(self)
         self._logger = logging.getLogger(self.__class__.__name__)
-        self._logger.warning('Starting %s', self.name)
+        self._logger.info('Starting %s', self.name)
 
         insert_rate = self._get_estimate_insertions_per_second(len(points))
         self.points = points
@@ -680,7 +680,7 @@ class GesacMonit(multiprocessing.Process):
 
     def shutdown(self, signal=None, frame=None):
         """Gracefully close this process."""
-        self._logger.warning('Stopping %s', self.name)
+        self._logger.info('Stopping %s', self.name)
 
         self._consumer.shutdown()
         self._exit.set()
@@ -853,9 +853,8 @@
     if config['db_enabled']:
         try:
             db_conn = psycopg2.connect(config['dsn'])
-
         except Exception:
-            logger.exception("Couldn't connect to database, check database conf file")
+            logger.error("Couldn't connect to database, check database conf file")
             db_return = False;
 
     if db_return:
@@ -867,14 +866,15 @@
         url = config['points_url']
         r = requests.get(url);
         if r.status_code != 200:
-            logger.exception("Couldn't get data from API, check database conf file")
+            logger.error("Couldn't get data from API, check the conf file")
             api_return = False;
 
         if api_return:
             data = r.json()
 
     if (not db_return) and (not api_return):
-        return ()
+        return None
+
     # This is a legacy part from a older simmc version, the format of the data
     # recieved is not defined yet. Once it is defined this part can be edited
     points = []
@@ -936,6 +936,23 @@
     logger.info('Restore complete...')
 
 
+def ensure_processes_are_running(config, logger, children, points):
+    """ Ensure all monitoring processes are still running.
+    Args:
+        config: configuration dictionary
+        logger: an instance of the logger.
+        children: a list containing the GesacMonit processes.
+    """
+    cores = multiprocessing.cpu_count()
+
+    for idx, p in enumerate(children):
+        if not p.is_alive() and p.exitcode > 0:
+            proc_pts = list(chunks(points, cores))[idx]
+            proc = GesacMonit(config, Points(proc_pts))
+            proc.start()
+            children[idx] = proc
+
+
 ########################################################################
 # MAIN #################################################################
 ########################################################################
@@ -964,10 +981,14 @@
     # Create db connection and get cursor
     config = get_conf(conf_file)
 
-    # number of cores
-    cores = multiprocessing.cpu_count()
-
     points = get_gesac_points(config, logger)
+    if not points:
+        logger.error('Error fetching points. Check DB or API connection and try again')
+        sys.exit(1)
+
+
+    # Get the number of available cores
+    cores = multiprocessing.cpu_count()
 
     # before we start, let's check for data to be restored
     check_and_restore_data(config, logger, [])
@@ -982,9 +1003,18 @@
     signal.signal(signal.SIGINT, exit_handler)
     signal.signal(signal.SIGTERM, exit_handler)
 
+    # Count the number of five-minute intervals that have passed
+    count_day = 0
     while True:
-        # Every 24h check if new GESAC points were registered
-        time.sleep(86400)
-
-        update_gesac_points(config, logger, children)
-        check_and_restore_data(config, logger, children)
+        # Check every five minutes that no process has died
+        time.sleep(300)
+        ensure_processes_are_running(config, logger, children, points)
+
+        # Once a day (every 288 five-minute intervals) update the list of
+        # monitored points and check for data to restore
+        if count_day == 288:
+            update_gesac_points(config, logger, children)
+            check_and_restore_data(config, logger, children)
+            count_day = 0
+        else:
+            count_day += 1
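For reviewers who want to see the supervision pattern in isolation, below is a minimal, self-contained sketch of what the new main loop does: split the points into one chunk per CPU core, start a worker per chunk, poll every five minutes for workers that exited with a non-zero code and restart them on their original chunk, and run the daily maintenance step every 288 intervals. `Worker`, `chunks`, and the constants are illustrative stand-ins, not the module's actual `GesacMonit`, `Points`, or `chunks` helpers.

```python
import multiprocessing
import time


def chunks(seq, n):
    """Split seq into n roughly equal chunks (stand-in for the module's
    chunks() helper; the real splitting strategy may differ)."""
    k, m = divmod(len(seq), n)
    return [seq[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]


class Worker(multiprocessing.Process):
    """Stand-in for GesacMonit: monitors one chunk of points."""

    def __init__(self, points):
        super().__init__()
        self.points = points

    def run(self):
        # Placeholder for the real monitoring loop.
        time.sleep(5)


if __name__ == '__main__':
    points = list(range(100))
    cores = multiprocessing.cpu_count()
    point_chunks = chunks(points, cores)

    children = [Worker(chunk) for chunk in point_chunks]
    for child in children:
        child.start()

    INTERVAL = 300            # supervision period in seconds
    INTERVALS_PER_DAY = 288   # 288 * 300 s = 24 h
    count = 0
    while True:
        time.sleep(INTERVAL)

        # Restart any child that exited abnormally, reusing its original chunk.
        for idx, child in enumerate(children):
            if not child.is_alive() and child.exitcode and child.exitcode > 0:
                replacement = Worker(point_chunks[idx])
                replacement.start()
                children[idx] = replacement

        # Once a day, this is where the points list would be refreshed and
        # pending data restored (update_gesac_points / check_and_restore_data
        # in the patched module).
        count += 1
        if count >= INTERVALS_PER_DAY:
            count = 0
```

Keeping each worker's chunk addressable by its index is what lets the restart path hand a replacement process the same points its predecessor was monitoring.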