Newer
Older
#!/usr/bin/env python3
import pika
import dbus
from random import randint
from time import sleep
import os
import socket
import logging
#####################################################################
#####################################################################
class Config:
server_host = 'rabbitmq.edu-al.unipmn.it';
server_port = 5672;
server_user = 'labmanager-client';
server_password = '3aB7=2zV9:1jN3-4hE2';
server_virtualhost = 'labmanager';
server_exchange_name = 'labmanager';
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
server_exchange_type = 'x-recent-history'; # It's a plugin. Enable with 'rabbitmq-plugins enable rabbitmq_recent_history_exchange'
server_credentials = pika.PlainCredentials(server_user, server_password, True)
systemd_getconfig_unit_name = 'esame-get-config.service'
systemd_manager = None
#####################################################################
#####################################################################
# LOGGER Configuration
#####################################################################
# Values: DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_LEVEL = logging.INFO
LOG_FORMAT = ('%(levelname) -10s %(asctime)s: %(message)s')
LOGGER = logging.getLogger(__name__)
#####################################################################
#####################################################################
'''
Creates an object to interact with Systemd via DBUS
'''
#####################################################################
def create_systemd_manager():
bus = dbus.SystemBus()
# Get systemd reference
systemd = bus.get_object(
'org.freedesktop.systemd1',
'/org/freedesktop/systemd1')
# Create systemd manager
systemd_manager = dbus.Interface(
systemd,
'org.freedesktop.systemd1.Manager')
return systemd_manager
def random_sleep(min, max):
sleep_secs = randint(min, max)
LOGGER.info('Sleep %d seconds' % sleep_secs)
sleep(sleep_secs)
'''
On message callback
'''
def on_message_callback(ch, method, properties, body):
LOGGER.info('Received message \'%s\'' % body.decode('UTF-8'))
# Sleep a random number of seconds (between 1 and 10)
# to avoid all machines reboot at the same time
random_sleep(1, 10)
LOGGER.info('Starting Systemd service \'%s\'' %
Config.systemd_getconfig_unit_name)
start_unit = Config.systemd_manager.StartUnit(Config.systemd_getconfig_unit_name, 'fail')
'''
Main flow
'''
def main():
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
# If we don't use AMQP, just exit (if the variable is not set, set it to 0)
ESAME_AMQP = os.environ.get('ESAME_AMQP', '0')
LOGGER.info('ESAME_AMQP=%s', ESAME_AMQP)
if ESAME_AMQP != '1':
exit(0)
Config.systemd_manager = create_systemd_manager()
if Config.systemd_manager == None:
exit(0)
# Main loop
while True:
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=Config.server_host,
port=Config.server_port,
virtual_host=Config.server_virtualhost,
credentials=Config.server_credentials
)
)
LOGGER.info('Connected to %s on port %d' % (Config.server_host, Config.server_port))
'''
The publisher declares the exchange
Here we just check it exists before binding a queue to it
'''
check_exchange = channel.exchange_declare(exchange=Config.server_exchange_name, exchange_type=Config.server_exchange_type, passive=True)
LOGGER.info("check_exchange.method = %s", check_exchange.method)
if type(check_exchange.method) is pika.spec.Exchange.DeclareOk:
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange=Config.server_exchange_name, queue=queue_name)
LOGGER.info('Waiting for RabbitMQ messages')
channel.basic_consume(
queue=queue_name, on_message_callback=on_message_callback, auto_ack=True)
# Intercept some error conditions and continue
except (pika.exceptions.ConnectionClosedByBroker,
pika.exceptions.AMQPChannelError,
pika.exceptions.AMQPConnectionError,
socket.gaierror,
ValueError) as error:
LOGGER.error('Error: %s', error.args)