tanszek:oktatas:iss_t:rabbitmq
Differences
This shows you the differences between two versions of the page.
Both sides previous revisionPrevious revisionNext revision | Previous revision | ||
tanszek:oktatas:iss_t:rabbitmq [2023/05/08 08:27] – knehez | tanszek:oktatas:iss_t:rabbitmq [2024/04/22 08:49] (current) – knehez | ||
---|---|---|---|
Line 3: | Line 3: | ||
We send states to a message queue named ' | We send states to a message queue named ' | ||
- | - The first client | + | - The first component |
- | - Create a component that reads and collects the ' | + | - Create a component that reads and collects the '**GOOD**', '**EXCELLENT**', and '**WRONG**' messages from the qualityQueue queue. After receiving 10 identical messages, it sends a message to the '**qualityStatistics**' queue indicating that it has processed 10 messages of a certain quality. |
- | - Create a second | + | - Create a second |
+ | |||
+ | < | ||
+ | flowchart TB | ||
+ | MQ[(" | ||
+ | Component1[" | ||
+ | Component2[" | ||
+ | Component2 --> | ||
+ | Component3[" | ||
+ | |||
+ | subgraph Docker | ||
+ | MQ | ||
+ | end | ||
+ | |||
+ | subgraph Components | ||
+ | Component1 | ||
+ | Component2 | ||
+ | Component3 | ||
+ | end | ||
+ | |||
+ | classDef machine fill:# | ||
+ | classDef clients fill:# | ||
+ | class Docker machine; | ||
+ | class Component1, | ||
+ | |||
+ | </ | ||
Let's try to solve the task with http:// | Let's try to solve the task with http:// | ||
Line 13: | Line 38: | ||
< | < | ||
- | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management | + | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq: |
</ | </ | ||
- | After running the command above, the RabbitMQ management console will be accessible on port 15672, using the credentials guest/ | + | After running the command above, the **RabbitMQ** management console will be accessible on port **15672**, using the credentials |
Create another terminal and execute the following command: | Create another terminal and execute the following command: | ||
Line 24: | Line 49: | ||
</ | </ | ||
- | This command installs the pika module, which provides the connection to RabbitMQ. | + | This command installs the pika module, which provides the connection to **RabbitMQ**. |
Create the // | Create the // | ||
Line 30: | Line 55: | ||
Use the appropriate IP address in the // | Use the appropriate IP address in the // | ||
- | <code python> | + | <sxh python> |
import pika | import pika | ||
import random | import random | ||
Line 58: | Line 83: | ||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||
sender.close_connection() | sender.close_connection() | ||
- | </code> | + | </sxh> |
+ | |||
+ | Let's create the // | ||
+ | |||
+ | (do not forget to create it in another terminal, and run //pip install pika// and set the proper IP in //
+ | |||
+ | <sxh python> | ||
+ | import pika | ||
+ | |||
+ | class QualityConsumer: | ||
+ | def __init__(self): | ||
+ | self.connection = pika.BlockingConnection(pika.ConnectionParameters(' | ||
+ | self.channel = self.connection.channel() | ||
+ | self.channel.queue_declare(queue=' | ||
+ | self.channel.queue_declare(queue=' | ||
+ | self.message_count = {' | ||
+ | |||
+ | def start_consuming(self): | ||
+ | def callback(ch, | ||
+ | quality = body.decode() | ||
+ | self.message_count[quality] += 1 | ||
+ | print(f' | ||
+ | if self.is_batch_completed(): | ||
+ | self.send_statistics() | ||
+ | self.reset_message_count() | ||
+ | |||
+ | self.channel.basic_consume(queue=' | ||
+ | self.channel.start_consuming() | ||
+ | |||
+ | def send_statistics(self): | ||
+ | for quality, count in self.message_count.items(): | ||
+ | if count > 0: | ||
+ | message = f' | ||
+ | self.channel.basic_publish(exchange='', | ||
+ | print(f' | ||
+ | |||
+ | def reset_message_count(self): | ||
+ | for quality in self.message_count: | ||
+ | self.message_count[quality] = 0 | ||
+ | |||
+ | def is_batch_completed(self): | ||
+ | return sum(self.message_count.values()) >= 10 | ||
+ | |||
+ | def close_connection(self): | ||
+ | self.connection.close() | ||
+ | |||
+ | if __name__ == ' | ||
+ | consumer = QualityConsumer() | ||
+ | try: | ||
+ | consumer.start_consuming() | ||
+ | except KeyboardInterrupt: | ||
+ | consumer.close_connection() | ||
+ | </ | ||
+ | |||
+ | The third component prints the statistics. Let's create another instance (terminal) and create statistics_consumer.py
+ | |||
+ | <sxh python> | ||
+ | import pika | ||
+ | |||
+ | # RabbitMQ settings | ||
+ | connection = pika.BlockingConnection(pika.ConnectionParameters(' | ||
+ | channel = connection.channel() | ||
+ | |||
+ | channel.queue_declare(queue=' | ||
+ | |||
+ | def callback(ch, | ||
+ | message = body.decode() | ||
+ | print(' | ||
+ | print(f' | ||
+ | ch.basic_ack(delivery_tag=method.delivery_tag) | ||
+ | |||
+ | channel.basic_consume(queue=' | ||
+ | |||
+ | print(' | ||
+ | channel.start_consuming() | ||
+ | </ |
tanszek/oktatas/iss_t/rabbitmq.1683534443.txt.gz · Last modified: 2023/05/08 08:27 by knehez