tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2
Differences
This shows you the differences between two versions of the page.
Next revision | Previous revision | ||
tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2023/05/08 06:47] – létrehozva knehez | tanszek:oktatas:informacios_rendszerek_integralasa:uezenetsorok-rabbitmq_2 [2024/04/24 06:45] (current) – knehez | ||
---|---|---|---|
Line 1: | Line 1: | ||
==== Összetettebb példa ==== | ==== Összetettebb példa ==== | ||
- | Készítsen egy több komponensből álló alkalmazást, | + | Egy minőségbiztosító rendszer mérőgépének 3 állapotát |
- | * Egy minőségbiztosító rendszer mérőgépének 3 állapotát | + | |
- | * Az első kliens, ami a mérőgépre helyezett érzékelőre kapcsolódik a ' | + | |
- | * Készítsen egy komponenst amely a ' | + | |
- | * Készítsen egy második klienst, ami a ' | + | |
- | A fenti feladatot | + | * Az első komponenst, ami a mérőgépre helyezett érzékelőre kapcsolódik a ' |
+ | * **Készítsen egy második komponenst** amely a ' | ||
+ | * **Készítsen egy harmadik komponenst**, | ||
+ | < | ||
+ | flowchart TB | ||
+ | MQ[(" | ||
+ | Client1[" | ||
+ | Client2[" | ||
+ | Client2 --> | ||
+ | Client3[" | ||
+ | subgraph Docker | ||
+ | MQ | ||
+ | end | ||
+ | subgraph Components | ||
+ | Client1 | ||
+ | Client2 | ||
+ | Client3 | ||
+ | end | ||
+ | |||
+ | classDef machine fill:# | ||
+ | classDef clients fill:# | ||
+ | class Docker machine; | ||
+ | class Client1, | ||
+ | |||
+ | </ | ||
+ | |||
+ | A fenti feladatot a http:// | ||
+ | |||
+ | === RabbitMQ indítása docker-ben === | ||
+ | |||
+ | A feladat megoldásához több instance-t (konzolt) érdemes indítani. Az első konzol fogja a rabbitMQ szervert indítani. Adjunk hozzá egy konzolt (node 1) és futtassuk a következő parancsot: | ||
+ | |||
+ | < | ||
+ | docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq: | ||
+ | </ | ||
+ | |||
+ | A futtatás után a rabitMQ management konzol elérhető az 15672-es porton, a guest/guest megadásával. A bal oldali listában láthatjuk a node1 10.x.y.z belső IP címét, amit használhatunk a kliensekben és a feldolgozóban. | ||
+ | |||
+ | Hozzunk létre egy másik konzolt és indítsuk el az alábbi parancsot: | ||
+ | |||
+ | < | ||
+ | pip install pika | ||
+ | </ | ||
+ | |||
+ | Ezzel telepítettük a pika modult, ami a rabbitMQ-hoz való csatlakozást biztosítja. | ||
+ | |||
+ | Hozzuk létre a **quality_message_sender.py**-t: | ||
+ | |||
+ | Használjuk a megfelelő IP-t a // | ||
+ | |||
+ | <sxh python> | ||
+ | import pika | ||
+ | import random | ||
+ | import time | ||
+ | |||
+ | class QualitySender: | ||
+ | def __init__(self): | ||
+ | self.connection = pika.BlockingConnection(pika.ConnectionParameters(' | ||
+ | self.channel = self.connection.channel() | ||
+ | self.channel.queue_declare(queue=' | ||
+ | |||
+ | def start_sending(self): | ||
+ | qualities = [' | ||
+ | while True: | ||
+ | quality = random.choice(qualities) | ||
+ | self.channel.basic_publish(exchange='', | ||
+ | print(f' | ||
+ | time.sleep(1) | ||
+ | |||
+ | def close_connection(self): | ||
+ | self.connection.close() | ||
+ | |||
+ | if __name__ == ' | ||
+ | sender = QualitySender() | ||
+ | try: | ||
+ | sender.start_sending() | ||
+ | except KeyboardInterrupt: | ||
+ | sender.close_connection() | ||
+ | |||
+ | </ | ||
+ | |||
+ | A második komponenshez indítsunk egy új konzolt: | ||
+ | |||
+ | A // | ||
+ | |||
+ | <sxh python> | ||
+ | import pika | ||
+ | |||
+ | class QualityConsumer: | ||
+ | def __init__(self): | ||
+ | self.connection = pika.BlockingConnection(pika.ConnectionParameters(' | ||
+ | self.channel = self.connection.channel() | ||
+ | self.channel.queue_declare(queue=' | ||
+ | self.channel.queue_declare(queue=' | ||
+ | self.message_count = {' | ||
+ | |||
+ | def start_consuming(self): | ||
+ | def callback(ch, | ||
+ | quality = body.decode() | ||
+ | self.message_count[quality] += 1 | ||
+ | print(f' | ||
+ | if self.is_batch_completed(): | ||
+ | self.send_statistics() | ||
+ | self.reset_message_count() | ||
+ | |||
+ | self.channel.basic_consume(queue=' | ||
+ | self.channel.start_consuming() | ||
+ | |||
+ | def send_statistics(self): | ||
+ | for quality, count in self.message_count.items(): | ||
+ | if count > 0: | ||
+ | message = f' | ||
+ | self.channel.basic_publish(exchange='', | ||
+ | print(f' | ||
+ | |||
+ | def reset_message_count(self): | ||
+ | for quality in self.message_count: | ||
+ | self.message_count[quality] = 0 | ||
+ | |||
+ | def is_batch_completed(self): | ||
+ | return sum(self.message_count.values()) >= 10 | ||
+ | |||
+ | def close_connection(self): | ||
+ | self.connection.close() | ||
+ | |||
+ | if __name__ == ' | ||
+ | consumer = QualityConsumer() | ||
+ | try: | ||
+ | consumer.start_consuming() | ||
+ | except KeyboardInterrupt: | ||
+ | consumer.close_connection() | ||
+ | |||
+ | </ | ||
+ | |||
+ | Készítsük el a statisztika kiírását egy új konzolban: | ||
+ | |||
+ | <sxh python> | ||
+ | import pika | ||
+ | |||
+ | # RabbitMQ settings | ||
+ | connection = pika.BlockingConnection(pika.ConnectionParameters(' | ||
+ | channel = connection.channel() | ||
+ | |||
+ | channel.queue_declare(queue=' | ||
+ | |||
+ | def callback(ch, | ||
+ | message = body.decode() | ||
+ | print(f' | ||
+ | ch.basic_ack(delivery_tag=method.delivery_tag) | ||
+ | |||
+ | channel.basic_consume(queue=' | ||
+ | |||
+ | print(' | ||
+ | channel.start_consuming() | ||
+ | </ | ||
+ | |||
+ | |||
+ | **Feladat: | ||
+ | |||
+ | A 15672-es porton lépjük be a rabbitMQ management console-ra és vizsgáljuk meg a lehetőségeit. | ||
tanszek/oktatas/informacios_rendszerek_integralasa/uezenetsorok-rabbitmq_2.1683528479.txt.gz · Last modified: 2023/05/08 06:47 by knehez