data science tutorials and snippets prepared by greysweater42
RabbitMQ is a message broker, which means that it enables communication between services. Probably the most popular way for services to communicate is plain TCP, but it has a couple of drawbacks which RabbitMQ solves, for example:
and also provides additional features:
In order to provide these functionalities, RabbitMQ implements AMQP (Advanced Message Queuing Protocol), in which the producer does not publish directly to a message queue but to an exchange, which then routes the messages to queues. How does an exchange know which queues to send messages to? The user defines bindings, which provide this information, and each binding has its own identifier, called a binding key.
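The routing idea described above can be sketched in a few lines of plain Python. This is only a toy in-memory model and not RabbitMQ's API: the class `DirectExchange`, its methods, and the queue names are hypothetical, and it assumes the simplest matching rule, where a message reaches a queue only if its routing key equals the binding key exactly.

```python
from collections import defaultdict, deque


class DirectExchange:
    """Toy in-memory model of an exchange with exact-match routing (not RabbitMQ's API)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self.bindings = defaultdict(list)
        # queue name -> messages waiting to be consumed
        self.queues = defaultdict(deque)

    def bind(self, queue, binding_key):
        """Record that `queue` wants messages whose routing key equals `binding_key`."""
        if queue not in self.bindings[binding_key]:
            self.bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        """Copy `body` to every queue bound with a matching key; return those queue names."""
        targets = list(self.bindings.get(routing_key, []))
        for queue in targets:
            self.queues[queue].append(body)
        return targets


exchange = DirectExchange()
exchange.bind("errors_queue", "error")
exchange.bind("all_logs_queue", "error")
exchange.bind("all_logs_queue", "info")

# The producer only talks to the exchange; the bindings decide where messages land.
delivered_error = exchange.publish("error", "disk full")
delivered_info = exchange.publish("info", "job started")
delivered_debug = exchange.publish("debug", "no queue is bound to this key")
```

In this sketch the "error" message is copied to both queues, the "info" message reaches only `all_logs_queue`, and the "debug" message is dropped because no binding matches it.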
Messages can be distributed to queues in a couple of ways:
Other remarks:
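Let’s set RabbitMQ up inside a Docker container on the local machine. The management image is needed for the web UI, and the ports have to be published so that the UI (15672) and the broker (5672) are reachable from the host:
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management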
Now you are able to access the web UI at http://localhost:15672/, and log in with username: guest, password: guest.
In most of my blog posts I describe example usage here, but the RabbitMQ team has already written an excellent tutorial, and I would not write a better one anyway. Here’s the link.
And here are snippets copy-pasted from this tutorial, with a minor refactor:
producer:
import pika

parameters = pika.ConnectionParameters('localhost')
with pika.BlockingConnection(parameters) as connection:
    channel = connection.channel()
    # durable=True makes the queue itself survive a broker restart
    channel.queue_declare(queue='task_queue', durable=True)
    # the default exchange ('') routes to the queue named by routing_key
    channel.basic_publish(exchange='', routing_key='task_queue', body='Hello World!')
    print(" [x] Sent 'Hello World!'")
consumer:
import pika
import time


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    # simulate work: sleep one second per dot in the message body
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # acknowledge only after the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters('localhost')
with pika.BlockingConnection(parameters) as connection:
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # hand this consumer at most one unacknowledged message at a time
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    channel.start_consuming()