Skip to content

Fast API #596

@ronag

Description

@ronag

One of the bottlenecks for Piscina at the moment is that we need to transfer data to the worker, which is not efficient with the current implementation in Node.js. This means that the tasks sent to Piscina need to be large enough that the overhead of sending the task is not too big.

One way around this would be to avoid the postMessage APIs and instead use a ring buffer to write the tasks + data, letting the user provide Buffer serialize/deserialize functions.

Here is a ring buffer implementation we have been using internally for inspiration:

import assert from 'node:assert'

// Slots in the Int32Array view of `sharedState` that hold the shared write
// and read positions. Make sure write and read are in different cache
// lines: slot 16 of an Int32Array starts at byte offset 64, slot 0 at byte
// offset 0.
const WRITE_INDEX = 0
const READ_INDEX = 16

/**
 * Allocates the two shared buffers that back one ring.
 *
 * @param {number} size - Byte length of the data buffer.
 * @returns {{ sharedState: SharedArrayBuffer, sharedBuffer: SharedArrayBuffer }}
 *   A fixed 128-byte state buffer plus a data buffer of `size` bytes.
 */
export function alloc(size) {
  const sharedState = new SharedArrayBuffer(128)
  const sharedBuffer = new SharedArrayBuffer(size)

  return { sharedState, sharedBuffer }
}

/**
 * Creates the consuming side of a ring buffer laid out over `sharedBuffer`.
 *
 * Each record is a little-endian int32 length header followed by that many
 * payload bytes. A header of -1 is a wrap marker: the next record starts at
 * offset 0 of the buffer.
 *
 * @param {object} options
 * @param {SharedArrayBuffer} options.sharedState - Holds the shared read and
 *   write positions (see READ_INDEX / WRITE_INDEX).
 * @param {SharedArrayBuffer} options.sharedBuffer - Holds the records.
 * @param {number} [options.hwmBytes=131072] - A `read()` call stops delivering
 *   once at least this many payload bytes have been handed to `next`.
 * @param {number} [options.hwmItems=1024] - A `read()` call stops delivering
 *   once this many records have been handed to `next`.
 * @returns {{ read: Function }} Object exposing `read(next, arg1, arg2, arg3)`.
 */
export function reader({ sharedState, sharedBuffer, hwmBytes = 128 * 1024, hwmItems = 1024 }) {
  const state = new Int32Array(sharedState)
  const size = sharedBuffer.byteLength
  const buffer = Buffer.from(sharedBuffer)
  const view = new DataView(sharedBuffer)
  // A single descriptor object is reused for every record; only `offset` and
  // `length` change between `next` calls.
  const data = { buffer, view, offset: 0, length: 0 }

  // Local read position; it runs ahead of the published READ_INDEX until
  // `_notify` executes.
  let readPos = Atomics.load(state, READ_INDEX) | 0
  let notifying = false

  // Publishes the local read position and wakes any agent blocked in
  // `Atomics.wait` on READ_INDEX.
  function _notify() {
    notifying = false
    Atomics.store(state, READ_INDEX, readPos)
    Atomics.notify(state, READ_INDEX)
  }

  /**
   * Delivers pending records to `next` until the ring is drained (as of the
   * write position observed on entry) or a high-water mark is reached.
   *
   * @param {Function} next - Called as `next(data, arg1, arg2, arg3)` for each
   *   record; `data.buffer` / `data.view` cover the whole ring and
   *   `data.offset` / `data.length` locate the payload.
   * @param {*} [arg1] - Forwarded to `next`.
   * @param {*} [arg2] - Forwarded to `next`.
   * @param {*} [arg3] - Forwarded to `next`.
   * @returns {number} Number of records delivered by this call.
   */
  function read(next, arg1, arg2, arg3) {
    const startPos = readPos
    let items = 0
    let bytes = 0

    const writePos = Atomics.load(state, WRITE_INDEX) | 0

    while (items < hwmItems && bytes < hwmBytes && readPos !== writePos) {
      const dataPos = readPos + 4
      const dataLen = view.getInt32(dataPos - 4, true) | 0

      if (dataLen === -1) {
        // Wrap marker: continue from the start of the buffer.
        readPos = 0
      } else {
        assert(dataLen >= 0)
        assert(dataPos + dataLen <= size)

        // Advance before invoking the callback so the local position is
        // already past this record if `next` throws.
        readPos += 4 + dataLen
        items += 1
        bytes += dataLen

        data.offset = dataPos
        data.length = dataLen
        next(data, arg1, arg2, arg3)
      }
    }

    // Publish whenever the read position moved, including the case where only
    // a wrap marker was consumed; otherwise the shared READ_INDEX would stay
    // stale until some later record is read.
    // The publish is deferred with setImmediate so that the regions handed to
    // `next` are not released before the current synchronous work (and its
    // microtasks) has finished.
    if ((items > 0 || readPos !== startPos) && !notifying) {
      notifying = true
      setImmediate(_notify)
    }

    return items
  }

  return { read }
}

/**
 * Creates the producing side of a ring buffer laid out over `sharedBuffer`.
 *
 * Each record is written as a little-endian int32 length header followed by
 * the payload. A header of -1 is written as a wrap marker before the write
 * position is reset to offset 0.
 *
 * @param {object} ring
 * @param {SharedArrayBuffer} ring.sharedState - Holds the shared read and
 *   write positions (see READ_INDEX / WRITE_INDEX).
 * @param {SharedArrayBuffer} ring.sharedBuffer - Holds the records.
 * @param {object} [options]
 * @param {Function} [options.yield] - Called synchronously (no arguments)
 *   before each blocking wait while `write` is waiting for free space.
 * @param {object} [options.logger] - If provided, `logger.warn('yielding', …)`
 *   is called once per `write` call that has to wait.
 * @returns {{ write: Function }} Object exposing `write(len, fn, arg1, arg2, arg3)`.
 */
export function writer({ sharedState, sharedBuffer }, { yield: _yield, logger } = {}) {
  const state = new Int32Array(sharedState)
  const size = sharedBuffer.byteLength
  const buffer = Buffer.from(sharedBuffer)
  const view = new DataView(sharedBuffer)
  // A single descriptor object is reused for every write; only `offset` and
  // `length` change between `fn` calls.
  const data = { buffer, view, offset: 0, length: 0 }

  // Local copies of the shared positions. `readPos` is refreshed only inside
  // the wait loop in `write`; `writePos` runs ahead of the published
  // WRITE_INDEX until `_notify` (or an explicit store) executes.
  let readPos = Atomics.load(state, READ_INDEX) | 0
  let writePos = Atomics.load(state, WRITE_INDEX) | 0
  let notifying = false
  let yielding = false

  // Publishes the local write position. Scheduled at most once per microtask
  // checkpoint, so several writes in a row share one store.
  function _notify() {
    notifying = false
    Atomics.store(state, WRITE_INDEX, writePos)
  }

  // Returns true when `len` payload bytes plus two headers fit at the current
  // write position, judged against the locally cached `readPos`. May write a
  // wrap marker and reset `writePos` to 0 as a side effect.
  // NOTE(review): the `update` parameter is never read and the only visible
  // caller does not pass it.
  function _acquire(len, update) {
    // len + {current packet header} + {next packet header}
    const required = len + 4 + 4
    assert(required >= 0)
    assert(required <= size)

    if (writePos >= readPos) {
      // 0----RxxxxxxW---S
      if (size - writePos >= required) {
        return true
      }

      // Wrapping to 0 would make writePos equal to readPos, so wait for the
      // read position to move first.
      if (readPos === 0) {
        return false
      }

      // Wrap marker: tells the reader to continue from offset 0.
      view.setInt32(writePos, -1, true)

      writePos = 0

      assert(writePos + 4 <= size) // must have room for next header also
      assert(writePos !== readPos)

      // Published immediately rather than through the batched `_notify`.
      Atomics.store(state, WRITE_INDEX, writePos)
    }

    // 0xxxxW------RxxxS
    return readPos - writePos >= required
  }

  // Writes one record at `writePos`. `fn(data, arg1, arg2, arg3)` fills the
  // payload starting at `data.offset` and must return the offset just past
  // the last byte it wrote; the payload length is derived from that value.
  function _write(len, fn, arg1, arg2, arg3) {
    const dataPos = writePos + 4

    data.offset = dataPos
    data.length = len
    const dataLen = fn(data, arg1, arg2, arg3) - dataPos

    // `fn` may overshoot the requested `len` by up to 4 bytes.
    assert(dataLen <= len + 4)
    assert(dataLen >= 0)
    assert(dataPos + dataLen <= size)

    // The header is written after the payload, once the real length is known.
    view.setInt32(dataPos - 4, dataLen, true)

    writePos += 4 + dataLen

    assert(writePos + 4 <= size) // must have room for next header also
    assert(writePos !== readPos)

    if (!notifying) {
      notifying = true
      queueMicrotask(_notify)
    }

    return true
  }

  // TODO (fix): _sleep/_wait is a bit hacky and should at least have some
  // observability in tracing or logging.
  /**
   * Appends one record, blocking the calling thread until enough space is free.
   *
   * @param {number} len - Payload bytes to reserve for the record.
   * @param {Function} fn - Called as `fn(data, arg1, arg2, arg3)`; writes the
   *   payload at `data.offset` and returns the offset just past the last
   *   byte written.
   * @param {*} [arg1] - Forwarded to `fn`.
   * @param {*} [arg2] - Forwarded to `fn`.
   * @param {*} [arg3] - Forwarded to `fn`.
   * @throws {AssertionError} If the reservation cannot fit in the buffer, if
   *   called re-entrantly from the `yield` callback ('yielding'), or if space
   *   is still unavailable after 1000 attempts ('deadlock').
   */
  function write(len, fn, arg1, arg2, arg3) {
    // len + {current packet header} + {next packet header} + {alignment}
    const required = len + 4 + 4 + 8 + 128 // TODO (fix): Remove extra + 128
    assert(required >= 0)
    assert(required <= size)
    assert(!yielding, 'yielding')

    for (let n = 0; !_acquire(required); n++) {
      assert(n < 1000, 'deadlock')

      if (n > 0) {
        if (n === 1) {
          logger?.warn('yielding', { readPos, writePos })
        }

        if (_yield) {
          // `yielding` makes a re-entrant `write` from inside the callback
          // fail the assert above.
          yielding = true
          try {
            // NOTE(review): the `?.` is redundant inside `if (_yield)`.
            _yield?.()
          } finally {
            yielding = false
          }
        }

        // Block until READ_INDEX changes from the cached value or the timeout
        // (n milliseconds, growing with each attempt) elapses.
        Atomics.wait(state, READ_INDEX, readPos, n)
      } else {
        // First failed attempt: flush the local write position right away
        // instead of waiting for the batched `_notify`.
        Atomics.store(state, WRITE_INDEX, writePos)
      }

      readPos = Atomics.load(state, READ_INDEX) | 0
    }

    _write(len, fn, arg1, arg2, arg3)

    assert(writePos !== readPos)
  }

  return { write }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions