==== Quality measurement system example ==== We send states to a message queue named 'qualityQueue', which stores simple status messages of a quality assurance system. Create a multi-component application that communicates with the message queue through two clients in the following way: - The first component connects to the sensor placed on the measuring machine and randomly sends **GOOD**, **EXCELLENT**, and **WRONG** messages to the '**qualityQueue**' message queue every second. - Create a component that reads and collects the '**GOOD**', '**EXCELLENT**', and '**WRONG**' messages from the qualityQueue queue. After receiving 10 identical messages, it sends a message to the '**qualityStatistics**' queue indicating that it has processed 10 messages of a certain quality. - Create a second component that reads the statistics from the '**qualityStatistics**' queue and prints to the console, for example, '10 'WRONG' messages has been processed'. flowchart TB MQ[("RabbitMQ Server\n(qualityQueue, qualityStatistics)")] Component1["Component 1\n(Sensor Data Sender)"] -->|sends GOOD/EXCELLENT/WRONG| MQ Component2["Component 2\n(Quality Message Consumer)"] -- collects messages --> MQ Component2 -->|sends batch of 10 messages| MQ Component3["Component 3\n(Statistics Consumer)"] -- receives and prints batches --> MQ subgraph Docker MQ end subgraph Components Component1 Component2 Component3 end classDef machine fill:#f9f,stroke:#333,stroke-width:2px; classDef clients fill:#ccf,stroke:#333,stroke-width:2px; class Docker machine; class Component1,Component2,Component3 clients; Let's try to solve the task with http://docker.iit.uni-miskolc.hu framework. === Starting RabbitMQ in Docker === To solve the task, it is recommended to start multiple instances (terminals). The first terminal will start the RabbitMQ server. Open a new terminal (node 1) and run the following command: docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine After running the command above, the **RabbitMQ** management console will be accessible on port **15672**, using the credentials //guest/guest//. In the left-side menu, you can find the internal IP address (**10.x.y.z**) of node1, which can be used in the clients and processors. Create another terminal and execute the following command: pip install pika This command installs the pika module, which provides the connection to **RabbitMQ**. Create the //quality_message_sender.py//: Use the appropriate IP address in the //init(self):// method. import pika import random import time class QualitySender: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) self.channel = self.connection.channel() self.channel.queue_declare(queue='qualityQueue') def start_sending(self): qualities = ['GOOD', 'EXCELLENT', 'WRONG'] while True: quality = random.choice(qualities) self.channel.basic_publish(exchange='', routing_key='qualityQueue', body=quality) print(f'Sent quality: {quality}') time.sleep(1) def close_connection(self): self.connection.close() if __name__ == '__main__': sender = QualitySender() try: sender.start_sending() except KeyboardInterrupt: sender.close_connection() Let's create the //quality_message_consumer.py// file: (do not forget to create it in an other terminal, and run //pip install pika// and set the proper IP in //pika.ConnectionParameters()// ) import pika class QualityConsumer: def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) self.channel = self.connection.channel() self.channel.queue_declare(queue='qualityQueue') self.channel.queue_declare(queue='qualityStatistics') self.message_count = {'GOOD': 0, 'EXCELLENT': 0, 'WRONG': 0} def start_consuming(self): def callback(ch, method, properties, body): quality = body.decode() self.message_count[quality] += 1 print(f'Received quality: {quality}') if self.is_batch_completed(): self.send_statistics() self.reset_message_count() self.channel.basic_consume(queue='qualityQueue', on_message_callback=callback, auto_ack=True) self.channel.start_consuming() def send_statistics(self): for quality, count in self.message_count.items(): if count > 0: message = f'{count} {quality} messages has been processed' self.channel.basic_publish(exchange='', routing_key='qualityStatistics', body=message) print(f'Sent statistics: {message}') def reset_message_count(self): for quality in self.message_count: self.message_count[quality] = 0 def is_batch_completed(self): return sum(self.message_count.values()) >= 10 def close_connection(self): self.connection.close() if __name__ == '__main__': consumer = QualityConsumer() try: consumer.start_consuming() except KeyboardInterrupt: consumer.close_connection() The third components prints the statistics. Let's create an other instance (terminal) and create statistics_consumer.py import pika # RabbitMQ settings connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) channel = connection.channel() channel.queue_declare(queue='qualityStatistics') def callback(ch, method, properties, body): message = body.decode() print('-----------------------------') print(f'{message}') ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='qualityStatistics', on_message_callback=callback) print('Waiting for quality statistics...') channel.start_consuming()