Надійне, готове до продакшену middleware для інтеграції RabbitMQ (AMQP) з HTTP сервісами (наприклад, 1C:Enterprise).
Це middleware діє як надійний міст між HTTP клієнтами та RabbitMQ, забезпечуючи:
- Публікацію (Publishing): HTTP POST -> RabbitMQ Exchange (Надійно)
- Споживання (Consuming): HTTP POST (Long-polling) <- RabbitMQ Queue
- Бриджинг протоколів: Конвертує HTTP REST запити в AMQP повідомлення.
- Надійність: Використовує RabbitMQ Publisher Confirms та ручні підтвердження (acknowledgments) для гарантії доставки "At-Least-Once".
- Управління топологією: Автоматично налаштовує Exchanges, Queues та Dead Letter Exchanges (DLX).
- Безпека:
- HTTP Basic Authentication: Використовує стандартний механізм аутентифікації. Middleware виступає проксі, передаючи облікові дані (username/password) безпосередньо в RabbitMQ.
- Управління підключеннями (Connection Pooling): Автоматично створює та кешує з'єднання для кожного користувача.
- Обмеження частоти запитів (Rate Limiting)
- Валідація та санітизація вхідних даних (відповідно до OWASP)
- Спостережуваність (Observability): Структуроване JSON логування з Correlation IDs та Prometheus метрики.
- Валідація схеми: Строгі Pydantic V2 моделі для надійної обробки даних.
Цей сервіс використовує HTTP Basic Authentication.
КРИТИЧНО: НЕ публікуйте цей сервіс в інтернет напряму. Ви ЗОБОВ'ЯЗАНІ розмістити його за Reverse Proxy (Nginx, Traefik, AWS ALB), налаштованим з HTTPS (TLS). Передача облікових даних (логін/пароль) через незахищений HTTP є небезпечною.
- Python 3.11+
- uv (рекомендовано) або pip
- Docker та Docker Compose
-
Клонуйте репозиторій:
git clone <repo-url> cd rmq_middleware
-
Встановіть залежності:
uv sync
-
Запустіть юніт-тести:
uv run pytest
- Створіть файл
.env(див. приклад у.env.example). - Запустіть сервіси:
docker-compose up -d
- Перевірте статус (Health check):
curl http://localhost:8000/health
Middleware намагається розумно обробляти тіло повідомлень:
- JSON: Якщо тіло є валідним JSON, воно десеріалізується в об'єкт/масив.
- String: Якщо це не JSON, спроба декодувати як UTF-8 рядок.
- Hex: Якщо це бінарні дані, повертається Hexadecimal рядок.
-
Отримання повідомлення (
Fetch):- Параметр
timeoutу запиті/v1/fetchвизначає час очікування надходження повідомлення в чергу. Це не час на обробку. - Якщо повідомлення отримано з
auto_ack: false, воно отримує статусUnackedв RabbitMQ. - Повідомлення залишається "заблокованим" за поточною сесією користувача і невидимим для інших споживачів.
- Параметр
-
Повернення повідомлення в чергу (Redelivery):
- Повідомлення повернеться в чергу і стане доступним іншим споживачам лише у двох випадках:
- Явна відмова: Виклик
/v1/reject/{tag}зrequeue: true. - Таймаут сесії: Middleware автоматично закриває неактивні з'єднання через 5 хвилин (idle timeout). В цей момент RabbitMQ детектить розрив з'єднання і повертає всі
Unackedповідомлення в чергу.
- Явна відмова: Виклик
- Повідомлення повернеться в чергу і стане доступним іншим споживачам лише у двох випадках:
Рекомендації для клієнтів:
- Успішна обробка: Завжди надсилайте
/v1/ack/{tag}після обробки. - Помилка обробки: Надсилайте
/v1/reject/{tag}.
Middleware використовує HTTP Basic Auth. Ви повинні надати ім'я користувача та пароль, які зареєстровані в RabbitMQ. Middleware підключиться від імені цього користувача.
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8000/v1/publish"
auth = HTTPBasicAuth('my_rmq_user', 'my_secret_pass')
payload = {
"exchange": "enterprise.core",
"routing_key": "order.created",
"payload": {
"order_id": 12345,
"amount": 99.99
},
"persistent": True,
"mandatory": True
}
response = requests.post(url, json=payload, auth=auth)
print(response.status_code) # 202
print(response.json())import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8000/v1/fetch"
auth = HTTPBasicAuth('my_rmq_user', 'my_secret_pass')
payload = {
"queue": "orders.queue",
"timeout": 10,
"auto_ack": False
}
response = requests.post(url, json=payload, auth=auth)
if response.status_code == 200:
msg = response.json()
print("Отримано:", msg["body"])
# Підтвердження обробки (Acknowledgment)
ack_url = f"http://localhost:8000/v1/ack/{msg['delivery_tag']}"
requests.post(ack_url, auth=auth)
elif response.status_code == 204:
print("Черга порожня")| Ендпоінт | Метод | Опис | Auth |
|---|---|---|---|
/v1/publish |
POST | Публікація повідомлення | Basic |
/v1/fetch |
POST | Отримання повідомлення (Long-polling) | Basic |
/v1/ack/{tag} |
POST | Підтвердження повідомлення (Ack) | Basic |
/v1/reject/{tag} |
POST | Відхилення повідомлення (Reject) | Basic |
/health |
GET | Перевірка працездатності (Liveness) | None |
/ready |
GET | Перевірка готовності (Readiness) | None |
Цей проект ліцензовано під ліцензією MIT - дивіться файл LICENSE для деталей.