User Tools

Site Tools


tanszek:oktatas:iss_t:rabbitmq

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revisionPrevious revision
Next revision
Previous revision
tanszek:oktatas:iss_t:rabbitmq [2023/05/08 08:28] kneheztanszek:oktatas:iss_t:rabbitmq [2024/04/22 08:49] (current) knehez
Line 3: Line 3:
 We send states to a message queue named 'qualityQueue', which stores simple status messages of a quality assurance system. Create a multi-component application that communicates with the message queue through two clients in the following way: We send states to a message queue named 'qualityQueue', which stores simple status messages of a quality assurance system. Create a multi-component application that communicates with the message queue through two clients in the following way:
  
-  - The first client connects to the sensor placed on the measuring machine and randomly sends GOOD, EXCELLENT, and WRONG messages to the 'qualityQueue' message queue every second. +  - The first component connects to the sensor placed on the measuring machine and randomly sends **GOOD****EXCELLENT**, and **WRONG** messages to the '**qualityQueue**' message queue every second. 
-  - Create a component that reads and collects the 'GOOD', 'EXCELLENT', and 'WRONG' messages from the qualityQueue queue. After receiving 10 identical messages, it sends a message to the 'qualityStatistics' queue indicating that it has processed 10 messages of a certain quality. +  - Create a component that reads and collects the '**GOOD**', '**EXCELLENT**', and '**WRONG**' messages from the qualityQueue queue. After receiving 10 identical messages, it sends a message to the '**qualityStatistics**' queue indicating that it has processed 10 messages of a certain quality. 
-  - Create a second client that reads the statistics from the 'qualityStatistics' queue and prints to the console, for example, '10 'WRONG' messages has been processed'.+  - Create a second component that reads the statistics from the '**qualityStatistics**' queue and prints to the console, for example, '10 'WRONG' messages has been processed'. 
 + 
 +<mermaid> 
 +flowchart TB 
 +    MQ[("RabbitMQ Server\n(qualityQueue, qualityStatistics)")] 
 +    Component1["Component 1\n(Sensor Data Sender)"] -->|sends GOOD/EXCELLENT/WRONG| MQ 
 +    Component2["Component 2\n(Quality Message Consumer)"] -- collects messages --> MQ 
 +    Component2 -->|sends batch of 10 messages| MQ 
 +    Component3["Component 3\n(Statistics Consumer)"] -- receives and prints batches --> MQ 
 + 
 +    subgraph Docker 
 +    MQ 
 +    end 
 + 
 +    subgraph Components 
 +    Component1 
 +    Component2 
 +    Component3 
 +    end 
 + 
 +    classDef machine fill:#f9f,stroke:#333,stroke-width:2px; 
 +    classDef clients fill:#ccf,stroke:#333,stroke-width:2px; 
 +    class Docker machine; 
 +    class Component1,Component2,Component3 clients; 
 + 
 +</mermaid>
  
 Let's try to solve the task with http://docker.iit.uni-miskolc.hu framework. Let's try to solve the task with http://docker.iit.uni-miskolc.hu framework.
Line 13: Line 38:
  
 <code> <code>
-docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management+docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine
 </code> </code>
  
-After running the command above, the RabbitMQ management console will be accessible on port 15672, using the credentials guest/guest. In the left-side menu, you can find the internal IP address (10.x.y.z) of node1, which can be used in the clients and processors.+After running the command above, the **RabbitMQ** management console will be accessible on port **15672**, using the credentials //guest/guest//. In the left-side menu, you can find the internal IP address (**10.x.y.z**) of node1, which can be used in the clients and processors.
  
 Create another terminal and execute the following command: Create another terminal and execute the following command:
Line 24: Line 49:
 </code> </code>
  
-This command installs the pika module, which provides the connection to RabbitMQ.+This command installs the pika module, which provides the connection to **RabbitMQ**.
  
 Create the //quality_message_sender.py//: Create the //quality_message_sender.py//:
Line 30: Line 55:
 Use the appropriate IP address in the //init(self):// method. Use the appropriate IP address in the //init(self):// method.
  
-<code python>+<sxh python>
 import pika import pika
 import random import random
Line 58: Line 83:
     except KeyboardInterrupt:     except KeyboardInterrupt:
         sender.close_connection()         sender.close_connection()
-</code>+</sxh>
  
-Let's create the consumer, and the create statistics:+Let's create the //quality_message_consumer.py// file:
  
-<code python>+(do not forget to create it in an other terminal, and run //pip install pika// and set the proper IP in //pika.ConnectionParameters()//
 + 
 +<sxh python>
 import pika import pika
    
 class QualityConsumer: class QualityConsumer:
     def __init__(self):     def __init__(self):
-        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))+        self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z'))
         self.channel = self.connection.channel()         self.channel = self.connection.channel()
         self.channel.queue_declare(queue='qualityQueue')         self.channel.queue_declare(queue='qualityQueue')
Line 108: Line 135:
     except KeyboardInterrupt:     except KeyboardInterrupt:
         consumer.close_connection()         consumer.close_connection()
-</code>+</sxh> 
 + 
 +The third components prints the statistics. Let's create an other instance (terminal) and  create statistics_consumer.py 
 + 
 +<sxh python> 
 +import pika 
 +  
 +# RabbitMQ settings 
 +connection = pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) 
 +channel = connection.channel() 
 +  
 +channel.queue_declare(queue='qualityStatistics'
 +  
 +def callback(ch, method, properties, body): 
 +    message = body.decode() 
 +    print('-----------------------------'
 +    print(f'{message}'
 +    ch.basic_ack(delivery_tag=method.delivery_tag) 
 +  
 +channel.basic_consume(queue='qualityStatistics', on_message_callback=callback) 
 +  
 +print('Waiting for quality statistics...'
 +channel.start_consuming() 
 +</sxh>
  
tanszek/oktatas/iss_t/rabbitmq.1683534512.txt.gz · Last modified: 2023/05/08 08:28 by knehez