Skip to content

Commit f943d89

Browse files
authored
Split stream functionality into modules. (#4)
1 parent 865ffef commit f943d89

File tree

4 files changed

+389
-321
lines changed

4 files changed

+389
-321
lines changed

lib/io/stream/generic.rb

Lines changed: 6 additions & 318 deletions
Original file line numberDiff line numberDiff line change
@@ -4,246 +4,30 @@
44
# Copyright, 2023-2024, by Samuel Williams.
55

66
require_relative "string_buffer"
7+
require_relative "readable"
8+
require_relative "writable"
79

810
require_relative "shim/buffered"
911
require_relative "shim/readable"
1012

1113
require_relative "openssl"
1214

1315
module IO::Stream
14-
# The default block size for IO buffers. Defaults to 64KB (typical pipe buffer size).
15-
BLOCK_SIZE = ENV.fetch("IO_STREAM_BLOCK_SIZE", 1024*64).to_i
16-
17-
# The maximum read size when appending to IO buffers. Defaults to 8MB.
18-
MAXIMUM_READ_SIZE = ENV.fetch("IO_STREAM_MAXIMUM_READ_SIZE", BLOCK_SIZE * 128).to_i
19-
2016
class LimitError < StandardError
2117
end
2218

2319
class Generic
24-
def initialize(block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE)
25-
@eof = false
26-
27-
@writing = ::Thread::Mutex.new
28-
29-
@block_size = block_size
30-
@maximum_read_size = maximum_read_size
31-
32-
@read_buffer = StringBuffer.new
33-
@write_buffer = StringBuffer.new
34-
35-
# Used as destination buffer for underlying reads.
36-
@input_buffer = StringBuffer.new
37-
end
38-
39-
attr_accessor :block_size
40-
41-
# Reads `size` bytes from the stream. If size is not specified, read until end of file.
42-
def read(size = nil)
43-
return String.new(encoding: Encoding::BINARY) if size == 0
44-
45-
if size
46-
until @eof or @read_buffer.bytesize >= size
47-
# Compute the amount of data we need to read from the underlying stream:
48-
read_size = size - @read_buffer.bytesize
49-
50-
# Don't read less than @block_size to avoid lots of small reads:
51-
fill_read_buffer(read_size > @block_size ? read_size : @block_size)
52-
end
53-
else
54-
until @eof
55-
fill_read_buffer
56-
end
57-
end
58-
59-
return consume_read_buffer(size)
60-
end
61-
62-
# Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
63-
def read_partial(size = nil)
64-
return String.new(encoding: Encoding::BINARY) if size == 0
65-
66-
if !@eof and @read_buffer.empty?
67-
fill_read_buffer
68-
end
69-
70-
return consume_read_buffer(size)
71-
end
72-
73-
def read_exactly(size, exception: EOFError)
74-
if buffer = read(size)
75-
if buffer.bytesize != size
76-
raise exception, "could not read enough data"
77-
end
78-
79-
return buffer
80-
end
81-
82-
raise exception, "encountered eof while reading data"
83-
end
84-
85-
# This is a compatibility shim for existing code that uses `readpartial`.
86-
def readpartial(size = nil)
87-
read_partial(size) or raise EOFError, "Encountered eof while reading data!"
88-
end
89-
90-
private def index_of(pattern, offset, limit)
91-
# We don't want to split on the pattern, so we subtract the size of the pattern.
92-
split_offset = pattern.bytesize - 1
93-
94-
until index = @read_buffer.index(pattern, offset)
95-
offset = @read_buffer.bytesize - split_offset
96-
97-
offset = 0 if offset < 0
98-
99-
return nil if limit and offset >= limit
100-
return nil unless fill_read_buffer
101-
end
102-
103-
return index
104-
end
105-
106-
# Efficiently read data from the stream until encountering pattern.
107-
# @parameter pattern [String] The pattern to match.
108-
# @parameter offset [Integer] The offset to start searching from.
109-
# @parameter limit [Integer] The maximum number of bytes to read, including the pattern (even if chomped).
110-
# @returns [String | Nil] The contents of the stream up until the pattern, which is consumed but not returned.
111-
def read_until(pattern, offset = 0, limit: nil, chomp: true)
112-
if index = index_of(pattern, offset, limit)
113-
return nil if limit and index >= limit
114-
115-
@read_buffer.freeze
116-
matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
117-
@read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
118-
119-
return matched
120-
end
121-
end
122-
123-
def peek(size = nil)
124-
if size
125-
until @eof or @read_buffer.bytesize >= size
126-
# Compute the amount of data we need to read from the underlying stream:
127-
read_size = size - @read_buffer.bytesize
128-
129-
# Don't read less than @block_size to avoid lots of small reads:
130-
fill_read_buffer(read_size > @block_size ? read_size : @block_size)
131-
end
132-
return @read_buffer[..([size, @read_buffer.size].min - 1)]
133-
end
134-
until (block_given? && yield(@read_buffer)) or @eof
135-
fill_read_buffer
136-
end
137-
return @read_buffer
138-
end
139-
140-
def gets(separator = $/, limit = nil, chomp: false)
141-
# Compatibility with IO#gets:
142-
if separator.is_a?(Integer)
143-
limit = separator
144-
separator = $/
145-
end
146-
147-
# We don't want to split in the middle of the separator, so we subtract the size of the separator from the start of the search:
148-
split_offset = separator.bytesize - 1
149-
150-
offset = 0
151-
152-
until index = @read_buffer.index(separator, offset)
153-
offset = @read_buffer.bytesize - split_offset
154-
offset = 0 if offset < 0
155-
156-
# If a limit was given, and the offset is beyond the limit, we should return up to the limit:
157-
if limit and offset >= limit
158-
# As we didn't find the separator, there is nothing to chomp either.
159-
return consume_read_buffer(limit)
160-
end
161-
162-
# If we can't read any more data, we should return what we have:
163-
return consume_read_buffer unless fill_read_buffer
164-
end
165-
166-
# If the index of the separator was beyond the limit:
167-
if limit and index >= limit
168-
# Return up to the limit:
169-
return consume_read_buffer(limit)
170-
end
171-
172-
# Freeze the read buffer, as this enables us to use byteslice without generating a hidden copy:
173-
@read_buffer.freeze
174-
175-
line = @read_buffer.byteslice(0, index+(chomp ? 0 : separator.bytesize))
176-
@read_buffer = @read_buffer.byteslice(index+separator.bytesize, @read_buffer.bytesize)
177-
178-
return line
179-
end
20+
include Readable
21+
include Writable
18022

181-
private def drain(buffer)
182-
begin
183-
syswrite(buffer)
184-
ensure
185-
# If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
186-
buffer.clear
187-
end
188-
end
189-
190-
# Flushes buffered data to the stream.
191-
def flush
192-
return if @write_buffer.empty?
193-
194-
@writing.synchronize do
195-
self.drain(@write_buffer)
196-
end
197-
end
198-
199-
# Writes `string` to the buffer. When the buffer is full or `flush` is true, the
200-
# buffer is flushed to the underlying `io`.
201-
# @parameter string [String] the string to write to the buffer.
202-
# @returns [Integer] the number of bytes appended to the buffer.
203-
def write(string, flush: false)
204-
@writing.synchronize do
205-
@write_buffer << string
206-
207-
flush |= (@write_buffer.bytesize >= @block_size)
208-
209-
if flush
210-
self.drain(@write_buffer)
211-
end
212-
end
213-
214-
return string.bytesize
215-
end
216-
217-
# Writes `string` to the stream and returns self.
218-
def <<(string)
219-
write(string)
220-
221-
return self
222-
end
223-
224-
def puts(*arguments, separator: $/)
225-
return if arguments.empty?
226-
227-
@writing.synchronize do
228-
arguments.each do |argument|
229-
@write_buffer << argument << separator
230-
end
231-
232-
self.drain(@write_buffer)
233-
end
23+
def initialize(**options)
24+
super(**options)
23425
end
23526

23627
def closed?
23728
false
23829
end
23930

240-
def close_read
241-
end
242-
243-
def close_write
244-
flush
245-
end
246-
24731
# Best effort to flush any unwritten data, and then close the underlying IO.
24832
def close
24933
return if closed?
@@ -257,44 +41,6 @@ def close
25741
end
25842
end
25943

260-
# Determines if the stream has consumed all available data. May block if the stream is not readable.
261-
# See {readable?} for a non-blocking alternative.
262-
#
263-
# @returns [Boolean] If the stream is at end of file, which means there is no more data to be read.
264-
def eof?
265-
if !@read_buffer.empty?
266-
return false
267-
elsif @eof
268-
return true
269-
else
270-
return !self.fill_read_buffer
271-
end
272-
end
273-
274-
def eof!
275-
@read_buffer.clear
276-
@eof = true
277-
278-
raise EOFError
279-
end
280-
281-
# Whether there is a chance that a read operation will succeed or not.
282-
# @returns [Boolean] If the stream is readable, i.e. a `read` operation has a chance of success.
283-
def readable?
284-
# If we are at the end of the file, we can't read any more data:
285-
if @eof
286-
return false
287-
end
288-
289-
# If the read buffer is not empty, we can read more data:
290-
if !@read_buffer.empty?
291-
return true
292-
end
293-
294-
# If the underlying stream is readable, we can read more data:
295-
return !closed?
296-
end
297-
29844
protected
29945

30046
def sysclose
@@ -309,63 +55,5 @@ def syswrite(buffer)
30955
def sysread(size, buffer)
31056
raise NotImplementedError
31157
end
312-
313-
private
314-
315-
# Fills the buffer from the underlying stream.
316-
def fill_read_buffer(size = @block_size)
317-
# We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
318-
if size > @maximum_read_size
319-
size = @maximum_read_size
320-
end
321-
322-
# This effectively ties the input and output stream together.
323-
flush
324-
325-
if @read_buffer.empty?
326-
if sysread(size, @read_buffer)
327-
# Console.info(self, name: "read") {@read_buffer.inspect}
328-
return true
329-
end
330-
else
331-
if chunk = sysread(size, @input_buffer)
332-
@read_buffer << chunk
333-
# Console.info(self, name: "read") {@read_buffer.inspect}
334-
335-
return true
336-
end
337-
end
338-
339-
# else for both cases above:
340-
@eof = true
341-
return false
342-
end
343-
344-
# Consumes at most `size` bytes from the buffer.
345-
# @parameter size [Integer|nil] The amount of data to consume. If nil, consume entire buffer.
346-
def consume_read_buffer(size = nil)
347-
# If we are at eof, and the read buffer is empty, we can't consume anything.
348-
return nil if @eof && @read_buffer.empty?
349-
350-
result = nil
351-
352-
if size.nil? or size >= @read_buffer.bytesize
353-
# Consume the entire read buffer:
354-
result = @read_buffer
355-
@read_buffer = StringBuffer.new
356-
else
357-
# This approach uses more memory.
358-
# result = @read_buffer.slice!(0, size)
359-
360-
# We know that we are not going to reuse the original buffer.
361-
# But byteslice will generate a hidden copy. So let's freeze it first:
362-
@read_buffer.freeze
363-
364-
result = @read_buffer.byteslice(0, size)
365-
@read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
366-
end
367-
368-
return result
369-
end
37058
end
37159
end

0 commit comments

Comments
 (0)