# nautchkafe-rabbitmq 
Lightweight, flexible, and functional message-passing pubsub system built with RabbitMQ.
This project allows easy publishing and subscribing of messages using a publish-subscribe architecture.
- Publisher and Subscriber Interfaces: Simplifies sending and receiving messages through RabbitMQ using the publish-subscribe pattern.
- Lightweight and Extensible: Easily configurable RabbitMQ setup with the ability to extend or customize the transport layer to fit various message delivery needs.
- Pure Functions: The code is designed with pure functions, eliminating side effects and making it easier to reason about and
- No simulation of errors or results; simply publish and subscribe to topics in a straightforward manner.
- Clean, minimalistic design with focus on decoupling the message transport logic from application logic.
- Supports multiple topics (queues) for publishing and subscribing messages in a highly decoupled manner.
- When a message is delivered to a consumer, it becomes unavailable to other consumers until it is either acknowledged or rejected. This mechanism eliminates the risk of racecondition, deadlock, ... between consumers attempting to access the same message.
- Message Acknowledgments (ACK) RabbitMQ employs an acknowledgment system to manage the receipt and processing of messages:
Consumer acknowledgment (ack): The consumer informs RabbitMQ that the message has been successfully processed. If the message is not acknowledged within a specified time frame (or if the consumer crashes), RabbitMQ requeues the message, making it available for redelivery.
- Prefetching and Load Management RabbitMQ allows the configuration of a prefetch count parameter, which controls the number of messages delivered to a consumer at a time:
This limits the number of messages processed concurrently by a single consumer. It prevents scenarios where a consumer becomes overwhelmed with an excessive number of tasks.
- RabbitPublisher: An interface to send messages to a specific RabbitMQ topic.
- RabbitSubscriber: An interface to handle incoming messages from a RabbitMQ topic.
- RabbitTransport: Implementation that connects to RabbitMQ, publishes messages, and subscribes to topics.
- RabbitLockMapper: Implementation for lock acquire system in java.concurrent.
- RabbitClientCredintials: A factory class that handles creating and managing connections to RabbitMQ using the provided configuration.
To get started, clone the repository and build the project:
git clone https://github.com/noyzys/nautchkafe-rabbit-bridge.git
cd nautchkafe-rabbit-bridgepublic static void main(String[] args) {
    
    // Base scope configuration, register TOPIC
    RabbitClientConnector clientConnector = new RabbitClientConnector();
    RabbitClientResource<RabbitTransport<RabbitUser>> resource = RabbitTransport.createResource(clientConnector);
    // Example RabbitUser message
    RabbitUser userMessage = new RabbitUser("Frankie", 40);
    // Sync
    resource.use(transport -> {
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.publish("server", userMessage))
            .peek(result -> result.subscribe("server", message -> {
            System.out.println("> Received message: " + message);
            return Either.right(null);
        })).peek(result -> result.close());
        // Async
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.publishAsync("server_async", userMessage))
            .peek(result -> result.subscribeAsync("server_async", message -> {
            System.out.println("> Received async message: " + message);
            return Future.successful(null);
        })).peek(result -> result.closeAsync().onFailure(error -> System.err.println("Failed to close: " + error.getMessage())));
    });
}
// BY DEFAULT JACKSON DATABIND OBJECT SERDES.public static void main(String[] args) {
    RabbitClientConnector clientConnector = new RabbitClientConnector();
    RabbitClientResource<RabbitTransport<RabbitUser>> resource = RabbitTransport.createResource(clientConnector);
    resource.use(transport -> {
        // List of topics to subscribe to and publish to
        List<String> topics = List.of("server-1", "server-2", "server-3");
        // Example RabbitUser message
        RabbitUser userMessage = new RabbitUser("Frankie", 40);
        // Publish a message to multiple topics synchronously
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.publishMultiple(topics, userMessage))
            .peek(result -> System.out.println("> Published message to multiple topics"));
        // Subscribe to multiple topics asynchronously
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.subscribeToMultipleTopics(topics, message -> {
            
            System.out.println("> Received message from topic: " + message);
            return Future.successful(null); // Handle asynchronously
        })).peek(result -> System.out.println("> Subscribed to multiple topics"));
        // Publish a message asynchronously to multiple topics
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.publishMultipleAsync(topics, userMessage))
            .peek(result -> System.out.println("> Published async message to multiple topics"));
        // Subscribe to multiple topics asynchronously with async handling
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.subscribeToMultipleTopics(topics, message -> {
                System.out.println("Received async message from topic: " + message);
                return Future.successful(null); // Handle asynchronously
        })).peek(result -> System.out.println("> Subscribed asynchronously to multiple topics"));
        // Close the transport after operations are completed
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.close())
            .peek(result -> System.out.println("> Transport closed"));
        // Close asynchronously
        transport.peekLeft(error -> System.err.println("Connection failed: " + error.getMessage()))
            .peek(result -> result.closeAsync().onFailure(error -> System.err.println("Failed to close: " + error.getMessage())))
            .peek(result -> System.out.println("> Transport closed asynchronously"));
    });
}RabbitLockMapper lock = new RabbitLockMapper();
lock.acquire(message, input -> {
    System.out.println("Received message: " + input);
    return Either.right(null);
}).apply(message);
return Either.right(null);publish, subscribe topic -> "server", "server_async"
> Received message: sync message
> Received async message: async message
RabbitUser{name='Frankie', age=40}
Multiple operations
multi::publish, subscribe topic -> "server-1", "server-2", "server-3"
> Published message to multiple topics
> Subscribed to multiple topics
> Received message from topic: RabbitUser{name='Frankie', age=40}
> Received message from topic: RabbitUser{name='Frankie', age=40}
> Received message from topic: RabbitUser{name='Frankie', age=40}
> Published async message to multiple topics
> Subscribed asynchronously to multiple topics
> Received async message from topic: RabbitUser{name='Frankie', age=40}
> Received async message from topic: RabbitUser{name='Frankie', age=40}
> Received async message from topic: RabbitUser{name='Frankie', age=40}
> Transport closed
> Transport closed asynchronously
If you are interested in exploring functional programming and its applications within this project visit the repository at vavr-in-action, fp-practice.