#!/usr/bin/env python3 import pika import dbus from random import randint from time import sleep import os import socket import logging ##################################################################### # Configuration class ##################################################################### class Config: server_host = 'rabbitmq.edu-al.unipmn.it'; server_port = 5672; server_user = 'labmanager-client'; server_password = '3aB7=2zV9:1jN3-4hE2'; server_virtualhost = 'labmanager'; server_exchange_name = 'labmanager'; server_exchange_type = 'x-recent-history'; # It's a plugin. Enable with 'rabbitmq-plugins enable rabbitmq_recent_history_exchange' server_credentials = pika.PlainCredentials(server_user, server_password, True) systemd_getconfig_unit_name = 'esame-get-config.service' systemd_manager = None ##################################################################### ##################################################################### # LOGGER Configuration ##################################################################### # Values: DEBUG, INFO, WARNING, ERROR, CRITICAL LOG_LEVEL = logging.INFO LOG_FORMAT = ('%(levelname) -10s %(asctime)s: %(message)s') LOGGER = logging.getLogger(__name__) ##################################################################### ##################################################################### ''' Creates an object to interact with Systemd via DBUS ''' ##################################################################### def create_systemd_manager(): bus = dbus.SystemBus() # Get systemd reference systemd = bus.get_object( 'org.freedesktop.systemd1', '/org/freedesktop/systemd1') # Create systemd manager systemd_manager = dbus.Interface( systemd, 'org.freedesktop.systemd1.Manager') return systemd_manager def random_sleep(min, max): sleep_secs = randint(min, max) LOGGER.info('Sleep %d seconds' % sleep_secs) sleep(sleep_secs) ''' On message callback ''' def on_message_callback(ch, method, properties, body): LOGGER.info('Received message \'%s\'' % body.decode('UTF-8')) # Sleep a random number of seconds (between 1 and 10) # to avoid all machines reboot at the same time random_sleep(1, 10) LOGGER.info('Starting Systemd service \'%s\'' % Config.systemd_getconfig_unit_name) start_unit = Config.systemd_manager.StartUnit(Config.systemd_getconfig_unit_name, 'fail') ''' Main flow ''' def main(): logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT) # If we don't use AMQP, just exit (if the variable is not set, set it to 0) ESAME_AMQP = os.environ.get('ESAME_AMQP', '0') LOGGER.info('ESAME_AMQP=%s', ESAME_AMQP) if ESAME_AMQP != '1': exit(0) Config.systemd_manager = create_systemd_manager() if Config.systemd_manager == None: exit(0) # Main loop while True: try: connection = pika.BlockingConnection( pika.ConnectionParameters( host=Config.server_host, port=Config.server_port, virtual_host=Config.server_virtualhost, credentials=Config.server_credentials ) ) LOGGER.info('Connected to %s on port %d' % (Config.server_host, Config.server_port)) channel = connection.channel() ''' The publisher declares the exchange Here we just check it exists before binding a queue to it ''' check_exchange = channel.exchange_declare(exchange=Config.server_exchange_name, exchange_type=Config.server_exchange_type, passive=True) LOGGER.info("check_exchange.method = %s", check_exchange.method) if type(check_exchange.method) is pika.spec.Exchange.DeclareOk: result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=Config.server_exchange_name, queue=queue_name) LOGGER.info('Waiting for RabbitMQ messages') channel.basic_consume( queue=queue_name, on_message_callback=on_message_callback, auto_ack=True) channel.start_consuming() # Intercept some error conditions and continue except (pika.exceptions.ConnectionClosedByBroker, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError, socket.gaierror, ValueError) as error: LOGGER.error('Error: %s', error.args) random_sleep(10, 20) continue if __name__ == '__main__': main()