Skip to content

[QUERY] Azure ServiceBus: Completing messages from another thread #48748

Open
@mystiqu

Description

@mystiqu

Library name and version

Azure.Messaging.ServiceBus 7.10.0

Query/Question

Hi

We have a scenario where we process tracking related messages from a queue and currently have different issues like race conditions (when having multiple instances) and running out of sql handles.

Each message is a part of a longer "chain" of messages that uses the same transaction id (throughout the system), and we update the status of a single unique record in the database for each message with the same transaction id. We usually have around 1 million messages / day.

We're experimenting with using a session-enabled queue instead and storing messages for a specific sessionId in memory until they are "old enough" (usually all messages are received within a few seconds), and then process the entire batch at once within the lock duration. This would guarantee it works across instances since a session is locked for all clients and greatly reduce the database traffic (se we can e.g. replace 15 connections and sql statements with a single one). However we just ran into trouble since it seems we cannot commit the message from another thread.

We used a ServiceBusSessionProcesser, but that required us to complete the message directly in the message handler. Same limitation seems to be present of using a ServiceBusTrigger in a Function App, the completeMessageAsync() call must be made from the tread that received the message.

Is there any "best practice" for a situation like this or is it completely out of the question to complete a message "later in time" (but within the lock duration) from another thread? Part from the completeMessageAsync issue call we're gotten pretty good performance (good enough, ~400ms/s) - but perhaps we need to rethink our architecture...

Demo Code

using Azure.Messaging.ServiceBus;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using System.Timers;

namespace ServiceBus.DLQ.Reader
{
   public class Demo
    {
        public static async Task Main(string[] args)
        {
            DempoSessionProcessReader reader = new DempoSessionProcessReader(
                new ServiceBusClient("Endpoint=sb://*****.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*****"), "mikes.test.queue.session");

            try
            {
                //Create 10 processors with 10 concurrent sessions, prefetch 10 messages and set the session timeout to 2s.
                //Do not autocomplete the message
                await reader.CreateSessionProcessors(10, 10, 10, 2, false);

                //To stop!
                Console.ReadKey();
                return;

            }
            catch (Exception ex)
            {
            }
        }
    }

    /// <summary>
    /// A class for grouping all messages for a particula session.
    /// The timestamp is to keep track of when to start processing them (persist)
    /// </summary>
    public class GroupedSession
    {
        public DateTime Timestamp { get; set; } //When this was created (i.e. when the first message for a message was added
        public List<ProcessSessionMessageEventArgs> Messages { get; set; }

        public GroupedSession()
        {

        }
    }
    public class DempoSessionProcessReader
    {
        List<ServiceBusSessionProcessor> processorList =new List<ServiceBusSessionProcessor>();
        List<Guid> guids = new List<Guid>();
        public ServiceBusClient _client;
        public string Queue { get; set; }
        private static bool _automcomplete = false;

        /// <summary>
        /// A list of messages with the sessionId as the key
        /// </summary>
        public static IDictionary<string, GroupedSession> _messages = new Dictionary<string, GroupedSession>();
        static object _lock = new object();
        private delegate void FlowHandlerDelegate(string sessionId, ProcessSessionMessageEventArgs[] messages); 
        static Timer _timer = new Timer();

        public DempoSessionProcessReader(ServiceBusClient client, string queue)
        {
            _client = client; 
            Queue = queue;

            //Generate some guids for testing purposes which we can use when preloading the queue with messages
            for(int i=0;i<1000;i++)
                guids.Add(Guid.NewGuid());

            //Our timer that continuously checks the list of received messages (grouped by sessionId) 
            _timer.Elapsed += ProcessMessageTimerTick;
            _timer.Interval = 2000;
            _timer.Enabled = true;
            _timer.Start();
        }

        /// <summary>
        /// Creates one or more ServiceBusSessionProcessor instances
        /// </summary>
        /// <param name="numberOfProcesses">Number of processor instances</param>
        /// <param name="concurrentSessionsPerProcessor">Number of concurrent sessions per processor</param>
        /// <param name="prefetchCount">Prefetch count (default = 1)</param>
        /// <param name="sessionTimeout">Session timeout in seconds</param>
        /// <param name="autocompleteMessage">Should the messages handler auto-complete the message?</param>
        /// <returns></returns>
        public async Task CreateSessionProcessors(int numberOfProcesses, int concurrentSessionsPerProcessor, int prefetchCount, int sessionTimeout, bool autocompleteMessage)
        {
            ServiceBusSessionProcessor processor = null!;

            for (int i=0;i<numberOfProcesses;i++)
            {
                _automcomplete = autocompleteMessage;

                processor = _client.CreateSessionProcessor("mikes.test.queue.session", new ServiceBusSessionProcessorOptions()
                {
                    ReceiveMode = ServiceBusReceiveMode.PeekLock,
                    PrefetchCount = prefetchCount > 0 ? prefetchCount : 1,
                    AutoCompleteMessages = autocompleteMessage,
                    SessionIdleTimeout = TimeSpan.FromSeconds(sessionTimeout),
                    MaxConcurrentSessions = concurrentSessionsPerProcessor
                });

                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;

                processorList.Add(processor);
            }

            for (int i = 0; i < numberOfProcesses; i++) {
                await processorList[i].StartProcessingAsync();
            }
        }

        Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        }

        /// <summary>
        /// The message handler adds each message to a dictionary with the sessionId as a key
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        async Task MessageHandler(ProcessSessionMessageEventArgs args)
        {
            string sessionId = args.Message.SessionId;

            lock (_lock)
            {
                if (!_messages.ContainsKey(sessionId))
                    _messages.Add(sessionId, new GroupedSession() 
                    { 
                        Timestamp = DateTime.Now,
                        Messages = new List<ProcessSessionMessageEventArgs>() { args }
                    });
                else
                    _messages[sessionId].Messages.Add(args);
            }

            await Task.CompletedTask;
        }

        /// <summary>
        /// Our timer handler that checks the list for sessions that are ready to be processed (persisted)
        /// </summary>
        /// <param name="source"></param>
        /// <param name="e"></param>
        private void ProcessMessageTimerTick(object source, ElapsedEventArgs e)
        {
            //Loop over all session Id's and see if any of them are older than 5 seconds
            //In that case, remove it from the dictionary, process/persist them and them complete the messages
            for (int i = 0; i < _messages.Count; i++) {
                if(_messages.Count > i)
                {
                    if (_messages.ElementAt(i).Value.Timestamp.AddSeconds(5) < DateTime.Now)
                    {
                        lock (_lock)
                        {
                            string sessionId = _messages.ElementAt(i).Key;
                            ProcessSessionMessageEventArgs[] array = new ProcessSessionMessageEventArgs[_messages[sessionId].Messages.Count];
                            _messages[sessionId].Messages.CopyTo(array, 0); //we could decorate it as "processed" as well and have another background thread take care of the cleaning
                            Array.Sort(array);
                            //InsertIntoDatabase(...)
                            CompleteMessageList(array).GetAwaiter().GetResult(); //Not allowed to complete here unfortunately :/ 
                            _messages.Remove(sessionId);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Complets a list of messages
        /// </summary>
        /// <param name="messages">List of ProcessSessionMessageEventArgs that was received by the ServiceBusSessionProcessor handler</param>
        /// <returns></returns>
        private async Task CompleteMessageList(ProcessSessionMessageEventArgs[] messages)
        {
            try
            {
                if (!_automcomplete) //Just for testing purposes
                {
                    for (int i = 0; i < messages.Length; i++)
                    {
                        await messages[i].CompleteMessageAsync(messages[i].Message);
                    }
                }
            }
            catch(Exception ex) 
            {
                //Here we get the follow error:
                // "ServiceBusReceiver has already been closed and cannot perform the requested operation.\r\nObject name: 'ServiceBusReceiver'."
            }
        }
    }

    /// <summary>
    /// A simple date sorter that sorts in ascending order
    /// </summary>
    public class DateSorter : IComparer
    {

        // Calls CaseInsensitiveComparer.Compare with the parameters reversed.
        int IComparer.Compare(object x, object y)
        {
            DateTime a = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)x).Message.ApplicationProperties["senttimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);
            DateTime b = DateTime.ParseExact((string)((ProcessSessionMessageEventArgs)y).Message.ApplicationProperties["senttimestamp"], "yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture);

            if (a > b)
                return 1;
            if (a < b)
                return -1;

            return 0;
        }
    }
}

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

Labels

ClientThis issue points to a problem in the data-plane of the library.Service Buscustomer-reportedIssues that are reported by GitHub users external to the Azure organization.issue-addressedWorkflow: The Azure SDK team believes it to be addressed and ready to close.questionThe issue doesn't require a change to the product in order to be resolved. Most issues start as that

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions