Connecting RabbitMQ with PIKA for 10000 EPS


Hey all, I know it has been a long time since I posted any articles on my site, due to an extremely busy schedule. But I wanted to start sharing my experience again so that someone can benefit from it.

In the last week or so I have been tasked with creating an event-driven architecture to handle 10000+ EPS (events per second). The approach I chose was to use a message queue that acts as the main bus through which all messages are streamed. For this purpose I used RabbitMQ and pika (as the system is written in Python). One of the key challenges was to create an asynchronous publisher that had to be thread safe at 10000+ EPS. Searching the net, I could not find any single good resource for this, but I found several supporting articles that helped me build it.

The following is a simple Python pika publisher that publishes messages asynchronously in a thread-safe environment.

import logging
import pika
import json
import time
from logging.handlers import RotatingFileHandler
from threading import Thread



class Publisher(Thread):

    def __init__(self, RABBITMQ_SETTINGS,LOG4PY_SETTINGS):
        Thread.__init__(self)       
        self.logger = logging.getLogger("Publisher.py")
        self.connection = None
        self.channel = None
        self._deliveries = []
        self._acked = 0
        self._nacked = 0
        self._message_number = 0
        self._stopping = False
        self.queue = 'alienvault_replicate'
        self.routing_key = 'alienvault_replicate'
        self.exchange = 'alienvault_replicate'
        self.message = None
        self.ready = False
        self._closing = False
        log4py_file  = LOG4PY_SETTINGS['log4py_file']
        log4py_log_level = LOG4PY_SETTINGS['log4py_log_level']
        self.PUBLISH_INTERVAL=0.1
        self.RABBITMQ_SETTINGS =RABBITMQ_SETTINGS

        if log4py_log_level == 'DEBUG':
           self.log_level = logging.DEBUG
        elif log4py_log_level == 'INFO': 
           self.log_level = logging.INFO
        elif log4py_log_level == 'WARN':
           self.log_level = logging.WARN
        elif log4py_log_level == 'ERROR': 
           self.log_level = logging.ERROR

        self.logger.setLevel(self.log_level)
        # create console handler and set level to debug
        rfh = RotatingFileHandler(filename=log4py_file, mode='a', maxBytes=100*1024*1024,backupCount=2)
        rfh.setLevel(self.log_level)
        # create formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    
        # add formatter to ch
        rfh.setFormatter(formatter)
    
        # add ch to logger
        self.logger.addHandler(rfh)
        
        amqp_url = 'amqp://'+self.RABBITMQ_SETTINGS['user']+':'+self.RABBITMQ_SETTINGS['passwd']+'@'+self.RABBITMQ_SETTINGS['host']+':'+str('5672')+'/%2F'
        self._url = amqp_url
        
    def is_ready(self):
        return self.ready
        
    def set_message(self,message):
        self.message = message
        self.logger.info('Message set to publish to {0}'.format(self.message))
    
    
    def connect(self):
        self.logger.info('Connecting to %s', self._url)
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     self.onconnection_open)
        
    def close_connection(self):
        self.logger.info('Closing connection')
        self._closing = True
        self.connection.close()

    def add_onconnection_close_callback(self):
        self.logger.info('Adding connection close callback')
        self.connection.add_on_close_callback(self.onconnection_closed)

    def onconnection_closed(self, connection, reply_code, reply_text):

        self.channel = None
        if self._closing:
            self.connection.ioloop.stop()
        else:
            self.logger.warning('Connection closed, reopening in 5 seconds: (%s) %s',
                           reply_code, reply_text)
            self.ready = False                   
            self.reconnect()


    def onconnection_open(self, unusedconnection):
        self.logger.info('Connection opened')
        self.add_onconnection_close_callback()
        self.openchannel()

    def reconnect(self):
        self.connection.ioloop.stop()
        self.connection = self.connect()
        self.connection.ioloop.start()

    def add_onchannel_close_callback(self):
        self.logger.info('Adding channel close callback')
        self.channel.add_on_close_callback(self.onchannel_closed)

    def onchannel_closed(self, channel, reply_code, reply_text):
        self.logger.warning('Channel was closed: (%s) %s', reply_code, reply_text)
        if not self._closing:
            self.connection.close()

    def onchannel_open(self, channel):
        self.logger.info('Channel opened')
        self.channel = channel
        self.add_onchannel_close_callback()
        self.setup_exchange(self.exchange)

    def setup_exchange(self, exchange_name):
        self.logger.info('Declaring exchange %s', exchange_name)
        self.channel.exchange_declare(self.on_exchange_declareok,
                                       exchange_name)

    def on_exchange_declareok(self, unused_frame):
        self.logger.info('Exchange declared')
        self.setup_queue(self.queue)

    def setup_queue(self, queue_name):
        self.logger.info('Declaring queue %s', queue_name)
        self.channel.queue_declare(self.on_queue_declareok, queue_name,durable=True)

    def on_queue_declareok(self, method_frame):
        self.logger.info('Binding %s to %s with %s',
                    self.exchange, self.queue, self.routing_key)
        self.channel.queue_bind(self.on_bindok, self.queue,
                                 self.exchange, self.routing_key)


    def publish_message(self):
        if self._stopping:
            return
        if self.message == None:
            return

        try:
            properties = pika.BasicProperties(delivery_mode = 1)
    
            self.channel.basic_publish(self.exchange, self.routing_key,
                                        self.message,
                                        properties)
            self.logger.info('Published message # %i', self._message_number)
        except Exception as err:
            import trace
            self.logger.info("Error in sending message ... {0}".format(err.message))
            self.ready = False

    def start_publishing(self):
        self.logger.info('Issuing consumer related RPC commands')
        self.ready = True
        self.publish_message()

    def on_bindok(self, unused_frame):
        self.logger.info('Queue bound')
        self.start_publishing()

    def closechannel(self):
        self.logger.info('Closing the channel')
        if self.channel:
            self.channel.close()

    def openchannel(self):
        self.logger.info('Creating a new channel')
        self.connection.channel(on_open_callback=self.onchannel_open)

    def run(self):
        self.connection = self.connect()
        self.connection.ioloop.start()


    def stop(self):
        self.logger.info('Stopping')
        self._stopping = True
        self.closechannel()
        self.close_connection()
        self.connection.ioloop.start()
        self.logger.info('Stopped')

if __name__ == '__main__':
    # Demo driver: start the publisher thread and push 99999 JSON messages
    # through it, waiting for the publisher to be ready before each send.
    RABBITMQ_SETTINGS = {"user":"user","passwd":"pw","host":"xxx.xxx.xxx.xxx"}
    LOG4PY_SETTINGS = {"log4py_file":"./log4py.log","log4py_log_level":"INFO"}

    # Created outside the try block so the cleanup below never touches an
    # unbound name.
    publisher = Publisher(RABBITMQ_SETTINGS,LOG4PY_SETTINGS)
    publisher.start()
    try:
        for i in range(1 , 100000):
            # json.dumps yields valid JSON; the previous string
            # concatenation put the counter outside the closing quote.
            message = json.dumps({"message": "Hello Fellasss...." + str(i)})
            publisher.set_message(message)
            while not publisher.is_ready():
                time.sleep(.001)
            publisher.publish_message()
    except KeyboardInterrupt:
        # Ctrl-C ends the demo quietly; cleanup happens in finally.
        pass
    finally:
        # Always stop, so the publisher thread does not outlive a failure.
        publisher.stop()


Leave a Reply

Your email address will not be published. Required fields are marked *