Skip to content

Add docs rgd threads and thread-safety #536

Open
@ampeixoto

Description

@ampeixoto

I detected an issue when trying to make multiple calls concurrently.

Sometimes, if I make 4 concurrent calls to Session.call() I would only get 3 responses and worse, sometimes one of the responses was the payload of another procedure...

So I dig into the autobahn-java code and noticed the following:
In Session.java you have this:

    private <T> CompletableFuture<T> reallyCall(
            String procedure,
            List<Object> args, Map<String, Object> kwargs,
            CallOptions options,
            TypeReference<T> resultTypeReference,
            Class<T> resultTypeClass) {
        throwIfNotConnected();

        CompletableFuture<T> future = new CompletableFuture<>();

        long requestID = mIDGenerator.next();

        mCallRequests.put(requestID, new CallRequest(requestID, procedure, future, options,
                resultTypeReference, resultTypeClass));

        if (options == null) {
            send(new Call(requestID, procedure, args, kwargs, 0));
        } else {
            send(new Call(requestID, procedure, args, kwargs, options.timeout));
        }
        return future;
    }

And in IDGenerator.java this:

public class IDGenerator {
    private long mNext;

    public long next() {
        mNext += 1;
        if (mNext > 9007199254740992L) {
            mNext = 1;
        }
        return mNext;
    }
}

As you can see, that is not thread-safe. Neither mNext nor mCallRequests can be set concurrently.

And to prove it, I created a small snippet in kotlin:

fun main() = runBlocking {

    val scope = CoroutineScope(Job() + Dispatchers.IO)
    println("Start generating ids")
    val results = (1..50).map {
        generateIdsConcurrently(scope)
    }
    println("Results: $results")
    println("All successful: ${results.all { it }}")
}

private suspend fun generateIdsConcurrently(scope: CoroutineScope): Boolean {
    val tasks = mutableListOf<Job>()
    val idsMap = HashMap<Int, Int>()
    val numberOfIdsExpected = 10
    val idGenerator = IDGenerator()
    (1..numberOfIdsExpected).onEach { index ->
        val childJob = scope.launch {
            //this delay forces more failures
            delay(100)
            val id = idGenerator.next()
            idsMap[id.toInt()] = index
        }
        tasks.add(childJob)
    }

    tasks.joinAll()

    val expectedIds = idsMap.values.sorted()
    val generatedIds = idsMap.keys.sorted()
    return expectedIds == generatedIds
}

If we run this code, we can see that it almost always fails (created 50 trials to make it more frequent). So the generated IDs aren't always sequential.

Similar issues happens to Session.subscribe() also (and potentially other methods).

SOLUTION

  • First Step

Make the next() method synchronized:

public class IDGenerator {
    private long mNext;

    public synchronized long next() {
        mNext += 1;
        if (mNext > 9007199254740992L) {
            mNext = 1;
        }
        return mNext;
    }
}

This improved quite a lot but it was still failling sometimes.

  • Second Step

Replace the HashMap by a ConcurrentHashMap.
With this, the test passes 100% of the time.

QUESTIONS

  • Is my analysis correct or I am making some mistake?
  • Is there any hidden reason for why this is not thread safe?
  • Was this already detected before? I didn't find anything about it...
  • Is the caller of autobahn expected to externally synchronize the calls for some reason?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions