-
Notifications
You must be signed in to change notification settings - Fork 73
Add Source and Sink extensions for Apple's NSInputStream and NSOutputStream #174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
c0f042f
13bf3b1
3386ab3
6b4bb80
8ca65a2
2be70a6
54e247c
dccd22a
2ba8bed
d7b8e1d
62757c5
613e2be
e5f5c27
f5dfc1a
c9d1f44
242fda0
ead4f78
f9c9305
18ff1d0
c4eb1b9
56bb0e4
9d53c8e
4e3ff87
aa5830c
6f076d9
f789518
feb3145
3508104
9e71d4b
f40d472
caba9b4
afab5b7
365c354
a19c463
0021412
211c5f5
ae33893
5d14977
5cfafa2
e9fcaeb
50fe63f
89a4f80
e296c01
7b3ab06
c74845e
d2d040d
630423c
110e56c
6d034e9
ed5902e
07318d0
4897741
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package kotlinx.io | ||
|
|
||
| import kotlinx.cinterop.UnsafeNumber | ||
| import kotlinx.cinterop.addressOf | ||
| import kotlinx.cinterop.convert | ||
| import kotlinx.cinterop.usePinned | ||
| import platform.Foundation.* | ||
|
|
||
| @OptIn(UnsafeNumber::class) | ||
| internal fun Exception.toNSError() = NSError( | ||
| domain = "Kotlin", | ||
| code = 0, | ||
| userInfo = mapOf( | ||
| NSLocalizedDescriptionKey to message, | ||
| NSUnderlyingErrorKey to this | ||
| ) | ||
| ) | ||
|
|
||
| internal fun ByteArray.toNSData() = if (isNotEmpty()) { | ||
| usePinned { | ||
| @OptIn(UnsafeNumber::class) | ||
| NSData.create(bytes = it.addressOf(0), length = size.convert()) | ||
| } | ||
| } else { | ||
| NSData.data() | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| /* | ||
| * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
| * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
| */ | ||
|
|
||
| package kotlinx.io | ||
|
|
||
| import kotlinx.cinterop.* | ||
| import platform.Foundation.NSInputStream | ||
| import platform.Foundation.NSOutputStream | ||
| import platform.Foundation.NSStreamStatusNotOpen | ||
| import platform.darwin.UInt8Var | ||
|
|
||
| /** | ||
| * Returns [RawSink] that writes to an output stream. | ||
| * | ||
| * Use [RawSink.buffered] to create a buffered sink from it. | ||
| * | ||
| * @sample kotlinx.io.samples.KotlinxIoSamplesApple.outputStreamAsSink | ||
| */ | ||
| public fun NSOutputStream.asSink(): RawSink = OutputStreamSink(this) | ||
|
|
||
| private open class OutputStreamSink( | ||
| private val out: NSOutputStream, | ||
| ) : RawSink { | ||
|
|
||
| @OptIn(UnsafeNumber::class) | ||
| override fun write(source: Buffer, byteCount: Long) { | ||
| if (out.streamStatus == NSStreamStatusNotOpen) out.open() | ||
|
|
||
| checkOffsetAndCount(source.size, 0, byteCount) | ||
| var remaining = byteCount | ||
| while (remaining > 0) { | ||
| val head = source.head!! | ||
| val toCopy = minOf(remaining, head.limit - head.pos).toInt() | ||
| val bytesWritten = head.data.usePinned { | ||
| val bytes = it.addressOf(head.pos).reinterpret<UInt8Var>() | ||
| out.write(bytes, toCopy.convert()).toLong() | ||
| } | ||
|
|
||
| if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error") | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity") | ||
|
|
||
| head.pos += bytesWritten.toInt() | ||
| remaining -= bytesWritten | ||
| source.size -= bytesWritten | ||
|
|
||
| if (head.pos == head.limit) { | ||
| source.head = head.pop() | ||
| SegmentPool.recycle(head) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| override fun flush() { | ||
| // no-op | ||
| } | ||
|
|
||
| override fun close() = out.close() | ||
|
|
||
| override fun toString() = "RawSink($out)" | ||
| } | ||
|
|
||
| /** | ||
| * Returns [RawSource] that reads from an input stream. | ||
| * | ||
| * Use [RawSource.buffered] to create a buffered source from it. | ||
| * | ||
| * @sample kotlinx.io.samples.KotlinxIoSamplesApple.inputStreamAsSource | ||
| */ | ||
| public fun NSInputStream.asSource(): RawSource = NSInputStreamSource(this) | ||
|
|
||
| private open class NSInputStreamSource( | ||
| private val input: NSInputStream, | ||
| ) : RawSource { | ||
|
|
||
| @OptIn(UnsafeNumber::class) | ||
| override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { | ||
| if (input.streamStatus == NSStreamStatusNotOpen) input.open() | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if (byteCount == 0L) return 0L | ||
| checkByteCount(byteCount) | ||
|
|
||
| val tail = sink.writableSegment(1) | ||
| val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit) | ||
| val bytesRead = tail.data.usePinned { | ||
| val bytes = it.addressOf(tail.limit).reinterpret<UInt8Var>() | ||
| input.read(bytes, maxToCopy.convert()).toLong() | ||
| } | ||
|
|
||
| if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error") | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (bytesRead == 0L) { | ||
| if (tail.pos == tail.limit) { | ||
| // We allocated a tail segment, but didn't end up needing it. Recycle! | ||
| sink.head = tail.pop() | ||
| SegmentPool.recycle(tail) | ||
| } | ||
| return -1 | ||
| } | ||
| tail.limit += bytesRead.toInt() | ||
| sink.size += bytesRead | ||
| return bytesRead | ||
| } | ||
|
|
||
| override fun close() = input.close() | ||
|
|
||
| override fun toString() = "RawSource($input)" | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| /* | ||
| * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
| * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
| */ | ||
|
|
||
| package kotlinx.io | ||
|
|
||
| import kotlinx.cinterop.* | ||
| import platform.Foundation.* | ||
| import platform.darwin.NSInteger | ||
| import platform.darwin.NSUInteger | ||
| import platform.posix.memcpy | ||
jeffdgr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| import platform.posix.uint8_tVar | ||
| import kotlin.concurrent.Volatile | ||
|
|
||
| /** | ||
| * Returns an output stream that writes to this sink. Closing the stream will also close this sink. | ||
| * | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * @sample kotlinx.io.samples.KotlinxIoSamplesApple.asStream | ||
| */ | ||
| public fun Sink.asNSOutputStream(): NSOutputStream = SinkNSOutputStream(this) | ||
|
|
||
| @OptIn(UnsafeNumber::class) | ||
| private class SinkNSOutputStream( | ||
| private val sink: Sink, | ||
| ) : NSOutputStream(toMemory = Unit) { | ||
|
|
||
| private val isClosed: () -> Boolean = when (sink) { | ||
| is RealSink -> sink::closed | ||
| is Buffer -> { | ||
| { false } | ||
| } | ||
| } | ||
|
|
||
| @OptIn(ExperimentalStdlibApi::class) | ||
| @Volatile | ||
| private var status = NSStreamStatusNotOpen | ||
fzhinkin marked this conversation as resolved.
Show resolved
Hide resolved
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct me if I'm wrong, but status transition diagrams seems to look like this: graph TD
NSStreamStatusNotOpen -->|open| NSStreamStatusOpen;
NSStreamStatusNotOpen -->|close| NSStreamStatusClosed;
NSStreamStatusOpen -->|write| NSStreamStatusWriting;
NSStreamStatusWriting -->|write| NSStreamStatusOpen;
NSStreamStatusOpen -->|error| NSStreamStatusError;
NSStreamStatusNotOpen -->|write->error| NSStreamStatusError;
NSStreamStatusClosed -->|write->error| NSStreamStatusError;
NSStreamStatusWriting -->|error| NSStreamStatusError;
NSStreamStatusOpening;
NSStreamStatusReading;
NSStreamStatusAtEnd;
Do all the transitions make sense? Should we change the status to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the stream is not open, I don't think we should go from We should probably mirror the behavior of the platform streams and not allow closing a stream that has never been opened. I'll make that change. You're missing the close from the Where are you getting the I guess we just pass right through Sinks don't provide a mechanism to be limited in capacity, so we never get This is what I believe the status lifecycle should look like after these changes: graph TD
NSStreamStatusNotOpen -->|open| NSStreamStatusOpening;
NSStreamStatusOpening -->|open| NSStreamStatusOpen;
NSStreamStatusOpen -->|close| NSStreamStatusClosed;
NSStreamStatusOpen -->|write| NSStreamStatusWriting;
NSStreamStatusWriting -->|write| NSStreamStatusOpen;
NSStreamStatusWriting -->|error| NSStreamStatusError;
NSStreamStatusReading;
NSStreamStatusAtEnd;
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's possible when the stream is still open, but the underlying sink is closed and someone is calling
My understanding is that it's mainly intended for streams with complex opening protocols, like network connections.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh, ok. This should be changed now where any |
||
| private var error: NSError? = null | ||
| set(value) { | ||
| status = NSStreamStatusError | ||
| field = value | ||
| } | ||
|
|
||
| override fun streamStatus() = status | ||
|
|
||
| override fun streamError() = error | ||
|
|
||
| override fun open() { | ||
| if (status == NSStreamStatusNotOpen) { | ||
| status = NSStreamStatusOpen | ||
| } | ||
| } | ||
|
|
||
| override fun close() { | ||
| status = NSStreamStatusClosed | ||
fzhinkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sink.close() | ||
| } | ||
|
|
||
| @OptIn(DelicateIoApi::class) | ||
| override fun write(buffer: CPointer<uint8_tVar>?, maxLength: NSUInteger): NSInteger { | ||
| return try { | ||
| if (isClosed()) throw IOException("Underlying sink is closed.") | ||
fzhinkin marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (status != NSStreamStatusOpen) return -1 | ||
| status = NSStreamStatusWriting | ||
| sink.writeToInternalBuffer { | ||
| it.writeNative(buffer, maxLength.toInt()) | ||
| } | ||
| status = NSStreamStatusOpen | ||
| maxLength.convert() | ||
| } catch (e: Exception) { | ||
| error = e.toNSError() | ||
| -1 | ||
| } | ||
| } | ||
|
|
||
| private fun Buffer.writeNative(source: CPointer<uint8_tVar>?, maxLength: Int) { | ||
jeffdgr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| var currentOffset = 0 | ||
| while (currentOffset < maxLength) { | ||
| val tail = writableSegment(1) | ||
|
|
||
| val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit) | ||
| tail.data.usePinned { | ||
| memcpy(it.addressOf(tail.pos), source + currentOffset, toCopy.convert()) | ||
| } | ||
|
|
||
| currentOffset += toCopy | ||
| tail.limit += toCopy | ||
| } | ||
| size += maxLength | ||
| } | ||
|
|
||
| override fun hasSpaceAvailable() = true | ||
jeffdgr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @OptIn(InternalIoApi::class) | ||
| override fun propertyForKey(key: NSStreamPropertyKey): Any? = when (key) { | ||
| NSStreamDataWrittenToMemoryStreamKey -> sink.buffer.readByteArray().toNSData() | ||
jeffdgr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| else -> null | ||
| } | ||
|
|
||
| override fun description() = "$sink.asNSOutputStream()" | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| /* | ||
| * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. | ||
| * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. | ||
| */ | ||
|
|
||
| package kotlinx.io | ||
|
|
||
| import kotlinx.cinterop.* | ||
| import platform.Foundation.* | ||
| import platform.darwin.NSInteger | ||
| import platform.darwin.NSUInteger | ||
| import platform.darwin.NSUIntegerVar | ||
| import platform.posix.memcpy | ||
| import platform.posix.uint8_tVar | ||
| import kotlin.concurrent.Volatile | ||
|
|
||
| /** | ||
| * Returns an input stream that reads from this source. Closing the stream will also close this source. | ||
| * | ||
jeffdgr8 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * @sample kotlinx.io.samples.KotlinxIoSamplesApple.asStream | ||
| */ | ||
| public fun Source.asNSInputStream(): NSInputStream = SourceNSInputStream(this) | ||
|
|
||
| @OptIn(InternalIoApi::class, UnsafeNumber::class) | ||
| private class SourceNSInputStream( | ||
| private val source: Source, | ||
| ) : NSInputStream(NSData()) { | ||
fzhinkin marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| private val isClosed: () -> Boolean = when (source) { | ||
| is RealSource -> source::closed | ||
| is Buffer -> { | ||
| { false } | ||
| } | ||
| } | ||
|
|
||
| @OptIn(ExperimentalStdlibApi::class) | ||
| @Volatile | ||
| private var status = NSStreamStatusNotOpen | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same question as for the source, but with the corresponding diagram: graph TD
NSStreamStatusNotOpen -->|open| NSStreamStatusOpen;
NSStreamStatusNotOpen -->|close| NSStreamStatusClosed;
NSStreamStatusNotOpen -->|error| NSStreamStatusError;
NSStreamStatusOpen -->|read| NSStreamStatusReading;
NSStreamStatusReading --> |read| NSStreamStatusOpen;
NSStreamStatusOpen -->|read| NSStreamStatusAtEnd;
NSStreamStatusAtEnd -->|close| NSStreamStatusClosed;
NSStreamStatusOpen -->|error| NSStreamStatusError;
NSStreamStatusClosed --> |read->error| NSStreamStatusError;
NSStreamStatusClosed;
NSStreamStatusOpening;
NSStreamStatusReading;
NSStreamStatusAtEnd;
NSStreamStatusWriting;
```
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this should be the status lifecycle after updates: graph TD
NSStreamStatusNotOpen -->|open| NSStreamStatusOpening;
NSStreamStatusOpening -->|open| NSStreamStatusOpen;
NSStreamStatusOpen -->|read| NSStreamStatusReading;
NSStreamStatusReading -->|read| NSStreamStatusAtEnd;
NSStreamStatusAtEnd -->|read| NSStreamStatusReading;
NSStreamStatusReading --> |read| NSStreamStatusOpen;
NSStreamStatusReading -->|error| NSStreamStatusError;
NSStreamStatusAtEnd -->|close| NSStreamStatusClosed;
NSStreamStatusOpen -->|close| NSStreamStatusClosed;
NSStreamStatusWriting;
|
||
| private var error: NSError? = null | ||
| set(value) { | ||
| status = NSStreamStatusError | ||
| field = value | ||
| } | ||
|
|
||
| private var pinnedBuffer: Pinned<ByteArray>? = null | ||
jeffdgr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| override fun streamStatus() = if (isClosed()) NSStreamStatusClosed else status | ||
fzhinkin marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| override fun streamError() = error | ||
|
|
||
| override fun open() { | ||
| if (status == NSStreamStatusNotOpen) { | ||
| status = NSStreamStatusOpen | ||
| } | ||
| } | ||
|
|
||
| override fun close() { | ||
| status = NSStreamStatusClosed | ||
fzhinkin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pinnedBuffer?.unpin() | ||
| pinnedBuffer = null | ||
| source.close() | ||
| } | ||
|
|
||
| override fun read(buffer: CPointer<uint8_tVar>?, maxLength: NSUInteger): NSInteger { | ||
| try { | ||
| if (isClosed()) throw IOException("Underlying source is closed.") | ||
| if (status != NSStreamStatusOpen) return -1 | ||
| status = NSStreamStatusReading | ||
| if (source.exhausted()) { | ||
| status = NSStreamStatusAtEnd | ||
| return 0 | ||
| } | ||
| val toRead = minOf(maxLength.toInt(), source.buffer.size).toInt() | ||
| val read = source.buffer.readNative(buffer, toRead).convert<NSInteger>() | ||
| status = NSStreamStatusOpen | ||
| return read | ||
| } catch (e: Exception) { | ||
| error = e.toNSError() | ||
| return -1 | ||
| } | ||
| } | ||
|
|
||
| override fun getBuffer(buffer: CPointer<CPointerVar<uint8_tVar>>?, length: CPointer<NSUIntegerVar>?): Boolean { | ||
fzhinkin marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (source.buffer.size > 0) { | ||
| source.buffer.head?.let { s -> | ||
| pinnedBuffer?.unpin() | ||
| s.data.pin().let { | ||
| pinnedBuffer = it | ||
| buffer?.pointed?.value = it.addressOf(s.pos).reinterpret() | ||
| length?.pointed?.value = (s.limit - s.pos).convert() | ||
| return true | ||
| } | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| private fun Buffer.readNative(sink: CPointer<uint8_tVar>?, maxLength: Int): Int { | ||
jeffdgr8 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val s = head ?: return 0 | ||
| val toCopy = minOf(maxLength, s.limit - s.pos) | ||
| s.data.usePinned { | ||
| memcpy(sink, it.addressOf(s.pos), toCopy.convert()) | ||
| } | ||
|
|
||
| s.pos += toCopy | ||
| size -= toCopy.toLong() | ||
|
|
||
| if (s.pos == s.limit) { | ||
| head = s.pop() | ||
| SegmentPool.recycle(s) | ||
| } | ||
|
|
||
| return toCopy | ||
| } | ||
|
|
||
| override fun hasBytesAvailable() = !source.exhausted() | ||
|
|
||
| override fun propertyForKey(key: NSStreamPropertyKey): Any? = null | ||
|
|
||
| override fun description() = "$source.asNSInputStream()" | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.