Skip to content

Commit e4d082b

Browse files
committed
Implement circularRead fix
1 parent 85de6fe commit e4d082b

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala

+30-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel
137137
// or just a part of it.
138138
val toRead = math.min(available, length)
139139
// Transfer the bytes to the provided byte array.
140-
System.arraycopy(buffer, head % capacity, b, offset, toRead)
140+
circularRead(buffer, head, capacity, b, offset, toRead)
141141
// The bytes are marked as read by advancing the head of the
142142
// circular buffer.
143143
head += toRead
@@ -192,6 +192,35 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel
192192
override def available(): Int = self.synchronized {
193193
if (closed) 0 else tail - head
194194
}
195+
196+
/** Reads bytes from a circular buffer by copying them into a regular
197+
* buffer.
198+
*
199+
* @param src the source circular buffer
200+
* @param srcPos the offset into the source circular buffer
201+
* @param srcCap the capacity of the source circular buffer
202+
* @param dst the destination buffer
203+
* @param dstPos the offset into the destination buffer
204+
* @param length the number of bytes to be transferred
205+
*/
206+
private[this] def circularRead(
207+
src: Array[Byte],
208+
srcPos: Int,
209+
srcCap: Int,
210+
dst: Array[Byte],
211+
dstPos: Int,
212+
length: Int
213+
): Unit = {
214+
val srcOffset = srcPos % srcCap
215+
if (srcOffset + length >= srcCap) {
216+
val batch1 = srcCap - srcOffset
217+
val batch2 = length - batch1
218+
System.arraycopy(src, srcOffset, dst, dstPos, batch1)
219+
System.arraycopy(src, 0, dst, dstPos + batch1, batch2)
220+
} else {
221+
System.arraycopy(src, srcOffset, dst, dstPos, length)
222+
}
223+
}
195224
}
196225

197226
val outputStream: OutputStream = new OutputStream {

0 commit comments

Comments
 (0)