Skip to content
Snippets Groups Projects
Commit b31fd522 authored by sguazt's avatar sguazt
Browse files

Added Redis measures store

parent 5dcd8884
No related branches found
No related tags found
No related merge requests found
##
## Python module for common measure store types and functions.
##
## Copyright 2020 Marco Guazzone (marco.guazzone@gmail.com)
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
import redis
from core import measures_store
import json
import logging
class RedisStore(measures_store.MeasuresStore):
"""Measures store that interfaces with Redis (https://redis.io).
Tested with Redis v. 5.0.7.
"""
DEFAULT_URL = 'redis://localhost:6379/'
DEFAULT_DB = None
@classmethod
def from_conf(cls, conf):
#FIXME: the Redis client will throw an exception if kwargs contains unrecognized options
# Thus, for now, we are commenting this code below and we just pass only the url and db options to Redis
# In the future, we will look for the following parameters in the conf explicitly:
# host='localhost',
# port=6379,
# db=0, password=None,
# socket_timeout=None,
# socket_connect_timeout=None,
# socket_keepalive=None,
# socket_keepalive_options=None,
# connection_pool=None,
# unix_socket_path=None,
# encoding='utf-8',
# encoding_errors='strict',
# charset=None,
# errors=None,
# decode_responses=False,
# retry_on_timeout=False,
# ssl=False,
# ssl_keyfile=None,
# ssl_certfile=None,
# ssl_cert_reqs='required',
# ssl_ca_certs=None,
# ssl_check_hostname=False,
# max_connections=None,
# single_connection_client=False,
# health_check_interval=0,
# client_name=None,
# username=None
# See https://redis.io and https://github.com/andymccurdy/redis-py/ for the meaning of those parameters.
url = None
db = None
kwargs = dict()
#logging.debug("FROM CONF {}".format(conf))
#for key in conf:
# if key == 'url':
# url = conf['url']
# elif key == 'db':
# db = conf['db']
# else:
# kwargs[key] = conf[key]
if 'url' in conf:
url = conf['url']
if 'db' in conf:
db = conf['db']
return cls(url=url, db=db, **kwargs)
def __init__(self, url=DEFAULT_URL, db=DEFAULT_DB, **kwargs):
self._url = url if url is not None else self.DEFAULT_URL
self._db = db if db is not None else self.DEFAULT_DB
self._redis = redis.Redis.from_url(url=self._url, db=self._db, **kwargs)
def put(self, measure):
msg_key = self._measure_group_encode(self._measure_group(measure))
msg = self._make_message(measure)
self._redis.rpush(msg_key, msg)
logging.debug('Message added to key {}'.format(msg_key))
def mput(self, measures):
with self._redis.pipeline(transaction=False) as pipe: # NOTE: we don't need transactional pipelines, so we disable transactions to improve performance
# Groups measures by object namespace, object ID and metric to optimize writes on CSV files
measures_groups = dict() # {namespace => {object-id => {metric => [measure1, measure2, ...]}}}
for measure in measures:
group_id = self._measure_group_encode(self._measure_group(measure))
if group_id not in measures_groups:
measures_groups[group_id] = []
measures_groups[group_id].append(measure)
for group_id in measures_groups:
msg_key = group_id
#FIXME: as a possible optimization, we could put all the
# *encoded* measures belonging to group_id in a list and
# then call rpush by passing that list as argument; the
# drawback is the additional memory consumed by that new
# list.
for measure in measures_groups[group_id]:
msg = self._make_message(measure)
pipe.rpush(msg_key, msg)
logging.debug('Messages added to key {} in pipeline'.format(msg_key))
pipe.execute()
def _make_message(self, measure):
return json.dumps({'timestamp': measure.timestamp.isoformat(),
'object-id': measure.object_id,
'object_ns': measure.object_ns,
'metric': measure.metric,
'unit': measure.unit,
'value': measure.value})
......@@ -60,7 +60,7 @@ minimum_positive = 3
# As shown in the above example, each subsection *must* contain at least two special options, namely:
# - module, whose value represents the fully qualified module name containing the class that implements a given measures store, and
# - class, whose value represents the name of the class that implements a given measures store.
measures_stores = file, kafka
measures_stores = file, kafka, redis
# Settings related to file measures stores
......@@ -74,4 +74,20 @@ basepath = /tmp/easycloud
module = core.measures_store.kafka
class = KafkaStore
# A comma-separated list of broker servers
# Default: localhost:9092
brokers = localhost:9092
# The Kafka topic where to publish measures collected from cloud platforms
# Defaut: easycloud
topic = easycloud
[measures_store.redis]
module = core.measures_store.redis
class = RedisStore
# The URL to the Redis server (see `redis.from_url()` at https://redis-py.readthedocs.io/en/latest/)
# Default: redis://localhost:6379/
url = redis://localhost:6379/
# The db parameter is the database number. You can manage multiple databases in Redis at once, and each is identified by an integer. The max number of databases is 16 by default
# Default: 0
db = 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment