Skip to content

Commit 2b411c5

Browse files
authored
Merge pull request #2459 from vasilmkd/piped-bug-port
PipedStreamBuffer bug fix port
2 parents a502dd1 + e4d082b commit 2b411c5

File tree

2 files changed

+96
-3
lines changed

2 files changed

+96
-3
lines changed

io/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala

+60-2
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel
137137
// or just a part of it.
138138
val toRead = math.min(available, length)
139139
// Transfer the bytes to the provided byte array.
140-
System.arraycopy(buffer, head % capacity, b, offset, toRead)
140+
circularRead(buffer, head, capacity, b, offset, toRead)
141141
// The bytes are marked as read by advancing the head of the
142142
// circular buffer.
143143
head += toRead
@@ -192,6 +192,35 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel
192192
override def available(): Int = self.synchronized {
193193
if (closed) 0 else tail - head
194194
}
195+
196+
/** Reads bytes from a circular buffer by copying them into a regular
197+
* buffer.
198+
*
199+
* @param src the source circular buffer
200+
* @param srcPos the offset into the source circular buffer
201+
* @param srcCap the capacity of the source circular buffer
202+
* @param dst the destination buffer
203+
* @param dstPos the offset into the destination buffer
204+
* @param length the number of bytes to be transferred
205+
*/
206+
private[this] def circularRead(
207+
src: Array[Byte],
208+
srcPos: Int,
209+
srcCap: Int,
210+
dst: Array[Byte],
211+
dstPos: Int,
212+
length: Int
213+
): Unit = {
214+
val srcOffset = srcPos % srcCap
215+
if (srcOffset + length >= srcCap) {
216+
val batch1 = srcCap - srcOffset
217+
val batch2 = length - batch1
218+
System.arraycopy(src, srcOffset, dst, dstPos, batch1)
219+
System.arraycopy(src, 0, dst, dstPos + batch1, batch2)
220+
} else {
221+
System.arraycopy(src, srcOffset, dst, dstPos, length)
222+
}
223+
}
195224
}
196225

197226
val outputStream: OutputStream = new OutputStream {
@@ -270,7 +299,7 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel
270299
// or just a part of it.
271300
val toWrite = math.min(available, length)
272301
// Transfer the bytes to the provided byte array.
273-
System.arraycopy(b, offset, buffer, tail % capacity, toWrite)
302+
circularWrite(b, offset, buffer, tail, capacity, toWrite)
274303
// The bytes are marked as written by advancing the tail of the
275304
// circular buffer.
276305
tail += toWrite
@@ -316,5 +345,34 @@ private[io] final class PipedStreamBuffer(private[this] val capacity: Int) { sel
316345
readerPermit.release()
317346
}
318347
}
348+
349+
/** Writes bytes into a circular buffer by copying them from a regular
350+
* buffer.
351+
*
352+
* @param src the source buffer
353+
* @param srcPos the offset into the source buffer
354+
* @param dst the destination circular buffer
355+
* @param dstPos the offset into the destination circular buffer
356+
* @param dstCap the capacity of the destination circular buffer
357+
* @param length the number of bytes to be transferred
358+
*/
359+
private[this] def circularWrite(
360+
src: Array[Byte],
361+
srcPos: Int,
362+
dst: Array[Byte],
363+
dstPos: Int,
364+
dstCap: Int,
365+
length: Int
366+
): Unit = {
367+
val dstOffset = dstPos % dstCap
368+
if (dstOffset + length >= dstCap) {
369+
val batch1 = dstCap - dstOffset
370+
val batch2 = length - batch1
371+
System.arraycopy(src, srcPos, dst, dstOffset, batch1)
372+
System.arraycopy(src, srcPos + batch1, dst, 0, batch2)
373+
} else {
374+
System.arraycopy(src, srcPos, dst, dstOffset, length)
375+
}
376+
}
319377
}
320378
}

io/src/test/scala/fs2/io/IoSuite.scala

+36-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2020
*/
2121

22-
package fs2.io
22+
package fs2
23+
package io
2324

2425
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
2526
import java.util.concurrent.Executors
@@ -163,6 +164,40 @@ class IoSuite extends Fs2Suite {
163164
}
164165
}.compile.drain.map(_ => assert(true))
165166
}
167+
168+
test("different chunk sizes function correctly") {
169+
170+
def test(chunkSize: Int): Pipe[IO, Byte, Byte] = source => {
171+
readOutputStream(chunkSize) { os =>
172+
source.through(writeOutputStream(IO.delay(os), true)).compile.drain
173+
}
174+
}
175+
176+
def source(chunkSize: Int, bufferSize: Int): Stream[Pure, Byte] =
177+
Stream.range(65, 75).map(_.toByte).repeat.take(chunkSize.toLong * 2).buffer(bufferSize)
178+
179+
forAllF { (chunkSize0: Int, bufferSize0: Int) =>
180+
val chunkSize = (chunkSize0 % 512).abs + 1
181+
val bufferSize = (bufferSize0 % 511).abs + 1
182+
183+
val src = source(chunkSize, bufferSize)
184+
185+
src
186+
.through(text.utf8Decode)
187+
.foldMonoid
188+
.flatMap { expected =>
189+
src
190+
.through(test(chunkSize))
191+
.through(text.utf8Decode)
192+
.foldMonoid
193+
.evalMap { actual =>
194+
IO(assertEquals(actual, expected))
195+
}
196+
}
197+
.compile
198+
.drain
199+
}
200+
}
166201
}
167202

168203
group("unsafeReadInputStream") {

0 commit comments

Comments
 (0)