From 0a46abfb190fee808f7d248d94050e4aa97a5a08 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerberg <39104088+nhz2@users.noreply.github.com> Date: Sat, 16 Mar 2024 17:38:41 -0400 Subject: [PATCH] Fix `stop_on_end = true` closing underlying stream (#178) --- docs/src/assets/modes.dot | 3 - docs/src/assets/modes.svg | 112 ++++++++++++++++---------------------- docs/src/devnotes.md | 1 - ext/TestExt.jl | 7 +-- src/noop.jl | 5 +- src/state.jl | 4 +- src/stream.jl | 26 +++------ test/codecdoubleframe.jl | 34 ++++++++++-- test/codecnoop.jl | 10 ++++ 9 files changed, 101 insertions(+), 101 deletions(-) diff --git a/docs/src/assets/modes.dot b/docs/src/assets/modes.dot index 2031a77d..f0ebae56 100644 --- a/docs/src/assets/modes.dot +++ b/docs/src/assets/modes.dot @@ -12,19 +12,16 @@ digraph modes { "read" -> "panic"; "write" -> "write"; - "write" -> "done"; "write" -> "close"; "write" -> "panic"; "stop" -> "close"; - "done" -> "close"; "start" [ shape = point ]; "idle" [ shape = circle ]; "read" [ shape = circle ]; "write" [ shape = circle ]; "stop" [ shape = circle; style=bold; ]; - "done" [ shape = circle; style=bold; ]; "close" [ shape = circle; style=bold; ]; "panic" [ shape = circle; style=bold; ]; } diff --git a/docs/src/assets/modes.svg b/docs/src/assets/modes.svg index a7ddb427..a9bdab0f 100644 --- a/docs/src/assets/modes.svg +++ b/docs/src/assets/modes.svg @@ -1,150 +1,132 @@ - - - + + modes - + start - + idle - -idle + +idle start->idle - - + + read - -read + +read idle->read - - + + write - -write + +write idle->write - - + + close - -close + +close idle->close - - + + panic - -panic + +panic idle->panic - - + + read->read - - + + read->close - - + + read->panic - - + + stop - -stop + +stop read->stop - - + + write->write - - + + - + write->close - - + + - + write->panic - - - - - -done - -done - - - -write->done - - + + - + stop->close - - - - - -done->close - - + + diff --git a/docs/src/devnotes.md b/docs/src/devnotes.md index 895bb54e..5fd968c8 100644 --- a/docs/src/devnotes.md +++ b/docs/src/devnotes.md @@ -47,7 +47,6 @@ The `mode` field may be one of the following value: - `:read` : being ready to read data, data may be buffered - `:write`: being ready to write data, data may be buffered - `:stop` : transcoding is stopped after read, data may be buffered -- `:done` : transcoding is stopped after write, data may be buffered - `:close`: closed, no buffered data - `:panic`: an exception has been thrown in codec, data may be buffered but we cannot do anything diff --git a/ext/TestExt.jl b/ext/TestExt.jl index 66714827..cac67dd8 100644 --- a/ext/TestExt.jl +++ b/ext/TestExt.jl @@ -108,12 +108,11 @@ function TranscodingStreams.test_chunked_write(Encoder, Decoder) buffer = IOBuffer() stream = TranscodingStream(Decoder(), buffer, stop_on_end=true) write(stream, vcat(data...)) - flush(stream) + close(stream) ok = true - ok &= hash(take!(buffer)) == hash(chunks[1]) - ok &= buffersize(stream.state.buffer1) == length(data[2]) + ok &= hash(take!(buffer)) == hash(vcat(chunks...)) + ok &= buffersize(stream.state.buffer1) == 0 Test.@test ok - close(stream) end finalize(encoder) end diff --git a/src/noop.jl b/src/noop.jl index f3c341c1..143a6e4d 100644 --- a/src/noop.jl +++ b/src/noop.jl @@ -30,6 +30,7 @@ end function TranscodingStream(codec::Noop, stream::IO; bufsize::Integer=DEFAULT_BUFFER_SIZE, + stop_on_end::Bool=false, sharedbuf::Bool=(stream isa TranscodingStream)) checkbufsize(bufsize) checksharedbuf(sharedbuf, stream) @@ -38,7 +39,9 @@ function TranscodingStream(codec::Noop, stream::IO; else buffer = Buffer(bufsize) end - return TranscodingStream(codec, stream, State(buffer, buffer)) + state = State(buffer, buffer) + state.stop_on_end = stop_on_end + return TranscodingStream(codec, stream, state) end """ diff --git a/src/state.jl b/src/state.jl index eb68c487..0cb54b6c 100644 --- a/src/state.jl +++ b/src/state.jl @@ -9,12 +9,12 @@ See Developer's notes for details. """ mutable struct State # current stream mode - mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic} + mode::Symbol # {:idle, :read, :write, :stop, :close, :panic} # return code of the last method call code::Symbol # {:ok, :end, :error} - # flag to go :stop or :done on :end + # flag to go :stop on :end while reading stop_on_end::Bool # exception thrown while data processing diff --git a/src/stream.jl b/src/stream.jl index 0bd9a491..deca707f 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -85,10 +85,10 @@ Arguments The initial buffer size (the default size is 16KiB). The buffer may be extended whenever `codec` requests so. - `stop_on_end`: - The flag to stop transcoding on `:end` return code from `codec`. The + The flag to stop reading on `:end` return code from `codec`. The transcoded data are readable even after stopping transcoding process. With this flag on, `stream` is not closed when the wrapper stream is closed with - `close`. Note that some extra data may be read from `stream` into an + `close`. Note that if reading some extra data may be read from `stream` into an internal buffer, and thus `stream` must be a `TranscodingStream` object and `sharedbuf` must be `true` to reuse `stream`. - `sharedbuf`: @@ -184,11 +184,10 @@ end function Base.close(stream::TranscodingStream) mode = stream.state.mode - stopped = mode === :stop || mode === :done if mode != :panic changemode!(stream, :close) end - if !stopped + if !stream.state.stop_on_end close(stream.stream) end return nothing @@ -214,8 +213,6 @@ end continue elseif mode == :write return eof(stream.stream) - elseif mode == :done - return eof(stream.stream) elseif mode == :close return true elseif mode == :stop @@ -622,7 +619,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false) buffer1 = stream.buffer1 buffer2 = stream.buffer2 nflushed::Int = 0 - while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done + while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) if state.code == :end callstartproc(stream, :write) end @@ -689,8 +686,6 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) elseif state.code == :end && state.stop_on_end if stream.state.mode == :read changemode!(stream, :stop) - else - changemode!(stream, :done) end end return Δin, Δout @@ -775,11 +770,9 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) return end elseif mode == :write - if newmode == :close || newmode == :done - if newmode == :close - flushbufferall(stream) - flushuntilend(stream) - end + if newmode == :close + flushbufferall(stream) + flushuntilend(stream) state.mode = newmode finalize_codec(stream.codec, state.error) return @@ -789,11 +782,6 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) state.mode = newmode return end - elseif mode == :done - if newmode == :close - state.mode = newmode - return - end elseif mode == :panic throw_panic_error() end diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index 52516595..4b5e4e53 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -210,15 +210,37 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD @test transcode(DoubleFrameDecoder, b"[ ]] ]") == b" ]" @test transcode(DoubleFrameDecoder, b"[ aa ][ bb ]") == b"ab" - @testset "eof is true after write stops" begin + @testset "stop_on_end=true prevents underlying stream closing" begin sink = IOBuffer() stream = TranscodingStream(DoubleFrameDecoder(), sink, stop_on_end=true) - write(stream, "[ yy ]sdfsadfasdfdf") - flush(stream) - @test eof(stream) - @test_throws ArgumentError read(stream, UInt8) - @test take!(sink) == b"y" + write(stream, "[ yy ]") + write(stream, "[ xx ]") close(stream) + @test isopen(sink) + @test take!(sink) == b"yx" + end + + @testset "Issue #95" begin + @test sprint() do outer + inner = TranscodingStream(DoubleFrameEncoder(), outer, stop_on_end = true) + println(inner, "Hello, world.") + close(inner) + end == "[ HHeelllloo,, wwoorrlldd..\n\n ]" + end + + @testset "TOKEN_END repeated doesn't create more empty frames" begin + sink = IOBuffer() + stream = TranscodingStream(DoubleFrameEncoder(), sink, stop_on_end=true) + write(stream, TranscodingStreams.TOKEN_END) + write(stream, TranscodingStreams.TOKEN_END) + write(stream, "abc") + write(stream, TranscodingStreams.TOKEN_END) + write(stream, "de") + write(stream, TranscodingStreams.TOKEN_END) + write(stream, "") # This doesn't create an empty frame + write(stream, TranscodingStreams.TOKEN_END) + close(stream) + @test String(take!(sink)) == "[ ][ aabbcc ][ ddee ]" end test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 33d7934b..a83aa57a 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -360,4 +360,14 @@ close(stream) end end + + @testset "stop_on_end=true prevents underlying stream closing" begin + sink = IOBuffer() + stream = NoopStream(sink, stop_on_end=true) + write(stream, "abcd") + close(stream) + @test isopen(sink) + @test take!(sink) == b"abcd" + end + end