
Add raw_decode method to JSON and MsgPack decoders #821


Open
wants to merge 1 commit into main

Conversation

akirchhoff-modular

Adds a new raw_decode method on the JSON and MsgPack Decoder classes that allows decoding objects with trailing data after them. This mirrors the interface of the raw_decode method in the Python standard library's json.JSONDecoder class.

This can be used to, for example, parse several concatenated messages, enabling similar functionality to that available through msgpack.Unpacker in the msgpack package.
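A small sketch of how that could look (hypothetical usage of the proposed raw_decode; the return value is the decoded object plus the number of bytes consumed, as in the examples later in this thread):

import msgspec

# Two MessagePack messages back to back in one buffer.
data = msgspec.msgpack.encode({"id": 1}) + msgspec.msgpack.encode({"id": 2})

decoder = msgspec.msgpack.Decoder()
first, used = decoder.raw_decode(data)       # trailing data is not an error
second, _ = decoder.raw_decode(data[used:])  # decode the next message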

@pullpush-io

Thanks for this one. Will this be useful for reading compressed files record-by-record? This is what I use the old msgpack lib for most:

        with open('archive.zst', 'rb') as fh:
            with zstd.ZstdDecompressor().stream_reader(fh, read_size=8388608) as sr:
                for obj in msgpack.Unpacker(sr):

@akirchhoff-modular
Author

@pullpush-io

Thanks for this one. Will this be useful for reading compressed files record-by-record? This is what I use the old msgpack lib for most:

        with open('archive.zst', 'rb') as fh:
            with zstd.ZstdDecompressor().stream_reader(fh, read_size=8388608) as sr:
                for obj in msgpack.Unpacker(sr):

If you know the maximum decompressed size of a MsgPack message ahead of time, then yes, it can be used for this, although it would require more code to deal with the details. For example:

def iter_messages_from_file(f, decoder, max_message_size):
    # Keep at most max_message_size bytes buffered; each decode consumes a
    # prefix of the buffer, which is then topped back up from the file.
    buffer = f.read(max_message_size)
    while buffer:
        decoded, bytes_used = decoder.raw_decode(buffer)
        yield decoded
        buffer = buffer[bytes_used:] + f.read(bytes_used)

I should note that the implementation in the snippet above is rather inefficient, as it will be doing a lot of memory copies (both the slice and the + will be copying roughly the entire buffer each iteration). You may be able to tweak it for better performance; I have not thought deeply about how you might do so.
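One direction a reader might try (purely an untested sketch, assuming raw_decode accepts bytes-like objects such as bytearray the way msgspec.msgpack.decode does) is to keep the data in a single bytearray and compact it in place, which at least avoids building a brand-new bytes object from two pieces on every iteration:

def iter_messages_from_file_inplace(f, decoder, max_message_size):
    buffer = bytearray(f.read(max_message_size))
    while buffer:
        decoded, bytes_used = decoder.raw_decode(buffer)
        yield decoded
        del buffer[:bytes_used]        # drop the consumed prefix in place
        buffer += f.read(bytes_used)   # top the buffer back up

Note that del buffer[:bytes_used] still shifts the remaining bytes, so this is lighter on allocations rather than copy-free.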

If, on the other hand, the full decompressed data can be available in memory at once, it is quite efficient to iterate over all concatenated messages with minimal copying:

def iter_messages_in_buffer(data, decoder):
    start = 0
    with memoryview(data) as overall_view:
        while start < len(overall_view):
            # Slicing a memoryview does not copy the underlying data.
            with overall_view[start:] as slice_view:
                decoded, bytes_used = decoder.raw_decode(slice_view)
                yield decoded
                start += bytes_used
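As a quick usage sketch (assuming the raw_decode proposed here accepts any buffer-protocol object, as msgspec.msgpack.decode does):

decoder = msgspec.msgpack.Decoder()
data = b"".join(msgspec.msgpack.encode(i) for i in range(3))
assert list(iter_messages_in_buffer(data, decoder)) == [0, 1, 2]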

I have pondered efficiently handling the streaming case with unbounded message sizes myself, but it is far from straightforward to add. There are two cases:

  • The data read so far contains at least one complete message. raw_decode in this PR can help.
  • The data read so far contains a prefix of a message, but not yet a complete message. raw_decode will fail with a truncated message error.

If the message is truncated, you will need to read more data into the buffer and try again. There are two issues with this:

  • If the message is truncated, the error we get is a generic msgspec.DecodeError:

    msgspec/msgspec/_core.c, lines 477 to 482 (commit dd965dc):

    static int
    ms_err_truncated(void)
    {
        PyErr_SetString(msgspec_get_global_state()->DecodeError, "Input data was truncated");
        return -1;
    }

    The message is constant, so you could perhaps try/except msgspec.DecodeError and compare the string to see if the error was due to truncation (a sketch of this appears after this list), but this feels brittle. You would likely want to modify msgspec in a separate PR to add a new exception type (e.g. TruncationError or similar, subclassing from DecodeError), to be able to handle this error programmatically without needing to compare error strings.

  • Even with the above addressed, there are some efficiency issues. Suppose that your chunk size is 1 byte, and a complete message is "hello". You start with "h" in the buffer. You attempt to decode "h" and get a truncation, read an additional byte. You attempt to decode "he" and get a truncation, read an additional byte. Attempt to decode "hel" and get a truncation, read an additional byte, and so on. If each attempt to decode is O(n) in the length of the message, then attempting to decode in a streaming manner using a "read more on truncation" strategy would become O(n^2) time to decode overall. This quadratic complexity can be avoided if you can bound the size of a message, as I required in my first example.
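For completeness, here is a rough sketch of the brittle string-comparison workaround mentioned in the first bullet above (it keys off the exact "Input data was truncated" message from _core.c, which is not a stable API, hence the suggestion of a dedicated exception type):

import msgspec

def try_raw_decode(decoder, buffer):
    # Returns (decoded, bytes_used), or None if the buffer holds only a
    # prefix of a message and more data needs to be read first.
    try:
        return decoder.raw_decode(buffer)
    except msgspec.DecodeError as exc:
        if "truncated" in str(exc):
            return None
        raise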

To resolve this correctly, you would need to be able to halt and resume the MsgPack decoder. Having read the implementation of msgspec, I think this would hugely complicate the core, and is unlikely to be in scope for the project, though it is ultimately not my decision to make.

I did come up with one workaround that allows you to use msgspec while still handling unbounded message sizes from streaming sources. The key observation is that, thanks to some specifics of the MsgPack format, it is easy to do a very cursory parse of a message just to determine its length, in a way that can be suspended and resumed. Suppose you had a "length detector" object to which you could feed data chunks, and which could tell you when one of those chunks completed a message (and where in the chunk the message was completed). Then you could do something like this:

def iter_messages(f):
    # 'f' is a binary file-like object; LengthDetector and Example are
    # placeholders for the hypothetical detector and the target Struct type.
    length_detector = LengthDetector()
    buffer = bytearray()
    while True:
        chunk = f.read(1)  # Obviously inefficient, but cuts down on bookkeeping for the sake of example.
        if not chunk:
            break
        buffer.extend(chunk)
        # completes_message returns None if the accumulated message remains incomplete.
        # Returns the index into the provided chunk at which the accumulated message ends,
        # if the chunk provided completes the message.
        if length_detector.completes_message(chunk) is not None:
            # Length detector has detected that buffer contains a complete message.
            yield msgspec.msgpack.decode(buffer, type=Example)
            buffer.clear()

This is able to decode streaming messages of unbounded size in O(n) time. You may notice it also works with the normal decode function and does not require raw_decode.

I have prototyped such a LengthDetector implementation in my personal time. It is pretty small and fast, a couple hundred lines of C available to Python as an extension module (albeit pretty messy; I was more interested in golfing the binary size and performance for fun than in readability). I am thinking about uploading it to PyPI eventually, though it's not a high priority for me. If it would be useful to you, I can see about uploading it soon.

I would also be happy to contribute the implementation to msgspec if the maintainer so chooses, though it does feel somewhat disconnected from the rest of msgspec's functionality, which is why it's not my first choice to do so.

@pullpush-io

My familiarity with the source code here is extremely limited, but I think I saw skip functionality? Usually this can be used to probe the buffer and establish if we have a complete decode.

@akirchhoff-modular
Author

@pullpush-io

My familiarity with the source code here is extremely limited, but I think I saw skip functionality? Usually this can be used to probe the buffer and establish if we have a complete decode.

I see that; mpack_skip appears to be the function in question. That does work, but it can still take O(n) time to do each probe (though the constant factors are lower for skipping than they are for normal parsing). Whenever you determine the buffer was too short and add new data to it, the next run of mpack_skip still has to reprocess the data it already processed, making the overall time O(n^2). Depending on your use case this may or may not be a problem. If your data consists primarily of long strings, you may not notice the effects at all, but if your data has long arrays of small integers, the effects can quickly become debilitating.

In any case, I think it's probably out of scope for this PR. The interface introduced here can be used efficiently when the entire sequence of messages is already available in memory, which is the same use case the standard library chooses to support in json.JSONDecoder.raw_decode. It makes streaming decode possible, albeit inefficiently. Supporting efficient streaming decode would require capabilities msgspec does not currently have, and that is not what I'm trying to solve in this PR.


Appendix: If you want to measure mpack_skip, one way to exercise it from Python code is to decode data whose only MessagePack field is not present in the target msgspec.Struct type. For example:

import msgspec
from typing import Any

class A(msgspec.Struct):
    a: Any

class B(msgspec.Struct):
    pass

data = msgspec.msgpack.encode(A([0] * 1_000_000))
# 'data' at the top level is a mapping with a key 'a',
# but type B has no field 'a',
# so msgspec will invoke mpack_skip to skip over the value of key 'a'.
msgspec.msgpack.decode(data, type=B)
