tanszek:oktatas:iss_t:messaging_systems
Differences
This shows you the differences between two versions of the page.
tanszek:oktatas:iss_t:messaging_systems [2023/04/24 18:42] – [Message Queue implementations] knehez | tanszek:oktatas:iss_t:messaging_systems [2025/04/14 07:31] (current) – [MQTT example] knehez | ||
---|---|---|---|
Line 15: | Line 15: | ||
A message queue is software that enables communication between different software components in a distributed system. It allows components to exchange messages asynchronously, | A message queue is software that enables communication between different software components in a distributed system. It allows components to exchange messages asynchronously, | ||
- | RabbitMQ (https:// | + | RabbitMQ (https:// |
In RabbitMQ, messages are published by producers to a specific exchange, which routes them to one or more queues based on the specified routing key. Consumers then subscribe to the queues and receive messages. RabbitMQ supports multiple programming languages, including Java, Python, .NET, and Node.js, making it a versatile messaging solution for various use cases. | In RabbitMQ, messages are published by producers to a specific exchange, which routes them to one or more queues based on the specified routing key. Consumers then subscribe to the queues and receive messages. RabbitMQ supports multiple programming languages, including Java, Python, .NET, and Node.js, making it a versatile messaging solution for various use cases. | ||
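The asynchronous hand-off described above can be sketched without a broker. The example below is only an illustration: it uses Python's standard `queue.Queue` as a stand-in for a broker queue, and the names `producer`, `consumer`, and `run_demo` are hypothetical, not part of RabbitMQ or any client library.

```python
import queue
import threading


def producer(q, messages):
    """Put each message on the queue, then a sentinel marking the end."""
    for m in messages:
        q.put(m)
    q.put(None)  # sentinel: no more messages will follow


def consumer(q, received):
    """Take messages off the queue until the sentinel arrives."""
    while True:
        m = q.get()  # blocks until a message is available
        if m is None:
            break
        received.append(m)


def run_demo(messages):
    """Run one producer and one consumer in separate threads."""
    q = queue.Queue()
    received = []
    t_cons = threading.Thread(target=consumer, args=(q, received))
    t_prod = threading.Thread(target=producer, args=(q, messages))
    t_cons.start()
    t_prod.start()
    t_prod.join()
    t_cons.join()
    return received
```

The producer never calls the consumer directly; both only know the queue, which is the decoupling a message broker provides across processes and machines.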
Line 53: | Line 53: | ||
This code sets up a callback function that will be called every time a message is received from the ' | This code sets up a callback function that will be called every time a message is received from the ' | ||
- | **Type of " | + | ===== |
An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message' | An exchange in RabbitMQ is a messaging entity that receives messages from producers and routes them to queues based on some criteria. When a producer sends a message to RabbitMQ, it sends the message to an exchange. The exchange then examines the message' | ||
Line 60: | Line 60: | ||
**Direct Exchange:** | **Direct Exchange:** | ||
- | A direct exchange routes a message by matching its routing key exactly against the binding key of each queue. When a message is sent to a direct exchange, RabbitMQ will deliver it to the queue(s) whose binding key exactly matches the routing key of the message. | + | A direct exchange routes a message by //matching its routing key exactly against the binding key// of each queue. When a message is sent to a direct exchange, RabbitMQ will deliver it to the queue(s) whose binding key exactly matches the routing key of the message. | |
* **For example**, a stock market application may send messages to a direct exchange with the routing key being the stock ticker symbol, and each queue bound to the exchange would represent a different stock. This way, the application can send specific messages to the appropriate queue, where consumers can consume them and perform actions based on the stock data. | * **For example**, a stock market application may send messages to a direct exchange with the routing key being the stock ticker symbol, and each queue bound to the exchange would represent a different stock. This way, the application can send specific messages to the appropriate queue, where consumers can consume them and perform actions based on the stock data. | ||
+ | |||
+ | [Queue: Stock A] | ||
+ | [Queue: Stock B] | ||
+ | [Direct Exchange: Stock Market] → [Queue: Stock C] | ||
+ | [Queue: Stock D] | ||
+ | [Queue: Stock E] | ||
+ | |||
+ | |||
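The exact-match rule of a direct exchange can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not RabbitMQ's implementation or client API: `DirectExchange` is a hypothetical class, queues are plain Python lists, and the ticker symbols in the usage note are made up.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of a direct exchange: exact binding-key match only."""

    def __init__(self):
        # binding key -> list of queues bound with that key
        self._bindings = defaultdict(list)

    def bind(self, queue, binding_key):
        """Bind a queue (a plain list here) under a binding key."""
        self._bindings[binding_key].append(queue)

    def publish(self, routing_key, message):
        """Deliver to every queue whose binding key equals the routing key.

        Returns the number of queues that received the message.
        """
        queues = self._bindings.get(routing_key, [])
        for q in queues:
            q.append(message)
        return len(queues)
```

With two queues bound under the hypothetical keys `"AAPL"` and `"MSFT"`, publishing with routing key `"AAPL"` reaches only the first queue, and a key with no binding reaches none.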
**Topic Exchange** | **Topic Exchange** | ||
Line 68: | Line 76: | ||
* **For example**, a blog platform may send messages to a topic exchange with the routing key being the topic of the blog post. Queues can then bind to the exchange using a matching pattern to receive messages that match certain criteria. For example, a queue bound to the exchange with the pattern " | * **For example**, a blog platform may send messages to a topic exchange with the routing key being the topic of the blog post. Queues can then bind to the exchange using a matching pattern to receive messages that match certain criteria. For example, a queue bound to the exchange with the pattern " | ||
+ | |||
+ | [Queue: Sports] | ||
+ | [Topic Exchange: Blog Platform] → [Queue: Technology] | ||
+ | [Queue: Politics] | ||
+ | [Queue: Entertainment] | ||
+ | |||
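Topic matching follows the AMQP wildcard rules: routing keys are dot-separated words, `*` in a binding pattern stands for exactly one word, and `#` stands for zero or more words. The function below is a minimal sketch of that rule; `topic_matches` is a hypothetical helper, and the example keys are invented for illustration.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP-style topic pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i, j):
        # i indexes words of the pattern, j indexes words of the key
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words of the key
            return any(match(i + 1, j2) for j2 in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            # '*' consumes exactly one word; a literal must be equal
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

For instance, the pattern `sports.*` accepts `sports.football` but not `sports.football.results`, while `sports.#` accepts both, as well as `sports` on its own.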
**Fanout Exchange** | **Fanout Exchange** | ||
A fanout exchange routes messages to all queues that are bound to it, regardless of the routing key of the message. It is useful for broadcasting messages to multiple queues or multiple consumers. | A fanout exchange routes messages to all queues that are bound to it, regardless of the routing key of the message. It is useful for broadcasting messages to multiple queues or multiple consumers. | ||
- | * **For example**, a notification system may send messages to a fanout exchange when a new event is created. Each queue bound to the exchange would represent a different user, and all users should receive the notification. This way, the application can broadcast the message to all connected | + | * **For example**, a notification system may send messages to a fanout exchange when a new event is created. Each queue bound to the exchange would represent a different user, and all users should receive the notification. This way, the application can broadcast the message to all connected |
+ | |||
+ | [Notification System] → [Fanout Exchange] → [Queue: User A] | ||
+ | [Queue: User B] | ||
+ | [Queue: User C] | ||
+ | [Queue: User D] | ||
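A fanout exchange is the simplest to model, since the routing key plays no role. The sketch below is an in-memory illustration only: `FanoutExchange` is a hypothetical class and queues are plain lists.

```python
class FanoutExchange:
    """Toy model of a fanout exchange: every bound queue gets a copy."""

    def __init__(self):
        self._queues = []

    def bind(self, queue):
        """Bind a queue (a plain list here); no binding key is needed."""
        self._queues.append(queue)

    def publish(self, message, routing_key=""):
        """Deliver to all bound queues; the routing key is ignored."""
        for q in self._queues:
            q.append(message)
        return len(self._queues)
```

Publishing one message with four queues bound leaves one copy in each of the four, whatever routing key was supplied.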
**Headers Exchange** | **Headers Exchange** | ||
Line 78: | Line 98: | ||
* **For example**, a logistics system may send messages to a headers exchange with headers such as " | * **For example**, a logistics system may send messages to a headers exchange with headers such as " | ||
+ | |||
+ | [Logistics System] → [Headers Exchange] → [Queue: New York Air] | ||
+ | [Queue: New York Sea] | ||
+ | [Queue: Los Angeles Air] | ||
+ | [Queue: Los Angeles Sea] | ||
+ | |||
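Header-based routing can be sketched as a comparison between the binding arguments and the message headers. In RabbitMQ the binding argument `x-match` selects whether all listed headers must match (`all`, the default) or at least one (`any`). The function below is a simplified illustration; `headers_match` is a hypothetical helper, and the header names `destination` and `transport` are assumptions chosen to fit the logistics example.

```python
def headers_match(binding_args, message_headers):
    """Return True if message headers satisfy a headers-exchange binding."""
    mode = binding_args.get("x-match", "all")
    # Arguments starting with "x-" configure the binding and are not matched
    required = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [
        k in message_headers and message_headers[k] == v
        for k, v in required.items()
    ]
    return all(hits) if mode == "all" else any(hits)
```

A binding with `x-match` set to `all`, `destination` set to `New York`, and `transport` set to `air` accepts only messages carrying both values, whereas the same binding with `x-match` set to `any` also accepts a New York shipment travelling by sea.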
Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | Each exchange type has its own routing algorithm and is used in different messaging scenarios. Understanding the exchange types is important when designing RabbitMQ architectures that meet specific business requirements. | ||
+ | |||
+ | ===== MQTT example ===== | ||
+ | |||
+ | Clone the repository into a Docker playground: | |
+ | |||
+ | git clone https:// | ||
+ | cd isi/ | ||
+ | docker-compose up | ||
+ | |||
+ | docker-compose.yml defines a multi-container application with three services: mqtt, consumer, and producer. | ||
+ | |||
+ | The mqtt service is an instance of the // | ||
+ | |||
+ | The consumer and producer services are both custom-built Docker images, which are defined using the build key. The context key specifies the build context, which in this case is the current directory (.), and the dockerfile key specifies the Dockerfile to use for the build. Additionally, | ||
+ | |||
+ | Finally, the depends_on key is used to specify that both the consumer and producer services depend on the mqtt service. This means that the mqtt container is started before the other two. Note that depends_on in this short form only controls start order; it does not wait until the broker is ready to accept connections. | |
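Since starting the mqtt container first does not by itself guarantee that the broker is already accepting connections, a client may want to retry its first connection attempt. The helper below is a minimal sketch of that idea; `connect_with_retry` is a hypothetical function, not part of paho-mqtt or of the repository, and the attempt count and delay are arbitrary assumptions.

```python
import time


def connect_with_retry(connect, attempts=10, delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds or the attempts run out.

    connect  -- zero-argument callable that raises OSError on failure
    attempts -- maximum number of tries (assumed value)
    delay    -- seconds to wait between tries (assumed value)
    sleep    -- injectable sleep function, useful for testing
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except OSError as exc:  # includes ConnectionRefusedError
            last_error = exc
            if attempt < attempts:
                sleep(delay)
    raise ConnectionError(
        f"broker not reachable after {attempts} attempts"
    ) from last_error
```

In a client such as consumer.py this could wrap the existing connect call, for example `connect_with_retry(lambda: client.connect(broker, port))`, where `client`, `broker`, and `port` are the names already used in that script.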
+ | |||
+ | **docker-compose.yml** | ||
+ | |||
+ | <code yml> | ||
+ | version: ' | ||
+ | services: | ||
+ | mqtt: | ||
+ | image: toke/ | ||
+ | restart: unless-stopped | ||
+ | volumes: | ||
+ | - ./ | ||
+ | - ./ | ||
+ | - ./ | ||
+ | |||
+ | consumer: | ||
+ | build: | ||
+ | context: . | ||
+ | dockerfile: Dockerfile-consumer | ||
+ | volumes: | ||
+ | - .:/ | ||
+ | depends_on: | ||
+ | - mqtt | ||
+ | |||
+ | producer: | ||
+ | build: | ||
+ | context: . | ||
+ | dockerfile: Dockerfile-producer | ||
+ | volumes: | ||
+ | - .:/ | ||
+ | depends_on: | ||
+ | - mqtt | ||
+ | </ | ||
+ | |||
+ | **Dockerfile-consumer** | ||
+ | |||
+ | This Dockerfile defines a simple containerized Python application that can be used as a consumer for a message broker. The dependencies are installed in the container, and the consumer code is copied into the container' | ||
+ | |||
+ | <code yml> | ||
+ | FROM python: | ||
+ | |||
+ | WORKDIR /app | ||
+ | |||
+ | COPY requirements.txt . | ||
+ | RUN pip install --no-cache-dir -r requirements.txt | ||
+ | |||
+ | COPY consumer.py . | ||
+ | |||
+ | CMD [" | ||
+ | </ | ||
+ | |||
+ | Here is a breakdown of the different parts of the Dockerfile: | ||
+ | |||
+ | * FROM python: | ||
+ | |||
+ | * WORKDIR /app: This line sets the working directory for the container to /app. This is where the consumer code and other related files will be located. | ||
+ | |||
+ | * COPY requirements.txt .: This line copies the requirements.txt file from the current directory on the host machine to the /app directory in the container. The requirements.txt file lists the dependencies that the consumer requires to run. | ||
+ | |||
+ | * RUN pip install --no-cache-dir -r requirements.txt: | ||
+ | |||
+ | * COPY consumer.py .: This line copies the consumer.py file from the current directory on the host machine to the /app directory in the container. This is the main code file for the consumer. | ||
+ | |||
+ | * CMD [" | ||
+ | |||
+ | **consumer.py** | ||
+ | |||
+ | <code python> | ||
+ | import paho.mqtt.client as mqtt | ||
+ | |||
+ | broker = " | ||
+ | port = 1883 | ||
+ | |||
+ | timelive = 60 | ||
+ | |||
+ | def on_connect(client, | ||
+ | print(" | ||
+ | client.subscribe("/ | ||
+ | |||
+ | |||
+ | def on_message(client, | ||
+ | print(msg.payload.decode()) | ||
+ | |||
+ | client = mqtt.Client() | ||
+ | client.connect(broker, | ||
+ | client.on_connect = on_connect | ||
+ | client.on_message = on_message | ||
+ | client.loop_forever() | ||
+ | </ | ||
+ | |||
+ | **producer.py** | ||
+ | |||
+ | <code python> | ||
+ | # simulator device 1 for mqtt message publishing | ||
+ | import paho.mqtt.client as paho | ||
+ | import time | ||
+ | import random | ||
+ | |||
+ | broker = " | ||
+ | port = 1883 | ||
+ | |||
+ | def on_publish(client, | ||
+ | print(" | ||
+ | |||
+ | client = paho.Client(" | ||
+ | client.on_publish = on_publish | ||
+ | client.connect(broker, | ||
+ | |||
+ | for i in range(20): | ||
+ | d = random.randint(1, | ||
+ | |||
+ | # telemetry to send | ||
+ | message = " | ||
+ | |||
+ | time.sleep(d) | ||
+ | |||
+ | # publish message | ||
+ | ret = client.publish("/ | ||
+ | |||
+ | print(" | ||
+ | </ | ||
tanszek/oktatas/iss_t/messaging_systems.1682361744.txt.gz · Last modified: 2023/04/24 18:42 by knehez