Support distributed sorting #57

Open
@LilithHafner

I don't have the hardware on hand to test these algorithms, but here's the first distributed sorting algorithm that comes to mind. I'm writing it down here in case anyone wants to implement it. (Because it is the first algorithm that came to mind, it has probably been invented countless times before; I do not claim to have invented it.)

Let n be the number of elements being sorted, k be the number of cores/nodes/machines, and l be the amount of memory each node has. Assume that k is a power of two and let K = log2(k).

This algorithm is designed for the case when 3*k^2 < n < k*l/3, with l measured in records. For k = 32, l = 256MB, and 64-bit (8-byte) records, each node holds about 32 million records, so that means roughly 3,000 < n < 350,000,000.
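
As a quick check of these bounds, the sketch below evaluates them for a given configuration. It assumes l is converted to a record count by dividing each node's memory (taken here as 256 MiB) by the record size (8 bytes); `size_bounds` is a hypothetical helper name, not something from an existing package.

```julia
# Hypothetical helper: evaluates the bounds 3*k^2 < n < k*l/3,
# with l converted from bytes to a number of records.
function size_bounds(k::Integer, mem_bytes::Integer, record_bytes::Integer)
    l = mem_bytes ÷ record_bytes      # records that fit in one node's memory
    lower = 3 * k^2
    upper = (k * l) ÷ 3
    return lower, upper
end

# 32 nodes, 256 MiB per node, 8-byte records
lower, upper = size_bounds(32, 256 * 2^20, 8)
println("need ", lower, " < n < ", upper)
```

The upper bound scales inversely with the record size, so halving the record width doubles the largest n that fits.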

  1. Begin with the data partitioned evenly among nodes
  2. Perform a single K-bit pass of MSB radix sort at each node
  3. Redistribute data among nodes
  4. Sort the data locally at each node
  5. The concatenation of the data stored at each node is now sorted
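
The five steps above can be simulated in a single process, which is handy for checking the logic without a cluster. This is only a sketch under several assumptions: each "node" is a plain `Vector{UInt64}`, redistribution is an in-memory copy, and `simulate_distributed_sort` is an illustrative name.

```julia
# Single-process simulation of the five steps; each "node" is a Vector{UInt64}.
# Real nodes would exchange data over a network instead of sharing memory.
function simulate_distributed_sort(data::Vector{UInt64}, k::Integer)
    @assert ispow2(k)
    K = trailing_zeros(k)                          # K = log2(k)
    # Step 1: partition the data evenly among the nodes.
    per_node = max(1, cld(length(data), k))
    nodes = [data[i:min(i + per_node - 1, end)] for i in 1:per_node:length(data)]
    # Step 2: one K-bit MSB pass per node; bucket j is destined for node j.
    bucket(x) = Int(x >> (64 - K)) + 1
    outgoing = [[UInt64[] for _ in 1:k] for _ in nodes]
    for (i, node) in enumerate(nodes), x in node
        push!(outgoing[i][bucket(x)], x)
    end
    # Step 3: redistribute; node j receives bucket j from every node.
    received = [reduce(vcat, (out[j] for out in outgoing); init=UInt64[]) for j in 1:k]
    # Step 4: sort locally at each node.
    foreach(sort!, received)
    # Step 5: the concatenation of the nodes' data is now sorted.
    return reduce(vcat, received)
end

data = rand(UInt64, 10_000)
@assert simulate_distributed_sort(data, 8) == sort(data)
```

Because bucket j only holds keys whose top K bits equal j - 1, every key on node j is smaller than every key on node j + 1, which is why the final concatenation needs no merge.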

In step 2, in the event of an unradixable record type or a highly nonuniform distribution of leading bits, compute a random sample of about k^2 elements and use the k-quantiles of the sample as keys in a binary search tree to create a balanced bucket assignment.
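
A minimal sketch of that fallback follows. It swaps the binary search tree for a sorted splitter array searched with `searchsortedfirst`, which performs the same lookup; `sample_splitters` and `bucket_of` are illustrative names, and sampling with replacement is an assumption made for brevity.

```julia
using Random

# Pick k-1 splitters as the k-quantiles of a random sample of about k^2 elements.
function sample_splitters(rng::AbstractRNG, v::AbstractVector, k::Integer)
    @assert !isempty(v)
    m = min(length(v), k^2)
    sample = sort!([v[rand(rng, eachindex(v))] for _ in 1:m])   # with replacement
    return [sample[cld(j * m, k)] for j in 1:k-1]
end

# Bucket number in 1:k, found by binary search over the sorted splitters
# (a sorted array standing in for the binary search tree).
bucket_of(splitters, x) = searchsortedfirst(splitters, x)

rng = MersenneTwister(1)
v = randn(rng, 100_000) .^ 3              # highly nonuniform values
splitters = sample_splitters(rng, v, 8)
counts = zeros(Int, 8)
for x in v
    counts[bucket_of(splitters, x)] += 1
end
println(counts)                            # bucket sizes should be roughly balanced
```

Bucket j receives the values greater than splitter j - 1 and at most splitter j, so the buckets stay ordered and only a comparison function is required of the record type.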

Naively, the redistribution (step 3) requires all-pairs connectedness and O(k) time: each node sends the appropriate chunk of data to every other node. This can be accomplished with message passing, or with shared memory once the nodes have shared counts of how many elements move from each node to each other node.
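
For the shared-memory variant, one way to use the shared counts is to turn them into write offsets, so that every sender knows which slots of each receiver's buffer it owns. The sketch below assumes the counts are gathered into a k-by-k matrix; `receive_offsets` is an illustrative name.

```julia
# counts[i, j] = number of elements node i sends to node j (shared beforehand).
# Returns offsets[i, j] = first slot of node j's receive buffer that node i
# writes to, i.e. an exclusive prefix sum down each column, shifted to 1-based.
function receive_offsets(counts::AbstractMatrix{<:Integer})
    return cumsum(counts; dims=1) .- counts .+ 1
end

counts = [2 1 0;
          0 3 1;
          4 0 2]
offsets = receive_offsets(counts)
# Node 2 receives 1 + 3 + 0 = 4 elements:
# node 1 writes slot 1, node 2 writes slots 2:4, node 3 writes nothing.
@assert offsets[:, 2] == [1, 2, 5]
```

Since the slot ranges are disjoint, the senders can write concurrently without locking.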

I prefer radix sort for the within-node sorting whenever possible, even for highly nonuniform bit distributions. Another algorithm may be necessary for unradixable types, or unconventional hardware. For example, if each node is a computer with a GPU, then the local sorting may use this very algorithm!
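
For reference, here is a minimal LSD radix sort that could play the role of the local sort for `UInt64` keys. It is a sketch of one possible choice (8-bit digits, one scratch array), not a tuned implementation, and `lsd_radix_sort!` is an illustrative name.

```julia
# Minimal LSD radix sort for UInt64 keys: 8 passes of 8 bits each.
function lsd_radix_sort!(v::Vector{UInt64})
    src, dst = v, similar(v)
    for shift in 0:8:56
        # Histogram of the current digit, stored one slot to the right.
        counts = zeros(Int, 257)
        for x in src
            counts[Int((x >> shift) & 0xff) + 2] += 1
        end
        # After the prefix sum, counts[d + 1] = number of elements with digit < d.
        counts = cumsum(counts)
        # Stable scatter into the destination array.
        for x in src
            d = Int((x >> shift) & 0xff) + 1
            counts[d] += 1
            dst[counts[d]] = x
        end
        src, dst = dst, src
    end
    return v    # the number of passes is even, so the result ends up back in v
end

v = rand(UInt64, 5_000)
expected = sort(v)
@assert lsd_radix_sort!(v) == expected
```

The running time does not depend on how the bits are distributed, which is consistent with preferring radix sort even for highly nonuniform inputs.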

This algorithm also serves as an external sorting algorithm, single- or multithreaded. In that case, each core is a region of disk space, and the regions are loaded into memory sequentially. Provisionally, though, I prefer an external-sort-specific version for that case. As a TODO, it would be nice to unify these two (or even unify both with standard single-threaded internal sorting).
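# In-place MSB radix pass: partitions `v` into `n = 2^bits` buckets by `key`.
# The default `key` assumes unsigned 64-bit elements (e.g. UInt64) and maps
# the top `bits` bits of each element to a bucket number in 1:n.
function msb!(v, bits=1, n=2^bits, key=x -> x >> (64 - bits) + 1)
    # `source` and `sink` are staging buffers. Each index points to the next
    # element to move: for the source, the next element to take out;
    # for a sink, the next slot to fill.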
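    # Staging granularity; at least 1 so that inputs shorter than n^2 still work
    # (a chunk of 0 would make the step of `sink_limits` zero).
    chunk = max(1, length(v) ÷ n^2)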
    source = similar(v, chunk * n)
    sink = similar(v, chunk * n)
    counts = zeros(UInt, n + 1)
    counts[1] = firstindex(v)
    for x in v
        counts[key(x)+1] += 1
    end
    #println(Int.(counts))
    cumsum!(counts, counts)
    original_counts = copy(counts)

    sink_limits = chunk:chunk:n*chunk
    sink_indices = collect(1:chunk:n*chunk)
    @assert length(sink_indices) == length(sink_limits)

    source_index = lastindex(source) + 1
    for k in 1:n
        amount = min(chunk, original_counts[k+1] - counts[k])
        #println((amount, original_counts[k+1]-counts[k]))
        copyto!(source, source_index - amount, v, counts[k], amount)
        source_index -= amount
    end

    while source_index <= lastindex(source)  # skipped entirely for empty input
        #println(Int.(counts))
        x = source[source_index]
        source_index += 1
        k = key(x)
        #println(k)
        sink_index = sink_indices[k]
        sink[sink_index] = x
        sink_indices[k] += 1
        source_index <= lastindex(source) || break
        if sink_index >= sink_limits[k]
            # sink bucket is full, copy to v and copy back some more elements from v
            copyto!(v, counts[k], sink, sink_index - chunk + 1, chunk)
            sink_indices[k] -= chunk
            counts[k] += chunk
            # there may not be a full chunk to copy back
            amount = min(chunk, original_counts[k+1] - counts[k])
            #println((amount, original_counts[k+1]-counts[k]))
            copyto!(source, source_index - amount, v, counts[k], amount)
            source_index -= amount
        end
    end

    # copy the rest of the sink into the original vector
    for i in 1:n
        start = sink_limits[i] - chunk + 1
        copyto!(v, counts[i], sink, start, sink_indices[i] - start)
    end

    v, original_counts
end

export msb_sort!
function msb_sort!(v, args...; kw...)
    _, c = msb!(v, args...; kw...)
    # Sort each bucket in place, using only the documented `sort!` interface.
    for i in firstindex(c):lastindex(c)-1
        sort!(view(v, Int(c[i]):Int(c[i+1])-1))
    end

    v
end

Asymptotically, assuming radixable records and reasonably balanced most significant bits, the algorithm runs in O(n/k) time, which is pretty good: it is the same as the distributed runtime of sum.
