Skip to content
Snippets Groups Projects
Commit 5bef8a90 authored by sguazt's avatar sguazt
Browse files

Specify what version of Kafka/Redis (both server and python bindings) classes...

Specify what version of Kafka/Redis (both server and python bindings) classes have been tested. Other minor changes
parent 732012ca
No related branches found
No related tags found
No related merge requests found
##
## Python module for common measure store types and functions.
## Python module for storing measures to plain files.
##
## Copyright 2020 Marco Guazzone (marco.guazzone@gmail.com)
##
......@@ -37,7 +37,7 @@ class FileStore(measures_store.MeasuresStore):
def __init__(self, basepath=DEFAULT_BASEPATH):
super(FileStore, self).__init__()
self._basepath = basepath if not None else self.DEFAULT_BASEPATH
self._basepath = basepath if basepath is not None else self.DEFAULT_BASEPATH
#self._basepath_tmp = os.path.join(self.basepath_, 'tmp')
os.makedirs(self._basepath, exist_ok=True)
......
##
## Python module for common measure store types and functions.
## Python module for storing measures to Apache Kafka.
##
## Copyright 2020 Marco Guazzone (marco.guazzone@gmail.com)
##
......@@ -16,6 +16,7 @@
## limitations under the License.
##
#from confluent_kafka import KafkaException
from confluent_kafka import Producer
from core import measures_store
import json
......@@ -23,7 +24,15 @@ import logging
class KafkaStore(measures_store.MeasuresStore):
"""Measures store that interfaces with Apache Kafka."""
"""Measures store that interfaces with Apache Kafka
(https://kafka.apache.org/).
Tested with Apache Kafka v. 2.4.0 and Confluent's Kafka Python client v. 1.3.0.
References:
- Apache Kafka: https://kafka.apache.org/
- Confluent's Kafka Python client: https://github.com/confluentinc/confluent-kafka-python
"""
DEFAULT_BROKERS = ['localhost:9092']
DEFAULT_TOPIC = 'easycloud'
......@@ -32,14 +41,14 @@ class KafkaStore(measures_store.MeasuresStore):
def from_conf(cls, conf):
brokers = None
if 'brokers' in conf:
brokers = conf['brokers']
brokers = conf['brokers'].split(',')
topic = None
if 'topic' in conf:
topic = conf['topic']
return cls(brokers=brokers, topic=topic)
def __init__(self, brokers=DEFAULT_BROKERS, topic=DEFAULT_TOPIC):
self._brokers = brokers if not None else self.DEFAULT_BROKERS
self._brokers = brokers if brokers is not None else self.DEFAULT_BROKERS
self._topic = topic if topic is not None else self.DEFAULT_TOPIC
self._prod = Producer({'bootstrap.servers': ','.join(self._brokers)})
......@@ -76,7 +85,9 @@ class KafkaStore(measures_store.MeasuresStore):
msg = self._make_message(measure)
try:
self._prod.produce(self._topic, key = msg_key, value = msg, callback=self._delivery_callback)
#TODO: handle failures...
#except KafkaException as e:
# logging.error("Produce message failed: {}".format(e))
except BufferError:
logging.debug('Local producer queue is full ({} messages awaiting delivery): try again'.format(len(self._prod)))
......
##
## Python module for common measure store types and functions.
## Python module for storing measures to Redis.
##
## Copyright 2020 Marco Guazzone (marco.guazzone@gmail.com)
##
......@@ -25,7 +25,11 @@ import logging
class RedisStore(measures_store.MeasuresStore):
"""Measures store that interfaces with Redis (https://redis.io).
Tested with Redis v. 5.0.7.
Tested with Redis v. 5.0.7 and redis-py Python client v. 3.4.1.
References:
- Redis: https://redis.io
- redis-py Python client: https://github.com/andymccurdy/redis-py
"""
DEFAULT_URL = 'redis://localhost:6379/'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment