Quality measurement system example
We send states to a message queue named 'qualityQueue', which stores simple status messages of a quality assurance system. Create a multi-component application that communicates with the message queue through two clients in the following way:
- The first client connects to the sensor placed on the measuring machine and randomly sends GOOD, EXCELLENT, and WRONG messages to the 'qualityQueue' message queue every second.
- Create a component that reads and collects the 'GOOD', 'EXCELLENT', and 'WRONG' messages from the 'qualityQueue' queue. After receiving 10 identical messages, it sends a message to the 'qualityStatistics' queue indicating that it has processed 10 messages of a certain quality.
- Create a second client that reads the statistics from the 'qualityStatistics' queue and prints them to the console, for example, '10 'WRONG' messages have been processed'.
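The counting rule of the collecting component can be tried out without a running broker. The following is only a minimal sketch, not the reference solution: it assumes that "10 identical messages" means ten messages of the same quality in total (not necessarily consecutive), and that the counter for that quality restarts after each report. The names QualityCollector, BATCH_SIZE, and add are hypothetical and do not come from the task description.

```python
from collections import Counter

BATCH_SIZE = 10  # threshold taken from the task description


class QualityCollector:
    """Counts quality messages and reports every full batch (illustrative helper)."""

    def __init__(self, batch_size=BATCH_SIZE):
        self.batch_size = batch_size
        self.counts = Counter()

    def add(self, quality):
        """Record one message; return a statistics string when a batch is full, else None."""
        self.counts[quality] += 1
        if self.counts[quality] >= self.batch_size:
            # Assumption: the count for this quality restarts after each report.
            self.counts[quality] = 0
            return f"{self.batch_size} '{quality}' messages have been processed"
        return None


if __name__ == '__main__':
    collector = QualityCollector()
    for quality in ['WRONG'] * BATCH_SIZE:
        report = collector.add(quality)
        if report is not None:
            print(report)
```

In a real processor, add would be called from the consumer callback of 'qualityQueue', and any non-None result would be published to 'qualityStatistics'.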
Let's try to solve the task with the http://docker.iit.uni-miskolc.hu framework.
Starting RabbitMQ in Docker
To solve the task, it is recommended to start multiple instances (terminals). The first terminal will start the RabbitMQ server. Open a new terminal (node 1) and run the following command:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
After running the command above, the RabbitMQ management console will be accessible on port 15672, using the credentials guest/guest. In the left-side menu, you can find the internal IP address (10.x.y.z) of node1, which can be used in the clients and processors.
Open another terminal and execute the following command:
pip install pika
This command installs the pika module, which provides the connection to RabbitMQ.
Create the quality_message_sender.py:
Replace 10.x.y.z in the __init__(self) method with the appropriate IP address.
import pika
import random
import time


class QualitySender:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='qualityQueue')

    def start_sending(self):
        qualities = ['GOOD', 'EXCELLENT', 'WRONG']
        while True:
            quality = random.choice(qualities)
            self.channel.basic_publish(exchange='', routing_key='qualityQueue', body=quality)
            print(f'Sent quality: {quality}')
            time.sleep(1)

    def close_connection(self):
        self.connection.close()


if __name__ == '__main__':
    sender = QualitySender()
    try:
        sender.start_sending()
    except KeyboardInterrupt:
        sender.close_connection()
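Because the internal IP address changes between sessions, it may be more convenient to read it from the environment than to edit the source each time. The following is a small sketch under assumptions: the variable name RABBITMQ_HOST, the helper broker_host, and the 'localhost' fallback are all hypothetical choices, not part of the original example.

```python
import os

DEFAULT_HOST = 'localhost'  # assumed fallback when no address is configured


def broker_host(env=None):
    """Return the broker address from RABBITMQ_HOST (hypothetical name), or the fallback."""
    env = os.environ if env is None else env
    host = env.get('RABBITMQ_HOST', '').strip()
    return host or DEFAULT_HOST


if __name__ == '__main__':
    print(f'Using broker host: {broker_host()}')
```

The returned value could then be passed to pika.ConnectionParameters in place of the hard-coded address, after setting the variable in the terminal with export RABBITMQ_HOST=10.x.y.z.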