Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c0f042f
NSInputStream.asSource() and Source.asNSInputStream()
jeffdgr8 Jul 12, 2023
13bf3b1
Move Exception.toNSError() to -Util file
jeffdgr8 Jul 12, 2023
3386ab3
Make isClosed explicitly private
jeffdgr8 Jul 12, 2023
6b4bb80
Code review feedback
jeffdgr8 Jul 12, 2023
8ca65a2
Implement NSStreamStatus
jeffdgr8 Jul 12, 2023
2be70a6
Open NSInputStream on first read
jeffdgr8 Jul 12, 2023
54e247c
Unknown error when no streamError description
jeffdgr8 Jul 12, 2023
dccd22a
NSOutputStream.asSink() and Sink.asNSOutputStream()
jeffdgr8 Jul 13, 2023
2ba8bed
Support SinkNSOutputStream NSStreamDataWrittenToMemoryStreamKey
jeffdgr8 Jul 13, 2023
d7b8e1d
Override SourceNSInputStream.propertyForKey as no-op
jeffdgr8 Jul 13, 2023
62757c5
Mark status property @Volatile
jeffdgr8 Jul 13, 2023
613e2be
Code review feedback and fixes
jeffdgr8 Jul 13, 2023
e5f5c27
Fix reading byte as int
jeffdgr8 Jul 17, 2023
f5dfc1a
Buffer.snapshotAsNSData() for NSStreamDataWrittenToMemoryStreamKey
jeffdgr8 Jul 17, 2023
c9d1f44
Test SinkNSOutputStream with data longer than Segment
jeffdgr8 Jul 17, 2023
242fda0
Open streams on init
jeffdgr8 Jul 18, 2023
ead4f78
Update core/apple/src/-Util.kt
jeffdgr8 Jul 18, 2023
f9c9305
Update core/apple/src/BuffersApple.kt
jeffdgr8 Jul 18, 2023
18ff1d0
Update core/apple/test/NSOutputStreamSinkTest.kt
jeffdgr8 Jul 18, 2023
c4eb1b9
Update core/apple/test/SinkNSOutputStreamTest.kt
jeffdgr8 Jul 18, 2023
56bb0e4
Update core/apple/test/samples/samplesApple.kt
jeffdgr8 Jul 18, 2023
9d53c8e
Update core/apple/test/utilApple.kt
jeffdgr8 Jul 18, 2023
4e3ff87
Add samplesApple.kt to Dokka samples
jeffdgr8 Jul 18, 2023
aa5830c
Verify buffer != null and maxLength >= 0
jeffdgr8 Jul 18, 2023
6f076d9
Check isClosed() in SinkNSOutputStream.streamStatus
jeffdgr8 Jul 18, 2023
f789518
Use assertFailsWith
jeffdgr8 Jul 18, 2023
feb3145
Update core/apple/src/BuffersApple.kt
jeffdgr8 Jul 18, 2023
3508104
Update core/apple/src/SinksApple.kt
jeffdgr8 Jul 18, 2023
9e71d4b
Don't close a stream with the error status
jeffdgr8 Jul 18, 2023
f40d472
Use malloc and NSData.dataWithBytesNoCopy:length:
jeffdgr8 Jul 18, 2023
caba9b4
Better variable names
jeffdgr8 Jul 18, 2023
afab5b7
Use uint8_tVar (same typealias, but matches function signature)
jeffdgr8 Jul 18, 2023
365c354
Test SourceNSInputStream with long input data
jeffdgr8 Jul 18, 2023
a19c463
Add apple source set to bytestring module
jeffdgr8 Jul 18, 2023
0021412
Add NSInputStream from file test
jeffdgr8 Jul 19, 2023
211c5f5
Remove @Volatile annotations
jeffdgr8 Jul 19, 2023
ae33893
Remove getBuffer() implementation
jeffdgr8 Jul 19, 2023
5d14977
createTempFile() not working on Apple platforms
jeffdgr8 Jul 19, 2023
5cfafa2
Merge remote-tracking branch 'upstream/develop' into nsinputstream
jeffdgr8 Jul 19, 2023
e9fcaeb
Add apple source set
jeffdgr8 Jul 19, 2023
50fe63f
SourceNSInputStream run loop delegate support
jeffdgr8 Jul 20, 2023
89a4f80
Test subscribe after open
jeffdgr8 Jul 20, 2023
e296c01
Check run loop on postEvent()
jeffdgr8 Jul 20, 2023
7b3ab06
SinkNSOutputStream run loop delegate support
jeffdgr8 Jul 20, 2023
c74845e
lockWithTimeout() with better failure logging
jeffdgr8 Jul 22, 2023
d2d040d
Synchronize access to read variable for entire event handler
jeffdgr8 Jul 22, 2023
630423c
Only catch TimeoutCancellationException
jeffdgr8 Jul 22, 2023
110e56c
Code review feedback and fixes
jeffdgr8 Aug 1, 2023
6d034e9
Check for NSStreamStatusAtEnd after read
jeffdgr8 Aug 2, 2023
ed5902e
Post NSStreamEventErrorOccurred on exhausted error
jeffdgr8 Aug 2, 2023
07318d0
Revert check for NSStreamStatusAtEnd after read
jeffdgr8 Aug 2, 2023
4897741
Add suggested doc comments
jeffdgr8 Aug 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions buildSrc/src/main/kotlin/Platforms.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fun KotlinMultiplatformExtension.configureNativePlatforms() {
mingwX64()
}

private val appleTargets = listOf(
val appleTargets = listOf(
"iosArm64",
"iosX64",
"iosSimulatorArm64",
Expand Down Expand Up @@ -64,7 +64,7 @@ private val androidTargets = listOf(
"androidNativeX86"
)

val nativeTargets = appleTargets + linuxTargets + mingwTargets + androidTargets
val nativeTargets = linuxTargets + mingwTargets + androidTargets

/**
* Creates a source set for a directory that isn't already a built-in platform. Use this to create
Expand Down
26 changes: 26 additions & 0 deletions core/apple/src/-Util.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@file:OptIn(UnsafeNumber::class)

package kotlinx.io

import kotlinx.cinterop.UnsafeNumber
import kotlinx.cinterop.addressOf
import kotlinx.cinterop.convert
import kotlinx.cinterop.usePinned
import platform.Foundation.*

internal fun Exception.toNSError() = NSError(
domain = "Kotlin",
code = 0,
userInfo = mapOf(
NSLocalizedDescriptionKey to message,
NSUnderlyingErrorKey to this
)
)

internal fun ByteArray.toNSData() = if (isNotEmpty()) {
usePinned {
NSData.create(bytes = it.addressOf(0), length = size.convert())
}
} else {
NSData.data()
}
111 changes: 111 additions & 0 deletions core/apple/src/AppleCore.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
*/

package kotlinx.io

import kotlinx.cinterop.*
import platform.Foundation.NSInputStream
import platform.Foundation.NSOutputStream
import platform.Foundation.NSStreamStatusClosed
import platform.Foundation.NSStreamStatusNotOpen
import platform.darwin.UInt8Var

/**
* Returns [RawSink] that writes to an output stream.
*
* Use [RawSink.buffered] to create a buffered sink from it.
*
* @sample kotlinx.io.samples.KotlinxIoSamplesApple.outputStreamAsSink
*/
public fun NSOutputStream.asSink(): RawSink = OutputStreamSink(this)

private open class OutputStreamSink(
private val out: NSOutputStream,
) : RawSink {

@OptIn(UnsafeNumber::class)
override fun write(source: Buffer, byteCount: Long) {
if (out.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed")
if (out.streamStatus == NSStreamStatusNotOpen) out.open()

checkOffsetAndCount(source.size, 0, byteCount)
var remaining = byteCount
while (remaining > 0) {
val head = source.head!!
val toCopy = minOf(remaining, head.limit - head.pos).toInt()
val bytesWritten = head.data.usePinned {
val bytes = it.addressOf(head.pos).reinterpret<UInt8Var>()
out.write(bytes, toCopy.convert()).toLong()
}

if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error")
if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity")

head.pos += bytesWritten.toInt()
remaining -= bytesWritten
source.size -= bytesWritten

if (head.pos == head.limit) {
source.head = head.pop()
SegmentPool.recycle(head)
}
}
}

override fun flush() {
// no-op
}

override fun close() = out.close()

override fun toString() = "RawSink($out)"
}

/**
* Returns [RawSource] that reads from an input stream.
*
* Use [RawSource.buffered] to create a buffered source from it.
*
* @sample kotlinx.io.samples.KotlinxIoSamplesApple.inputStreamAsSource
*/
public fun NSInputStream.asSource(): RawSource = NSInputStreamSource(this)

private open class NSInputStreamSource(
private val input: NSInputStream,
) : RawSource {

@OptIn(UnsafeNumber::class)
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
if (input.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed")
if (input.streamStatus == NSStreamStatusNotOpen) input.open()

if (byteCount == 0L) return 0L
checkByteCount(byteCount)

val tail = sink.writableSegment(1)
val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit)
val bytesRead = tail.data.usePinned {
val bytes = it.addressOf(tail.limit).reinterpret<UInt8Var>()
input.read(bytes, maxToCopy.convert()).toLong()
}

if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error")
if (bytesRead == 0L) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.head = tail.pop()
SegmentPool.recycle(tail)
}
return -1
}
tail.limit += bytesRead.toInt()
sink.size += bytesRead
return bytesRead
}

override fun close() = input.close()

override fun toString() = "RawSource($input)"
}
67 changes: 67 additions & 0 deletions core/apple/src/BuffersApple.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
@file:OptIn(UnsafeNumber::class)

package kotlinx.io

import kotlinx.cinterop.*
import kotlinx.io.bytestring.ByteString
import kotlinx.io.bytestring.buildByteString
import platform.Foundation.*
import platform.darwin.NSUInteger
import platform.darwin.NSUIntegerMax
import platform.posix.memcpy
import platform.posix.uint8_tVar

internal fun Buffer.write(source: CPointer<uint8_tVar>?, maxLength: Int) {
var currentOffset = 0
while (currentOffset < maxLength) {
val tail = writableSegment(1)

val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit)
tail.data.usePinned {
memcpy(it.addressOf(tail.pos), source + currentOffset, toCopy.convert())
}

currentOffset += toCopy
tail.limit += toCopy
}
size += maxLength
}

internal fun Buffer.readAtMostTo(sink: CPointer<uint8_tVar>?, maxLength: Int): Int {
val s = head ?: return 0
val toCopy = minOf(maxLength, s.limit - s.pos)
s.data.usePinned {
memcpy(sink, it.addressOf(s.pos), toCopy.convert())
}

s.pos += toCopy
size -= toCopy.toLong()

if (s.pos == s.limit) {
head = s.pop()
SegmentPool.recycle(s)
}

return toCopy
}

internal fun Buffer.snapshotAsNSData(): NSData {
if (size == 0L) return NSData.data()

check(size.toULong() <= NSUIntegerMax) { "Buffer is too long ($size) to be converted into NSData." }

val data = NSMutableData.create(length = size.convert())!!
var curr = head
var index: NSUInteger = 0U
do {
check(curr != null) { "Current segment is null" }
val pos = curr.pos
val length: NSUInteger = (curr.limit - pos).convert()
curr.data.usePinned {
data.replaceBytesInRange(NSMakeRange(index, length), it.addressOf(pos))
}
curr = curr.next
index += length
} while (curr !== head)
return data
}
85 changes: 85 additions & 0 deletions core/apple/src/SinksApple.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
*/

package kotlinx.io

import kotlinx.cinterop.*
import platform.Foundation.*
import platform.darwin.NSInteger
import platform.darwin.NSUInteger
import platform.posix.memcpy
import platform.posix.uint8_tVar
import kotlin.concurrent.Volatile

/**
* Returns an output stream that writes to this sink. Closing the stream will also close this sink.
*
* @sample kotlinx.io.samples.KotlinxIoSamplesApple.asStream
*/
public fun Sink.asNSOutputStream(): NSOutputStream = SinkNSOutputStream(this)

@OptIn(UnsafeNumber::class)
private class SinkNSOutputStream(
private val sink: Sink,
) : NSOutputStream(toMemory = Unit) {

private val isClosed: () -> Boolean = when (sink) {
is RealSink -> sink::closed
is Buffer -> {
{ false }
}
}

@OptIn(ExperimentalStdlibApi::class)
@Volatile
private var status = NSStreamStatusNotOpen
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong, but status transition diagrams seems to look like this:

graph TD
    NSStreamStatusNotOpen -->|open| NSStreamStatusOpen;
    NSStreamStatusNotOpen -->|close| NSStreamStatusClosed;
    NSStreamStatusOpen -->|write| NSStreamStatusWriting;
    NSStreamStatusWriting -->|write| NSStreamStatusOpen;

    NSStreamStatusOpen -->|error| NSStreamStatusError;
    NSStreamStatusNotOpen -->|write->error| NSStreamStatusError;
    NSStreamStatusClosed -->|write->error| NSStreamStatusError;
    NSStreamStatusWriting -->|error| NSStreamStatusError;

    NSStreamStatusOpening;
    NSStreamStatusReading;
    NSStreamStatusAtEnd;
Loading

Do all the transitions make sense? Should we change the status to error if it is closed? Should we fail and change the status to error when writing into a stream whose underlying sink was closed but the stream itself was not opened yet, or it's better to exit the write immediately if the status is not-open?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the stream is not open, write() will just return -1, but not change the status to error. NSStreamStatusError is a result of a terminal error, so any error that is correctable, like calling a function in the wrong order or with bad parameters, should not change the status to error.

I don't think we should go from NSStreamStatusClosed to NSStreamStatusError. Calling write on a closed platform stream just returns -1. I'll change that throwing IOException to returning -1 when closed.

We should probably mirror the behavior of the platform streams and not allow closing a stream that has never been opened. I'll make that change. You're missing the close from the NSStreamStatusOpen state though.

Where are you getting the NSStreamStatusOpen -->|error| NSStreamStatusError; from? I believe we should only get an error during write. That's the only place it's being set.

I guess we just pass right through NSStreamStatusOpening. Maybe it makes sense to set it right before setting to NSStreamStatusOpen, in case some key-value observer is expecting it in Objective-C? We don't actually do anything to open the underlying Sink or Source, so setting both statuses is entirely to align with NSStream expected behavior.

Sinks don't provide a mechanism to be limited in capacity, so we never get NSStreamStatusAtEnd.

This is what I believe the status lifecycle should look like after these changes:

graph TD
    NSStreamStatusNotOpen -->|open| NSStreamStatusOpening;
    NSStreamStatusOpening -->|open| NSStreamStatusOpen;
    NSStreamStatusOpen -->|close| NSStreamStatusClosed;
    NSStreamStatusOpen -->|write| NSStreamStatusWriting;
    NSStreamStatusWriting -->|write| NSStreamStatusOpen;

    NSStreamStatusWriting -->|error| NSStreamStatusError;

    NSStreamStatusReading;
    NSStreamStatusAtEnd;
Loading

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are you getting the NSStreamStatusOpen -->|error| NSStreamStatusError; from? I believe we should only get an error during write. That's the only place it's being set.

That's possible when the stream is still open, but the underlying sink is closed and someone is calling write.
Not saying that it's wrong or problematic, just mentioning the possible transition.

I guess we just pass right through NSStreamStatusOpening. Maybe it makes sense to set it right before setting to NSStreamStatusOpen, in case some key-value observer is expecting it in Objective-C?

My understanding is that it's mainly intended for streams with complex opening protocols, like network connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's possible when the stream is still open, but the underlying sink is closed and someone is calling write.
Not saying that it's wrong or problematic, just mentioning the possible transition.

Oh, ok. This should be changed now where any error would come after write has changed status to NSStreamStatusWriting.

private var error: NSError? = null
set(value) {
status = NSStreamStatusError
field = value
}

override fun streamStatus() = status

override fun streamError() = error

override fun open() {
if (status == NSStreamStatusNotOpen) {
status = NSStreamStatusOpen
}
}

override fun close() {
status = NSStreamStatusClosed
sink.close()
}

@OptIn(DelicateIoApi::class)
override fun write(buffer: CPointer<uint8_tVar>?, maxLength: NSUInteger): NSInteger {
return try {
if (isClosed()) throw IOException("Underlying sink is closed.")
if (status != NSStreamStatusOpen) return -1
status = NSStreamStatusWriting
sink.writeToInternalBuffer {
it.write(buffer, maxLength.toInt())
}
status = NSStreamStatusOpen
maxLength.convert()
} catch (e: Exception) {
error = e.toNSError()
-1
}
}

override fun hasSpaceAvailable() = !isClosed()

@OptIn(InternalIoApi::class)
override fun propertyForKey(key: NSStreamPropertyKey): Any? = when (key) {
NSStreamDataWrittenToMemoryStreamKey -> sink.buffer.snapshotAsNSData()
else -> null
}

override fun description() = "$sink.asNSOutputStream()"
}
Loading