Skip to content
Snippets Groups Projects
Commit 23aa5a06 authored by Alberto LIVIO BECCARIA's avatar Alberto LIVIO BECCARIA
Browse files

Servizio AMQP (ex MQTT)

parent 5f7b2b27
No related branches found
No related tags found
No related merge requests found
......@@ -224,8 +224,8 @@ start() {
# reset and remove exam configuration files
echo "$NORMAL_CONFIG_NAME" > /local/config_name
echo "" > /local/examid
rm /local/esame-machine.conf >/dev/null 2>&1
rm /local/esame-machine.conf.prev >/dev/null 2>&1
#rm /local/esame-machine.conf >/dev/null 2>&1
#rm /local/esame-machine.conf.prev >/dev/null 2>&1
rm /local/iptables >/dev/null 2>&1
rm /local/iptables.prev >/dev/null 2>&1
rm /local/mk-homedir >/dev/null 2>&1
......@@ -250,6 +250,14 @@ start() {
ESAME_USB_GUARD=${RES_USB_GUARD}
fi
# extract ESAME_AMQP from configuration file
ESAME_AMQP=0
RES_AMQP=`awk -F= -v key="ESAME_AMQP" '$1==key {print $2}' /local/esame-machine.conf | sed "s/\"//g"`
if [[ "${RES_AMQP}" != "" ]]; then
ESAME_AMQP=${RES_AMQP}
fi
if [[ -n "${CONFIG_NAME}" ]]; then
echo "Configuration: ${CONFIG_NAME}"
if [ "${EXAM_ID}" != "" ]; then
......@@ -287,8 +295,8 @@ start() {
# if [[ "$CONFIG_NAME" == "$NORMAL_CONFIG_NAME" ]]; then
if [[ "${EXAM_ID}" == "" ]]; then
# remove some files used by exam mode
rm /local/esame-machine.conf >/dev/null 2>&1
rm /local/esame-machine.conf.prev >/dev/null 2>&1
#rm /local/esame-machine.conf >/dev/null 2>&1
#rm /local/esame-machine.conf.prev >/dev/null 2>&1
rm /local/iptables >/dev/null 2>&1
rm /local/iptables.prev >/dev/null 2>&1
rm /local/mk-homedir >/dev/null 2>&1
......@@ -319,6 +327,16 @@ start() {
fi
fi
# if we use AMQP server, just disable esame-get-config.timer
if [[ ${ESAME_AMQP} == 1 ]]; then
systemctl disable esame-get-config.timer
systemctl stop esame-get-config.timer
# otherwise, enable it
else
systemctl enable esame-get-config.timer
systemctl start esame-get-config.timer
fi
# choose the correct target
# we have just booted/rebooted, update the current configuration via web service
updateCurrentConfigId
......
#!/bin/bash
#/usr/local/bin/lm-subscriber -b penelope.di.unipmn.it -t "lab98/config" -c "echo reboot..." -f /local/config_name \
/usr/local/bin/lm-subscriber -b penelope.di.unipmn.it -t "lab98/config" -c "/usr/libexec/labmanager/get-config start" -f /local/config_name \
--port 8883 \
--auth-user sub_client \
--auth-pwd zse45rdx \
--ca-cert /etc/ssl/certs/mqtt_ca.crt \
-l DEBUG
......@@ -3,76 +3,142 @@ import pika
import dbus
from random import randint
from time import sleep
import os
import socket
import logging
#####################################################################
# Conguration class
#####################################################################
class Config:
server_host = 'rabbitmq.edu-al.unipmn.it';
server_port = 5672;
server_user = 'labmanager-client';
server_password = '3aB7=2zV9:1jN3-4hE2';
server_virtualhost = 'labmanager';
server_exchange_name = 'labmanager-test';
server_exchange_type = 'x-recent-history'; # It's a plugin. Enable with 'rabbitmq-plugins enable rabbitmq_recent_history_exchange'
server_credentials = pika.PlainCredentials(server_user, server_password, True)
systemd_getconfig_unit_name = 'esame-get-config.service'
systemd_manager = None
#####################################################################
#####################################################################
# LOGGER Configuration
#####################################################################
# Values: DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_LEVEL = logging.INFO
LOG_FORMAT = ('%(levelname) -10s %(asctime)s: %(message)s')
LOGGER = logging.getLogger(__name__)
#####################################################################
#####################################################################
'''
Creates an object to interact with Systemd via DBUS
'''
#####################################################################
def create_systemd_manager():
bus = dbus.SystemBus()
# Get systemd reference
systemd = bus.get_object(
'org.freedesktop.systemd1',
'/org/freedesktop/systemd1')
# Create systemd manager
systemd_manager = dbus.Interface(
systemd,
'org.freedesktop.systemd1.Manager')
return systemd_manager
def random_sleep(min, max):
sleep_secs = randint(min, max)
LOGGER.info('Sleep %d seconds' % sleep_secs)
sleep(sleep_secs)
'''
On message callback
'''
def on_message_callback(ch, method, properties, body):
LOGGER.info('Received message \'%s\'' % body.decode('UTF-8'))
# Sleep a random number of seconds (between 1 and 10)
# to avoid all machines reboot at the same time
random_sleep(1, 10)
LOGGER.info('Starting Systemd service \'%s\'' %
Config.systemd_getconfig_unit_name)
start_unit = Config.systemd_manager.StartUnit(Config.systemd_getconfig_unit_name, 'fail')
'''
Main flow
'''
def main():
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
# If we don't use AMQP, just exit (if the variable is not set, set it to 0)
ESAME_AMQP = os.environ.get('ESAME_AMQP', '0')
LOGGER.info('ESAME_AMQP=%s', ESAME_AMQP)
if ESAME_AMQP != '1':
exit(0)
Config.systemd_manager = create_systemd_manager()
if Config.systemd_manager == None:
exit(0)
# Main loop
while True:
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=Config.server_host,
port=Config.server_port,
virtual_host=Config.server_virtualhost,
credentials=Config.server_credentials
)
)
server_host = 'rabbitmq.edu-al.unipmn.it';
server_port = 5672;
server_username = 'labmanager-client';
server_pwd = '3aB7=2zV9:1jN3-4hE2';
server_vhost = 'labmanager';
exchange_name = 'labmanager-test';
exchange_type = 'x-recent-history'; # It's a plugin. Enable with 'rabbitmq-plugins enable rabbitmq_recent_history_exchange'
systemd_getconfig_unit_name = 'esame-get-config.service'
bus = dbus.SystemBus()
systemd = bus.get_object(
'org.freedesktop.systemd1',
'/org/freedesktop/systemd1'
)
manager = dbus.Interface(
systemd,
'org.freedesktop.systemd1.Manager'
)
LOGGER.info('Connected to %s on port %d' % (Config.server_host, Config.server_port))
server_credentials = pika.PlainCredentials(server_username, server_pwd, True)
channel = connection.channel()
while True:
try:
'''
The publisher declares the exchange
Here we just check it exists before binding a queue to it
'''
check_exchange = channel.exchange_declare(exchange=Config.server_exchange_name, exchange_type=Config.server_exchange_type, passive=True)
LOGGER.info("check_exchange.method = %s", check_exchange.method)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=server_host,
port=server_port,
virtual_host=server_vhost,
credentials=server_credentials
)
)
if type(check_exchange.method) is pika.spec.Exchange.DeclareOk:
channel = connection.channel()
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
'''
L'exchange lo dichiara il publisher
TODO: qui serve sapere se esiste gia' e in caso negativo aspettare e ripetere (valutare se usare passive=true)
'''
# channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
channel.queue_bind(exchange=Config.server_exchange_name, queue=queue_name)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
LOGGER.info('Waiting for RabbitMQ messages')
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(
queue=queue_name, on_message_callback=on_message_callback, auto_ack=True)
print('Waiting for RabbitMQ messages')
channel.start_consuming()
def callback(ch, method, properties, body):
# Sleep a random number of seconds (between 1 and 10)
# to avoid all machines reboot at the same time
sleep(randint(1,10))
print("RabbitMQ message: %s" % body.decode('UTF-8'))
start_unit = manager.StartUnit(systemd_getconfig_unit_name, 'fail')
# Intercept some error conditions and continue
except (pika.exceptions.ConnectionClosedByBroker,
pika.exceptions.AMQPChannelError,
pika.exceptions.AMQPConnectionError,
socket.gaierror,
ValueError) as error:
LOGGER.error('Error: %s', error.args)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
random_sleep(10, 20)
continue
channel.start_consuming()
except pika.exceptions.ConnectionClosedByBroker:
break
# Don't recover on channel errors
except pika.exceptions.AMQPChannelError:
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
continue
if __name__ == '__main__':
main()
......@@ -248,9 +248,11 @@ echo
echo -e "\e[1mEnable systemd services...\e[0m"
systemctl enable esame-*.service
systemctl enable esamesync-esame.service
# comment following 2 lines to use MQTT configuration change listener
systemctl enable esame-get-config.timer
systemctl start esame-get-config.timer
# The following 2 commands are not needed anymore as get-config at boot time
# will switch and activate the timer on and off when needed
# (according to AMQP configuration)
#systemctl enable esame-get-config.timer
#systemctl start esame-get-config.timer
echo
echo
......
......@@ -5,4 +5,5 @@ Wants=network-online.target
[Service]
Type=oneshot
EnvironmentFile=-/local/esame-machine.conf
ExecStart=/usr/libexec/labmanager/get-config start
[Unit]
Description=Subscriber to LabManager MQTT server
After=network-online.target esamesync-esame.service
Wants=network-online.target esamesync-esame.service
[Service]
Type=oneshot
RemainAfterExit=yes
EnvironmentFile=-/local/esame-machine.conf
ExecStart=/usr/libexec/labmanager/subscribe
#ExecStop=
[Install]
WantedBy=esame.target graphical.target esamekiosk.target esameshow.target
......@@ -5,7 +5,9 @@ Wants=network-online.target
[Service]
Type=oneshot
ExecStart=/usr/bin/python3 /usr/libexec/labmanager/rabbitmq/rabbitmq-client.py
RemainAfterExit=yes
EnvironmentFile=-/local/esame-machine.conf
ExecStart=/usr/bin/python3 /usr/libexec/labmanager/rabbitmq-client.py
[Install]
WantedBy=esame.target esamekiosk.target esameshow.target graphical.target
\ No newline at end of file
WantedBy=esame.target esamekiosk.target esameshow.target graphical.target
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment