User Tools

Site Tools


tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Both sides previous revisionPrevious revision
Next revision
Previous revision
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/05/08 07:05] kneheztanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2024/04/24 06:45] (current) knehez
Line 3: Line 3:
 Egy minőségbiztosító rendszer mérőgépének 3 állapotát küldjük egy 'qualityQueue' nevű üzenetsorra. Készítsen egy több komponensből álló alkalmazást, amely 2 kliensen keresztül kommunikál az üzenetsorral az alábbi módon: Egy minőségbiztosító rendszer mérőgépének 3 állapotát küldjük egy 'qualityQueue' nevű üzenetsorra. Készítsen egy több komponensből álló alkalmazást, amely 2 kliensen keresztül kommunikál az üzenetsorral az alábbi módon:
  
-  * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a 'qualityQueue' üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként.  +  * Az első komponenst, ami a mérőgépre helyezett érzékelőre kapcsolódik a '**qualityQueue**' üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként.  
-  * **Készítsen egy komponenst** amely a 'GOOD', 'EXCELLENT' és a 'WRONG' üzeneteket leolvassa a qualityQueue sorról és gyűjti. Minden 10 megkapott azonos üzenet után a 'qualityStatistics' sorra küld egy üzenetet, amiben azt jelzi, hogy 10 (adott minőségű) üzenetet feldolgozott.  +  * **Készítsen egy második komponenst** amely a 'GOOD', 'EXCELLENT' és a 'WRONG' üzeneteket leolvassa a **qualityQueue** sorról és gyűjti. Minden 10 megkapott azonos üzenet után a '**qualityStatistics**' sorra küld egy üzenetet, amiben azt jelzi, hogy 10 (adott minőségű) üzenetet feldolgozott.  
-  * **Készítsen egy második klienst**, ami a 'qualityStatistics' sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 'WRONG' messages has been processed'+  * **Készítsen egy harmadik komponenst**, ami a 'qualityStatistics' sorrol olvassa a statisztikát és a konzolba kiírja hogy pl. '10 'WRONG' messages has been processed'
  
-A fenti feladatot a http://docker.iit.uni-miskolc.hu-n keretrendszerben oldjuk meg.+<mermaid> 
 +flowchart TB 
 +    MQ[("RabbitMQ Server\n(qualityQueue, qualityStatistics)")] 
 +    Client1["Component 1\n(Sensor Data Sender)"] -->|sends GOOD/EXCELLENT/WRONG| MQ 
 +    Client2["Component 2\n(Quality Message Consumer)"] -- collects messages --> MQ 
 +    Client2 -->|sends batch of 10 messages| MQ 
 +    Client3["Component 3\n(Statistics Consumer)"] -- receives and prints batches --> MQ 
 + 
 +    subgraph Docker 
 +    MQ 
 +    end 
 + 
 +    subgraph Components 
 +    Client1 
 +    Client2 
 +    Client3 
 +    end 
 + 
 +    classDef machine fill:#f9f,stroke:#333,stroke-width:2px; 
 +    classDef clients fill:#ccf,stroke:#333,stroke-width:2px; 
 +    class Docker machine; 
 +    class Client1,Client2,Client3 clients; 
 + 
 +</mermaid> 
 + 
 +A fenti feladatot a http://docker.iit.uni-miskolc.hu keretrendszerben oldjuk meg.
  
 === RabbitMQ indítása docker-ben === === RabbitMQ indítása docker-ben ===
Line 14: Line 39:
  
 <code> <code>
-docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management+docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management-alpine
 </code> </code>
  
-A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben.+A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben és a feldolgozóban.
  
 Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot:
Line 23: Line 48:
 <code> <code>
 pip install pika pip install pika
-</>+</code>
  
 Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja.
  
-Hozzuk létre a quality_message_sender.py-t:+Hozzuk létre a **quality_message_sender.py**-t:
  
-<code python>+Használjuk a megfelelő IP-t a //__init__(self):// ben 
 + 
 +<sxh python>
 import pika import pika
 import random import random
 import time import time
  
-qualities ['EXCELLENT', 'GOOD', 'WRONG']+class QualitySender: 
 +    def __init__(self): 
 +        self.connection pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) 
 +        self.channel = self.connection.channel() 
 +        self.channel.queue_declare(queue='qualityQueue')
  
-# RabbitMQ settings +    def start_sending(self): 
-connection pika.BlockingConnection(pika.ConnectionParameters('10.x.y.z')) +        qualities = ['GOOD', 'EXCELLENT', 'WRONG'
-channel = connection.channel()+        while True: 
 +            quality random.choice(qualities) 
 +            self.channel.basic_publish(exchange='', routing_key='qualityQueue', body=quality) 
 +            print(f'Sent quality: {quality}'
 +            time.sleep(1)
  
-channel.queue_declare(queue='qualityQueue')+    def close_connection(self): 
 +        self.connection.close()
  
-while True+if __name__ == '__main__'
-    quality random.choice(qualities+    sender QualitySender() 
-    message = f'{quality}' +    try: 
-    channel.basic_publish(exchange='', routing_key='qualityQueue', body=message+        sender.start_sending() 
-    print(f'Sent message{message}') +    except KeyboardInterrupt
-    time.sleep(1)+        sender.close_connection()
  
-connection.close()+</sxh>
  
-</code>+A második komponenshez indítsunk egy új konzolt:
  
-Állítsuk be a megfelelő IP címet a //pika.ConnectionParameters()// függvénynél.+//__init__(self):// konstruktorban állítsuk be a rabbitMQ szerver IP címét
  
-Indítsunk egy másik terminált, futtassuk a //'pip install pika'// parancsot. Majd hozzuk létre a //quality_consumer.py// állományt az alábbi kóddal.+<sxh python> 
 +import pika
  
-A //pika.ConnectionParameters()//-t értelem szerűen állítsuk be.+class QualityConsumer: 
 +    def __init__(self): 
 +        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
 +        self.channel = self.connection.channel() 
 +        self.channel.queue_declare(queue='qualityQueue'
 +        self.channel.queue_declare(queue='qualityStatistics') 
 +        self.message_count = {'GOOD': 0, 'EXCELLENT': 0, 'WRONG': 0}
  
-<code python>+    def start_consuming(self): 
 +        def callback(ch, method, properties, body): 
 +            quality = body.decode() 
 +            self.message_count[quality] += 1 
 +            print(f'Received quality: {quality}'
 +            if self.is_batch_completed(): 
 +                self.send_statistics() 
 +                self.reset_message_count() 
 + 
 +        self.channel.basic_consume(queue='qualityQueue', on_message_callback=callback, auto_ack=True) 
 +        self.channel.start_consuming() 
 + 
 +    def send_statistics(self): 
 +        for quality, count in self.message_count.items(): 
 +            if count > 0: 
 +                message = f'{count} {quality} messages has been processed' 
 +                self.channel.basic_publish(exchange='', routing_key='qualityStatistics', body=message) 
 +                print(f'Sent statistics: {message}'
 + 
 +    def reset_message_count(self): 
 +        for quality in self.message_count: 
 +            self.message_count[quality] = 0 
 + 
 +    def is_batch_completed(self): 
 +        return sum(self.message_count.values()) >= 10 
 + 
 +    def close_connection(self): 
 +        self.connection.close() 
 + 
 +if __name__ == '__main__': 
 +    consumer = QualityConsumer() 
 +    try: 
 +        consumer.start_consuming() 
 +    except KeyboardInterrupt: 
 +        consumer.close_connection() 
 + 
 +</sxh> 
 + 
 +Készítsük el a statisztika kiírását egy új konzolban: 
 + 
 +<sxh python>
 import pika import pika
  
Line 69: Line 152:
  
 def callback(ch, method, properties, body): def callback(ch, method, properties, body):
-    quality = body.decode() +    message = body.decode() 
-    print(f'{qualitymessages has been processed')+    print(f'{message}')
     ch.basic_ack(delivery_tag=method.delivery_tag)     ch.basic_ack(delivery_tag=method.delivery_tag)
  
Line 77: Line 160:
 print('Waiting for quality statistics...') print('Waiting for quality statistics...')
 channel.start_consuming() channel.start_consuming()
- +</sxh>
-</code>+
  
  
 +**Feladat:**
  
 +A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit.
  
  
tanszek/oktatas/informacios_rendszerek_integralasa/uezenetsorok-rabbitmq_2.1683529514.txt.gz · Last modified: 2023/05/08 07:05 by knehez