Skip to content
Snippets Groups Projects
rabbitmq-client.py 4.66 KiB
Newer Older
#!/usr/bin/env python3
import pika
import dbus
from random import randint
from time import sleep
import os
import socket
import logging

#####################################################################
# Configuration class
#####################################################################
class Config:
    server_host = 'rabbitmq.edu-al.unipmn.it';
    server_port = 5672;
    server_user = 'labmanager-client';
    server_password = '3aB7=2zV9:1jN3-4hE2';
    server_virtualhost = 'labmanager';
    server_exchange_name = 'labmanager';
    server_exchange_type = 'x-recent-history'; # It's a plugin. Enable with 'rabbitmq-plugins enable rabbitmq_recent_history_exchange'
    server_credentials = pika.PlainCredentials(server_user, server_password, True)
    systemd_getconfig_unit_name = 'esame-get-config.service'
    systemd_manager = None
#####################################################################


#####################################################################
# LOGGER Configuration
#####################################################################
# Values: DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_LEVEL = logging.INFO
LOG_FORMAT = ('%(levelname) -10s %(asctime)s: %(message)s')

LOGGER = logging.getLogger(__name__)
#####################################################################


#####################################################################
'''
Creates an object to interact with Systemd via DBUS
'''
#####################################################################
def create_systemd_manager():
    bus = dbus.SystemBus()
    # Get systemd reference
    systemd = bus.get_object(
        'org.freedesktop.systemd1',
        '/org/freedesktop/systemd1')
    # Create systemd manager
    systemd_manager = dbus.Interface(
        systemd,
        'org.freedesktop.systemd1.Manager')
    return systemd_manager


def random_sleep(min, max):
    sleep_secs = randint(min, max)
    LOGGER.info('Sleep %d seconds' % sleep_secs)
    sleep(sleep_secs)


'''
On message callback
'''
def on_message_callback(ch, method, properties, body):
    LOGGER.info('Received message \'%s\'' % body.decode('UTF-8'))

    # Sleep a random number of seconds (between 1 and 10)
    # to avoid all machines reboot at the same time
    random_sleep(1, 10)
    LOGGER.info('Starting Systemd service \'%s\'' %
        Config.systemd_getconfig_unit_name)
    start_unit = Config.systemd_manager.StartUnit(Config.systemd_getconfig_unit_name, 'fail')


'''
Main flow
'''
def main():
    logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)

    # If we don't use AMQP, just exit (if the variable is not set, set it to 0)
    ESAME_AMQP = os.environ.get('ESAME_AMQP', '0')
    LOGGER.info('ESAME_AMQP=%s', ESAME_AMQP)

    if ESAME_AMQP != '1':
        exit(0)

    Config.systemd_manager = create_systemd_manager()

    if Config.systemd_manager == None:
        exit(0)

    # Main loop
    while True:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=Config.server_host,
                    port=Config.server_port,
                    virtual_host=Config.server_virtualhost,
                    credentials=Config.server_credentials
                )
            )
            LOGGER.info('Connected to %s on port %d' % (Config.server_host, Config.server_port))
            channel = connection.channel()
            '''
            The publisher declares the exchange
            Here we just check it exists before binding a queue to it
            '''
            check_exchange = channel.exchange_declare(exchange=Config.server_exchange_name, exchange_type=Config.server_exchange_type, passive=True)
            LOGGER.info("check_exchange.method = %s", check_exchange.method)
            if type(check_exchange.method) is pika.spec.Exchange.DeclareOk:
                result = channel.queue_declare(queue='', exclusive=True)
                queue_name = result.method.queue
                channel.queue_bind(exchange=Config.server_exchange_name, queue=queue_name)
                LOGGER.info('Waiting for RabbitMQ messages')
                channel.basic_consume(
                    queue=queue_name, on_message_callback=on_message_callback, auto_ack=True)
                channel.start_consuming()
        # Intercept some error conditions and continue
        except (pika.exceptions.ConnectionClosedByBroker,
                pika.exceptions.AMQPChannelError,
                pika.exceptions.AMQPConnectionError,
                socket.gaierror,
                ValueError) as error:
            LOGGER.error('Error: %s', error.args)
            random_sleep(10, 20)
            continue
if __name__ == '__main__':
    main()