---
title: Azure Service Bus
description: Consume messages from Azure Service Bus queues and topic subscriptions with configurable receive modes and concurrency settings.
---

import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';

Azure Service Bus

Azure Service Bus event integrations consume messages from a queue or topic subscription and trigger event handlers as each message arrives. Use them for reliable enterprise messaging, decoupled service-to-service communication, and workflows that require ordered or transactional message delivery.

Creating an Azure Service Bus service

  1. Click + Add Artifact in the canvas or click + next to Entry Points in the sidebar.

  2. In the Artifacts panel, select Azure Service Bus under Event Integration.

  3. In the creation form, fill in the following fields:

    Azure Service Bus Event Integration creation form

    | Field | Description |
    |---|---|
    | Connection String | Service Bus connection string with Shared Access Signatures. Format: `Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY_VALUE>`. Required. |
    | Entity Config | Configuration for the queue or topic subscription to connect to. Accepts a `QueueConfig` or `TopicSubsConfig` record expression. Required. |

    Expand Advanced Configurations to set the listener name.

    | Field | Description | Default |
    |---|---|---|
    | Listener Name | Identifier for the listener created with this service. | `asbListener` |

  4. Click Create.

  5. WSO2 Integrator opens the service in the Service Designer. The canvas shows the attached listener pill and the Event Handlers section.

    Service Designer showing the Azure Service Bus service canvas

  6. Click + Add Handler to add event handlers.

```ballerina
import ballerinax/asb;
import ballerina/log;

configurable string connectionString = ?;

listener asb:Listener asbListener = new ({
    connectionString: connectionString,
    entityConfig: {
        queueName: "invoices"
    }
});

service on asbListener {

    remote function onMessage(asb:Message message) returns error? {
        log:printInfo("Message received", messageId = message.messageId);
    }

    remote function onError(asb:MessageRetrievalError err) returns error? {
        log:printError("Azure Service Bus error", 'error = err);
    }
}
```
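
The Entity Config field also accepts a TopicSubsConfig record. As a minimal sketch, the same service attached to a topic subscription might look like the following. The field names `topicName` and `subscriptionName` are assumed from the `ballerinax/asb` module, and the topic, subscription, and listener names are hypothetical.

```ballerina
import ballerina/log;
import ballerinax/asb;

configurable string connectionString = ?;

// Hypothetical topic and subscription names, used only for illustration.
// The topicName and subscriptionName fields are assumed to be the fields
// of asb:TopicSubsConfig.
listener asb:Listener topicListener = new ({
    connectionString: connectionString,
    entityConfig: {
        topicName: "invoice-events",
        subscriptionName: "billing"
    }
});

service on topicListener {

    remote function onMessage(asb:Message message) returns error? {
        log:printInfo("Message received from subscription", messageId = message.messageId);
    }

    remote function onError(asb:MessageRetrievalError err) returns error? {
        log:printError("Azure Service Bus error", 'error = err);
    }
}
```

Because the entity type is determined by the record type, switching between a queue and a topic subscription should only require changing the `entityConfig` value.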

Listener configuration

In the Service Designer, click the Configure icon in the header to open the Azure Service Bus Event Integration Configuration panel. Select asbListener under Attached Listeners to configure the listener.

Listener configuration — connection string and entity config

| Field | Description | Default |
|---|---|---|
| Name | Identifier for the listener. | `asbListener` |
| Connection String | Service Bus connection string with Shared Access Signatures. | Required |
| Entity Config | Configuration for the target queue or topic subscription. The entity type is determined by the record type: use `QueueConfig` for queues or `TopicSubsConfig` for topic subscriptions. | Required |
| Receive Mode | How messages are retrieved from the entity. `PEEK_LOCK` holds a lock on the message until completed or abandoned. `RECEIVE_AND_DELETE` removes the message immediately on receipt. | `PEEK_LOCK` |
| Max Auto Lock Renew Duration | Maximum lock renewal duration in seconds under `PEEK_LOCK` mode. Set to `0` to disable auto-renewal. Auto-renewal is disabled for `RECEIVE_AND_DELETE` mode. | `300` |
| Amqp Retry Options | Retry configuration for the underlying AMQP message receiver. | `{}` |
| Additional Values | Key-value pairs for additional connection or entity configuration. | `{}` |
| Auto Complete | When enabled, messages are automatically completed on successful handler execution and abandoned on failure. | |
| Prefetch Count | Number of messages to prefetch from the broker. | `0` |
| Max Concurrency | Maximum number of concurrent messages this listener processes at one time. | `0` |

Click + Attach Listener to attach an additional listener to the same service.

Click Save Changes to apply updates.

```ballerina
listener asb:Listener asbListener = new ({
    connectionString: connectionString,
    entityConfig: {
        queueName: "invoices"
    },
    receiveMode: asb:PEEK_LOCK,
    maxAutoLockRenewDuration: 300,
    autoComplete: false,
    prefetchCount: 0,
    maxConcurrency: 5
});
```

asb:ListenerConfiguration fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `connectionString` | `string` | Required | Service Bus SAS connection string |
| `entityConfig` | `asb:QueueConfig\|asb:TopicSubsConfig` | Required | Target queue or topic subscription |
| `receiveMode` | `asb:ReceiveMode` | `PEEK_LOCK` | Message retrieval mode |
| `maxAutoLockRenewDuration` | `int` | `300` | Lock renewal timeout in seconds under `PEEK_LOCK` |
| `amqpRetryOptions` | `asb:AmqpRetryOptions?` | | AMQP retry configuration |
| `additionalValues` | `map<string>?` | | Additional key-value configuration |
| `autoComplete` | `boolean?` | | Auto-complete messages on success |
| `prefetchCount` | `int` | `0` | Number of messages to prefetch |
| `maxConcurrency` | `int` | `0` | Maximum concurrent message processing |
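
The receive mode decides whether the handler has to settle messages at all. As a hedged sketch, a listener for at-most-once delivery could be configured as follows. It assumes an `asb:RECEIVE_AND_DELETE` constant exists alongside `asb:PEEK_LOCK`, matching the mode names used in this section. The queue and listener names are hypothetical.

```ballerina
import ballerina/log;
import ballerinax/asb;

configurable string connectionString = ?;

// At-most-once delivery: the broker removes each message on receipt, so
// there is no lock to renew and nothing to complete or abandon afterwards.
// asb:RECEIVE_AND_DELETE is assumed to exist next to asb:PEEK_LOCK.
listener asb:Listener auditListener = new ({
    connectionString: connectionString,
    entityConfig: {
        queueName: "audit-events" // hypothetical queue name
    },
    receiveMode: asb:RECEIVE_AND_DELETE
});

service on auditListener {

    remote function onMessage(asb:Message message) returns error? {
        // If this handler fails, the message is already gone from the queue.
        log:printInfo("Audit event received", messageId = message.messageId);
    }

    remote function onError(asb:MessageRetrievalError err) returns error? {
        log:printError("Azure Service Bus error", 'error = err);
    }
}
```

This mode suits messages that can tolerate loss. Where every message must be processed, PEEK_LOCK is the safer choice, because a message that is not settled becomes available again once its lock expires.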

Event handlers

Azure Service Bus services support two handler types: onMessage and onError.

Adding an event handler

In the Service Designer, click + Add Handler. The Select Handler to Add panel lists onMessage and onError. Click either handler to add it directly — no additional configuration is required.

| Handler | Triggered when |
|---|---|
| `onMessage` | A new message arrives from the queue or topic subscription |
| `onError` | A message retrieval or processing error occurs |

onMessage handler — called for each message received:

```ballerina
type InvoiceMessage record {|
    string invoiceId;
    string vendorId;
    decimal amount;
    string currency;
|};

// Placeholder for application-specific processing; replace with real logic.
function processInvoice(InvoiceMessage invoice) returns error? {
}

service on asbListener {

    remote function onMessage(asb:Message message,
                              asb:Caller caller) returns error? {
        // Fails with an error if the body is not an InvoiceMessage
        InvoiceMessage invoice = check message.body.ensureType();
        log:printInfo("Invoice received", invoiceId = invoice.invoiceId);

        check processInvoice(invoice);
        // Complete the message to remove it from the queue
        check caller->complete(message);
    }
}
```
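
When Auto Complete is enabled, the handler does not need an `asb:Caller` parameter, because the listener settles each message based on how the handler returns. The following is a minimal sketch of that pattern, using only the configuration fields described in this document. The listener name and the empty-body check are illustrative assumptions.

```ballerina
import ballerina/log;
import ballerinax/asb;

configurable string connectionString = ?;

// Hypothetical listener name; autoComplete is the field described under
// Listener configuration.
listener asb:Listener autoCompleteListener = new ({
    connectionString: connectionString,
    entityConfig: {
        queueName: "invoices"
    },
    autoComplete: true
});

service on autoCompleteListener {

    remote function onMessage(asb:Message message) returns error? {
        anydata body = message.body;
        if body is () {
            // Returning an error marks the handler as failed, so the
            // listener abandons the message instead of completing it.
            return error("Empty message body");
        }
        log:printInfo("Message processed", messageId = message.messageId);
        // Returning normally lets the listener complete the message.
    }

    remote function onError(asb:MessageRetrievalError err) returns error? {
        log:printError("Azure Service Bus error", 'error = err);
    }
}
```

Explicit settlement through the caller gives finer control, for example when a message should be completed only after a downstream call succeeds. Auto-completion keeps simple handlers shorter.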

onError handler — called when message retrieval fails:

```ballerina
service on asbListener {

    remote function onError(asb:MessageRetrievalError err) returns error? {
        log:printError("Azure Service Bus error", 'error = err);
    }
}
```

Message type

The onMessage handler receives an asb:Message parameter with the message content and metadata.

| Field | Type | Description |
|---|---|---|
| `body` | `anydata` | Message payload. Use `message.body.ensureType()` to cast to a typed record. |
| `messageId` | `string?` | Unique message identifier set by the sender. |
| `contentType` | `string?` | MIME content type of the message body (e.g., `application/json`). |
| `correlationId` | `string?` | Correlation identifier for request/reply patterns. |
| `subject` | `string?` | Application-specific message subject. |
| `enqueuedTime` | `string?` | Time the message was added to the queue. |
| `properties` | `map<anydata>?` | Custom application properties attached to the message. |
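
Because `body` is declared as `anydata`, a handler may need to inspect the runtime type of the payload before using it. The sketch below shows one way to do that with plain Ballerina type tests. `describeBody` is a hypothetical helper, not part of the `asb` module, and the sample values in `main` stand in for `message.body`.

```ballerina
import ballerina/io;

// Hypothetical helper: summarizes a payload of type anydata, which is the
// declared type of the body field in the table above.
function describeBody(anydata body) returns string {
    if body is string {
        return "text payload of " + body.length().toString() + " characters";
    }
    if body is byte[] {
        return "binary payload of " + body.length().toString() + " bytes";
    }
    if body is map<anydata> {
        return "structured payload with keys " + body.keys().toString();
    }
    return "payload of another anydata type";
}

public function main() {
    // Sample values standing in for message.body.
    io:println(describeBody("hello"));
    io:println(describeBody({invoiceId: "INV-1", amount: 10}));
}
```

Inside an onMessage handler, the same function could be called as `describeBody(message.body)` to log what arrived before any conversion to a typed record is attempted.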

What's next