Hey all, I know it has been a long time since I posted any articles on my site, due to an extremely busy schedule. But I wanted to start sharing my experience again so that someone can benefit from it.
In the last week or so I have been tasked with creating an event-driven architecture to handle 10000+ EPS (events per second). The platform I chose uses a message queue that acts as the main bus over which all messages are streamed. For this purpose I have used RabbitMQ and pika (as the system is written in Python). One of the key challenges was to create an asynchronous publisher which had to be thread safe for 10000+ EPS. Searching the net I could not find any single good resource for this, but I found several supporting articles that helped me build this out.
The following is a simple Python pika publisher that publishes messages asynchronously and in a thread-safe manner.
import logging
import pika
import json
import time
from logging.handlers import RotatingFileHandler
from threading import Thread
class Publisher(Thread):
    """Thread that owns a pika ``SelectConnection`` and publishes messages.

    ``run()`` opens the connection and starts its I/O loop; the connection
    callbacks then open a channel, declare the exchange and queue, bind them,
    and finally mark the publisher as ready.  Callers hand over a payload with
    ``set_message()`` and send it with ``publish_message()``.

    NOTE(review): ``publish_message()`` is called both from the I/O-loop
    thread (via ``start_publishing``) and from whichever thread owns this
    object.  pika connections are documented as not thread-safe, so confirm
    whether publishing should be marshalled onto the I/O loop instead.
    """

    # Accepted ``log4py_log_level`` strings and the logging level each maps to.
    _LOG_LEVELS = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARN': logging.WARN,
        'ERROR': logging.ERROR,
    }

    def __init__(self, RABBITMQ_SETTINGS, LOG4PY_SETTINGS):
        """Store settings, configure file logging and build the AMQP URL.

        RABBITMQ_SETTINGS must provide the keys ``user``, ``passwd`` and
        ``host``.  LOG4PY_SETTINGS must provide ``log4py_file`` and
        ``log4py_log_level``; an unrecognised level falls back to INFO
        instead of leaving ``log_level`` undefined.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger("Publisher.py")
        self.connection = None
        self.channel = None
        self._deliveries = []
        self._acked = 0
        self._nacked = 0
        self._message_number = 0
        self._stopping = False
        self.queue = 'alienvault_replicate'
        self.routing_key = 'alienvault_replicate'
        self.exchange = 'alienvault_replicate'
        self.message = None
        self.ready = False
        self._closing = False
        self.PUBLISH_INTERVAL = 0.1
        self.RABBITMQ_SETTINGS = RABBITMQ_SETTINGS

        log4py_file = LOG4PY_SETTINGS['log4py_file']
        log4py_log_level = LOG4PY_SETTINGS['log4py_log_level']
        self.log_level = self._LOG_LEVELS.get(log4py_log_level, logging.INFO)
        self.logger.setLevel(self.log_level)

        # The logger is shared by name across instances; attach the rotating
        # file handler only once so every record is not written several times.
        if not any(isinstance(handler, RotatingFileHandler)
                   for handler in self.logger.handlers):
            rfh = RotatingFileHandler(filename=log4py_file, mode='a',
                                      maxBytes=100 * 1024 * 1024,
                                      backupCount=2)
            rfh.setLevel(self.log_level)
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            rfh.setFormatter(formatter)
            self.logger.addHandler(rfh)

        amqp_url = ('amqp://' + self.RABBITMQ_SETTINGS['user'] + ':'
                    + self.RABBITMQ_SETTINGS['passwd'] + '@'
                    + self.RABBITMQ_SETTINGS['host'] + ':' + str('5672')
                    + '/%2F')
        self._url = amqp_url

    def is_ready(self):
        """Return True once the queue is bound and publishing may begin."""
        return self.ready

    def set_message(self, message):
        """Remember ``message`` as the payload for the next publish call."""
        self.message = message
        self.logger.info('Message set to publish to {0}'.format(self.message))

    def connect(self):
        """Create and return a ``SelectConnection`` for the configured URL."""
        # Log only the host: the full URL embeds the password.
        self.logger.info('Connecting to %s', self.RABBITMQ_SETTINGS['host'])
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     self.onconnection_open)

    def close_connection(self):
        """Flag an intentional shutdown and close the connection."""
        self.logger.info('Closing connection')
        self._closing = True
        self.connection.close()

    def add_onconnection_close_callback(self):
        """Register ``onconnection_closed`` as the connection close handler."""
        self.logger.info('Adding connection close callback')
        self.connection.add_on_close_callback(self.onconnection_closed)

    def onconnection_closed(self, connection, reply_code, reply_text):
        """Stop the I/O loop on an intentional close, otherwise reconnect."""
        self.channel = None
        if self._closing:
            self.connection.ioloop.stop()
        else:
            # NOTE(review): the message announces a 5 second delay, but
            # reconnect() is invoked immediately.
            self.logger.warning(
                'Connection closed, reopening in 5 seconds: (%s) %s',
                reply_code, reply_text)
            self.ready = False
            self.reconnect()

    def onconnection_open(self, unusedconnection):
        """Connection is up: install the close handler and open a channel."""
        self.logger.info('Connection opened')
        self.add_onconnection_close_callback()
        self.openchannel()

    def reconnect(self):
        """Stop the current I/O loop, then connect again and restart it."""
        self.connection.ioloop.stop()
        self.connection = self.connect()
        self.connection.ioloop.start()

    def add_onchannel_close_callback(self):
        """Register ``onchannel_closed`` as the channel close handler."""
        self.logger.info('Adding channel close callback')
        self.channel.add_on_close_callback(self.onchannel_closed)

    def onchannel_closed(self, channel, reply_code, reply_text):
        """Close the connection when the channel closes unexpectedly."""
        self.logger.warning('Channel was closed: (%s) %s',
                            reply_code, reply_text)
        if not self._closing:
            self.connection.close()

    def onchannel_open(self, channel):
        """Keep the opened channel and start declaring the exchange."""
        self.logger.info('Channel opened')
        self.channel = channel
        self.add_onchannel_close_callback()
        self.setup_exchange(self.exchange)

    def setup_exchange(self, exchange_name):
        """Declare ``exchange_name``; continues in ``on_exchange_declareok``."""
        self.logger.info('Declaring exchange %s', exchange_name)
        self.channel.exchange_declare(self.on_exchange_declareok,
                                      exchange_name)

    def on_exchange_declareok(self, unused_frame):
        """Exchange declared: move on to declaring the queue."""
        self.logger.info('Exchange declared')
        self.setup_queue(self.queue)

    def setup_queue(self, queue_name):
        """Declare the durable queue; continues in ``on_queue_declareok``."""
        self.logger.info('Declaring queue %s', queue_name)
        self.channel.queue_declare(self.on_queue_declareok, queue_name,
                                   durable=True)

    def on_queue_declareok(self, method_frame):
        """Queue declared: bind it to the exchange with the routing key."""
        self.logger.info('Binding %s to %s with %s',
                         self.exchange, self.queue, self.routing_key)
        self.channel.queue_bind(self.on_bindok, self.queue,
                                self.exchange, self.routing_key)

    def publish_message(self):
        """Publish the current message, if any, to the configured exchange.

        Does nothing while stopping or when no message has been set.  Any
        failure is logged with its traceback and clears the ready flag; it is
        not propagated to the caller.
        """
        if self._stopping or self.message is None:
            return
        try:
            properties = pika.BasicProperties(delivery_mode=1)
            self.channel.basic_publish(self.exchange, self.routing_key,
                                       self.message,
                                       properties)
        except Exception as err:
            # str(err) works on every Python version; err.message does not
            # exist on Python 3 and would raise inside this handler.
            self.logger.exception('Error in sending message ... %s', err)
            self.ready = False
        else:
            self._message_number += 1
            self.logger.info('Published message # %i', self._message_number)

    def start_publishing(self):
        """Mark the publisher ready and publish any pending message."""
        self.logger.info('Issuing consumer related RPC commands')
        self.ready = True
        self.publish_message()

    def on_bindok(self, unused_frame):
        """Queue bound: setup is complete, begin publishing."""
        self.logger.info('Queue bound')
        self.start_publishing()

    def closechannel(self):
        """Close the channel if one is currently open."""
        self.logger.info('Closing the channel')
        if self.channel:
            self.channel.close()

    def openchannel(self):
        """Ask the connection for a channel; continues in ``onchannel_open``."""
        self.logger.info('Creating a new channel')
        self.connection.channel(on_open_callback=self.onchannel_open)

    def run(self):
        """Thread entry point: connect and run the connection's I/O loop."""
        self.connection = self.connect()
        self.connection.ioloop.start()

    def stop(self):
        """Stop publishing and close the channel and connection.

        Safe to call before ``run()`` has created the connection.
        """
        self.logger.info('Stopping')
        self._stopping = True
        self.closechannel()
        if self.connection is not None:
            self.close_connection()
            # NOTE(review): this starts the I/O loop from the calling thread
            # although run() may still be driving it on the publisher thread;
            # confirm this is intended for the pika version in use.
            self.connection.ioloop.start()
        self.logger.info('Stopped')
if __name__ == '__main__':
    # Demo driver: start the publisher thread and push a run of test messages.
    RABBITMQ_SETTINGS = {"user": "user", "passwd": "pw", "host": "xxx.xxx.xxx.xxx"}
    LOG4PY_SETTINGS = {"log4py_file": "./log4py.log", "log4py_log_level": "INFO"}
    # Built outside the try block so the KeyboardInterrupt handler below can
    # never reference an unbound name.
    publisher = Publisher(RABBITMQ_SETTINGS, LOG4PY_SETTINGS)
    try:
        publisher.start()
        for i in range(1, 100000):
            # json.dumps yields a valid JSON payload; the hand-built string
            # put the counter outside the closing quote.
            message = json.dumps({"message": "Hello Fellasss....{0}".format(i)})
            publisher.set_message(message)
            # Wait until the publisher reports that its queue is bound.
            while not publisher.is_ready():
                time.sleep(.001)
            publisher.publish_message()
        publisher.stop()
    except KeyboardInterrupt:
        publisher.stop()