tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2
Differences
This shows you the differences between two versions of the page.
| Both sides previous revisionPrevious revisionNext revision | Previous revision | ||
| tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/05/08 07:03] – knehez | tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2024/04/24 06:45] (current) – knehez | ||
|---|---|---|---|
| Line 3: | Line 3: | ||
| Egy minőségbiztosító rendszer mérőgépének 3 állapotát küldjük egy ' | Egy minőségbiztosító rendszer mérőgépének 3 állapotát küldjük egy ' | ||
| - | * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a ' | + | * Az első komponenst, ami a mérőgépre helyezett érzékelőre kapcsolódik a '**qualityQueue**' üzenetsorra pont-pont csatlakozással véletlenszerűen GOOD, EXCELLENT és WRONG üzeneteket küld másodpercenként. |
| - | * **Készítsen egy komponenst** amely a ' | + | * **Készítsen egy második |
| - | * **Készítsen egy második klienst**, ami a ' | + | * **Készítsen egy harmadik komponenst**, ami a ' |
| - | A fenti feladatot a http:// | + | < |
| + | flowchart TB | ||
| + | MQ[(" | ||
| + | Client1[" | ||
| + | Client2[" | ||
| + | Client2 --> | ||
| + | Client3[" | ||
| + | |||
| + | subgraph Docker | ||
| + | MQ | ||
| + | end | ||
| + | |||
| + | subgraph Components | ||
| + | Client1 | ||
| + | Client2 | ||
| + | Client3 | ||
| + | end | ||
| + | |||
| + | classDef machine fill:# | ||
| + | classDef clients fill:# | ||
| + | class Docker machine; | ||
| + | class Client1, | ||
| + | |||
| + | </ | ||
| + | |||
| + | A fenti feladatot a http:// | ||
| === RabbitMQ indítása docker-ben === | === RabbitMQ indítása docker-ben === | ||
| Line 14: | Line 39: | ||
| < | < | ||
| - | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management | + | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq: |
| </ | </ | ||
| - | A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben. | + | A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben |
| Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: | Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: | ||
| Line 23: | Line 48: | ||
| < | < | ||
| pip install pika | pip install pika | ||
| - | </> | + | </code> |
| Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. | Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. | ||
| - | Hozzuk létre a quality_message_sender.py-t: | + | Hozzuk létre a **quality_message_sender.py**-t: |
| - | <code python> | + | Használjuk a megfelelő IP-t a // |
| + | |||
| + | <sxh python> | ||
| import pika | import pika | ||
| import random | import random | ||
| import time | import time | ||
| - | qualities | + | class QualitySender: |
| + | def __init__(self): | ||
| + | self.connection | ||
| + | self.channel = self.connection.channel() | ||
| + | self.channel.queue_declare(queue='qualityQueue') | ||
| - | # RabbitMQ settings | + | def start_sending(self): |
| - | connection | + | qualities = [' |
| - | channel = connection.channel() | + | while True: |
| + | | ||
| + | self.channel.basic_publish(exchange='' | ||
| + | print(f' | ||
| + | time.sleep(1) | ||
| - | channel.queue_declare(queue=' | + | def close_connection(self): |
| + | self.connection.close() | ||
| - | while True: | + | if __name__ == ' |
| - | | + | |
| - | | + | |
| - | | + | |
| - | | + | |
| - | time.sleep(1) | + | |
| - | connection.close() | + | </ |
| - | </ | + | A második komponenshez indítsunk egy új konzolt: |
| - | Állítsuk be a megfelelő IP címet a //pika.ConnectionParameters()// függvénynél. | + | A //__init__(self):// konstruktorban állítsuk be a rabbitMQ szerver IP címét |
| - | Indítsunk egy másik terminált, futtassuk a //'pip install | + | <sxh python> |
| + | import | ||
| - | A //pika.ConnectionParameters()//-t értelem szerűen állítsuk be. | + | class QualityConsumer: |
| + | def __init__(self): | ||
| + | self.connection = pika.BlockingConnection(pika.ConnectionParameters(' | ||
| + | self.channel = self.connection.channel() | ||
| + | self.channel.queue_declare(queue=' | ||
| + | self.channel.queue_declare(queue=' | ||
| + | self.message_count = {' | ||
| - | <code python> | + | def start_consuming(self): |
| + | def callback(ch, | ||
| + | quality = body.decode() | ||
| + | self.message_count[quality] += 1 | ||
| + | print(f' | ||
| + | if self.is_batch_completed(): | ||
| + | self.send_statistics() | ||
| + | self.reset_message_count() | ||
| + | |||
| + | self.channel.basic_consume(queue=' | ||
| + | self.channel.start_consuming() | ||
| + | |||
| + | def send_statistics(self): | ||
| + | for quality, count in self.message_count.items(): | ||
| + | if count > 0: | ||
| + | message = f' | ||
| + | self.channel.basic_publish(exchange='', | ||
| + | print(f' | ||
| + | |||
| + | def reset_message_count(self): | ||
| + | for quality in self.message_count: | ||
| + | self.message_count[quality] = 0 | ||
| + | |||
| + | def is_batch_completed(self): | ||
| + | return sum(self.message_count.values()) >= 10 | ||
| + | |||
| + | def close_connection(self): | ||
| + | self.connection.close() | ||
| + | |||
| + | if __name__ == ' | ||
| + | consumer = QualityConsumer() | ||
| + | try: | ||
| + | consumer.start_consuming() | ||
| + | except KeyboardInterrupt: | ||
| + | consumer.close_connection() | ||
| + | |||
| + | </sxh> | ||
| + | |||
| + | Készítsük el a statisztika kiírását egy új konzolban: | ||
| + | |||
| + | < | ||
| import pika | import pika | ||
| Line 69: | Line 152: | ||
| def callback(ch, | def callback(ch, | ||
| - | | + | |
| - | print(f' | + | print(f' |
| ch.basic_ack(delivery_tag=method.delivery_tag) | ch.basic_ack(delivery_tag=method.delivery_tag) | ||
| Line 77: | Line 160: | ||
| print(' | print(' | ||
| channel.start_consuming() | channel.start_consuming() | ||
| - | + | </sxh> | |
| - | </code> | + | |
| + | **Feladat: | ||
| + | A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit. | ||
tanszek/oktatas/informacios_rendszerek_integralasa/uezenetsorok-rabbitmq_2.1683529435.txt.gz · Last modified: 2023/05/08 07:03 by knehez
