Skip to content

Commit

Permalink
Fix stop_on_end = true closing underlying stream (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored Mar 16, 2024
1 parent f2916c9 commit 0a46abf
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 101 deletions.
3 changes: 0 additions & 3 deletions docs/src/assets/modes.dot
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,16 @@ digraph modes {
"read" -> "panic";

"write" -> "write";
"write" -> "done";
"write" -> "close";
"write" -> "panic";

"stop" -> "close";
"done" -> "close";

"start" [ shape = point ];
"idle" [ shape = circle ];
"read" [ shape = circle ];
"write" [ shape = circle ];
"stop" [ shape = circle; style=bold; ];
"done" [ shape = circle; style=bold; ];
"close" [ shape = circle; style=bold; ];
"panic" [ shape = circle; style=bold; ];
}
112 changes: 47 additions & 65 deletions docs/src/assets/modes.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 0 additions & 1 deletion docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ The `mode` field may be one of the following value:
- `:read` : being ready to read data, data may be buffered
- `:write`: being ready to write data, data may be buffered
- `:stop` : transcoding is stopped after read, data may be buffered
- `:done` : transcoding is stopped after write, data may be buffered
- `:close`: closed, no buffered data
- `:panic`: an exception has been thrown in codec, data may be buffered but we
cannot do anything
Expand Down
7 changes: 3 additions & 4 deletions ext/TestExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,11 @@ function TranscodingStreams.test_chunked_write(Encoder, Decoder)
buffer = IOBuffer()
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
write(stream, vcat(data...))
flush(stream)
close(stream)
ok = true
ok &= hash(take!(buffer)) == hash(chunks[1])
ok &= buffersize(stream.state.buffer1) == length(data[2])
ok &= hash(take!(buffer)) == hash(vcat(chunks...))
ok &= buffersize(stream.state.buffer1) == 0
Test.@test ok
close(stream)
end
finalize(encoder)
end
Expand Down
5 changes: 4 additions & 1 deletion src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ end

function TranscodingStream(codec::Noop, stream::IO;
bufsize::Integer=DEFAULT_BUFFER_SIZE,
stop_on_end::Bool=false,
sharedbuf::Bool=(stream isa TranscodingStream))
checkbufsize(bufsize)
checksharedbuf(sharedbuf, stream)
Expand All @@ -38,7 +39,9 @@ function TranscodingStream(codec::Noop, stream::IO;
else
buffer = Buffer(bufsize)
end
return TranscodingStream(codec, stream, State(buffer, buffer))
state = State(buffer, buffer)
state.stop_on_end = stop_on_end
return TranscodingStream(codec, stream, state)
end

"""
Expand Down
4 changes: 2 additions & 2 deletions src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ See Developer's notes for details.
"""
mutable struct State
# current stream mode
mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic}
mode::Symbol # {:idle, :read, :write, :stop, :close, :panic}

# return code of the last method call
code::Symbol # {:ok, :end, :error}

# flag to go :stop or :done on :end
# flag to go :stop on :end while reading
stop_on_end::Bool

# exception thrown while data processing
Expand Down
26 changes: 7 additions & 19 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ Arguments
The initial buffer size (the default size is 16KiB). The buffer may be
extended whenever `codec` requests so.
- `stop_on_end`:
The flag to stop transcoding on `:end` return code from `codec`. The
The flag to stop reading on `:end` return code from `codec`. The
transcoded data are readable even after stopping transcoding process. With
this flag on, `stream` is not closed when the wrapper stream is closed with
`close`. Note that some extra data may be read from `stream` into an
`close`. Note that if reading some extra data may be read from `stream` into an
internal buffer, and thus `stream` must be a `TranscodingStream` object and
`sharedbuf` must be `true` to reuse `stream`.
- `sharedbuf`:
Expand Down Expand Up @@ -184,11 +184,10 @@ end

function Base.close(stream::TranscodingStream)
mode = stream.state.mode
stopped = mode === :stop || mode === :done
if mode != :panic
changemode!(stream, :close)
end
if !stopped
if !stream.state.stop_on_end
close(stream.stream)
end
return nothing
Expand All @@ -214,8 +213,6 @@ end
continue
elseif mode == :write
return eof(stream.stream)
elseif mode == :done
return eof(stream.stream)
elseif mode == :close
return true
elseif mode == :stop
Expand Down Expand Up @@ -622,7 +619,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false)
buffer1 = stream.buffer1
buffer2 = stream.buffer2
nflushed::Int = 0
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0)
if state.code == :end
callstartproc(stream, :write)
end
Expand Down Expand Up @@ -689,8 +686,6 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
elseif state.code == :end && state.stop_on_end
if stream.state.mode == :read
changemode!(stream, :stop)
else
changemode!(stream, :done)
end
end
return Δin, Δout
Expand Down Expand Up @@ -775,11 +770,9 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
return
end
elseif mode == :write
if newmode == :close || newmode == :done
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
end
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
state.mode = newmode
finalize_codec(stream.codec, state.error)
return
Expand All @@ -789,11 +782,6 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
state.mode = newmode
return
end
elseif mode == :done
if newmode == :close
state.mode = newmode
return
end
elseif mode == :panic
throw_panic_error()
end
Expand Down
34 changes: 28 additions & 6 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,37 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test transcode(DoubleFrameDecoder, b"[ ]] ]") == b" ]"
@test transcode(DoubleFrameDecoder, b"[ aa ][ bb ]") == b"ab"

@testset "eof is true after write stops" begin
@testset "stop_on_end=true prevents underlying stream closing" begin
sink = IOBuffer()
stream = TranscodingStream(DoubleFrameDecoder(), sink, stop_on_end=true)
write(stream, "[ yy ]sdfsadfasdfdf")
flush(stream)
@test eof(stream)
@test_throws ArgumentError read(stream, UInt8)
@test take!(sink) == b"y"
write(stream, "[ yy ]")
write(stream, "[ xx ]")
close(stream)
@test isopen(sink)
@test take!(sink) == b"yx"
end

@testset "Issue #95" begin
@test sprint() do outer
inner = TranscodingStream(DoubleFrameEncoder(), outer, stop_on_end = true)
println(inner, "Hello, world.")
close(inner)
end == "[ HHeelllloo,, wwoorrlldd..\n\n ]"
end

@testset "TOKEN_END repeated doesn't create more empty frames" begin
sink = IOBuffer()
stream = TranscodingStream(DoubleFrameEncoder(), sink, stop_on_end=true)
write(stream, TranscodingStreams.TOKEN_END)
write(stream, TranscodingStreams.TOKEN_END)
write(stream, "abc")
write(stream, TranscodingStreams.TOKEN_END)
write(stream, "de")
write(stream, TranscodingStreams.TOKEN_END)
write(stream, "") # This doesn't create an empty frame
write(stream, TranscodingStreams.TOKEN_END)
close(stream)
@test String(take!(sink)) == "[ ][ aabbcc ][ ddee ]"
end

test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream)
Expand Down
Loading

0 comments on commit 0a46abf

Please sign in to comment.