Skip to content

Commit

Permalink
Add done mode (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored Mar 6, 2024
1 parent 0915fe9 commit f2916c9
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 84 deletions.
4 changes: 3 additions & 1 deletion docs/src/assets/modes.dot
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ digraph modes {
"read" -> "panic";

"write" -> "write";
"write" -> "stop";
"write" -> "done";
"write" -> "close";
"write" -> "panic";

"stop" -> "close";
"done" -> "close";

"start" [ shape = point ];
"idle" [ shape = circle ];
"read" [ shape = circle ];
"write" [ shape = circle ];
"stop" [ shape = circle; style=bold; ];
"done" [ shape = circle; style=bold; ];
"close" [ shape = circle; style=bold; ];
"panic" [ shape = circle; style=bold; ];
}
171 changes: 102 additions & 69 deletions docs/src/assets/modes.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ The `mode` field may be one of the following value:
- `:idle` : initial and intermediate mode, no buffered data
- `:read` : being ready to read data, data may be buffered
- `:write`: being ready to write data, data may be buffered
- `:stop` : transcoding is stopped, data may be buffered
- `:stop` : transcoding is stopped after read, data may be buffered
- `:done` : transcoding is stopped after write, data may be buffered
- `:close`: closed, no buffered data
- `:panic`: an exception has been thrown in codec, data may be buffered but we
cannot do anything
Expand Down
4 changes: 2 additions & 2 deletions src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ See Developer's notes for details.
"""
mutable struct State
# current stream mode
mode::Symbol # {:idle, :read, :write, :stop, :close, :panic}
mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic}

# return code of the last method call
code::Symbol # {:ok, :end, :error}

# flag to go :stop on :end
# flag to go :stop or :done on :end
stop_on_end::Bool

# exception thrown while data processing
Expand Down
26 changes: 18 additions & 8 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ function Base.isopen(stream::TranscodingStream)
end

function Base.close(stream::TranscodingStream)
stopped = stream.state.mode == :stop
if stream.state.mode != :panic
mode = stream.state.mode
stopped = mode === :stop || mode === :done
if mode != :panic
changemode!(stream, :close)
end
if !stopped
Expand Down Expand Up @@ -213,6 +214,8 @@ end
continue
elseif mode == :write
return eof(stream.stream)
elseif mode == :done
return eof(stream.stream)
elseif mode == :close
return true
elseif mode == :stop
Expand Down Expand Up @@ -252,7 +255,7 @@ function Base.skip(stream::TranscodingStream, offset::Integer)
mode = stream.state.mode
buffer1 = stream.buffer1
skipped = 0
if mode == :read
if mode === :read || mode === :stop
while !eof(stream) && buffersize(buffer1) < offset - skipped
n = buffersize(buffer1)
emptybuffer!(buffer1)
Expand Down Expand Up @@ -619,7 +622,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false)
buffer1 = stream.buffer1
buffer2 = stream.buffer2
nflushed::Int = 0
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :stop
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done
if state.code == :end
callstartproc(stream, :write)
end
Expand Down Expand Up @@ -684,7 +687,11 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
# When no progress, expand the output buffer.
makemargin!(outbuf, max(16, marginsize(outbuf) * 2))
elseif state.code == :end && state.stop_on_end
changemode!(stream, :stop)
if stream.state.mode == :read
changemode!(stream, :stop)
else
changemode!(stream, :done)
end
end
return Δin, Δout
end
Expand Down Expand Up @@ -738,8 +745,6 @@ end
function changemode!(stream::TranscodingStream, newmode::Symbol)
state = stream.state
mode = state.mode
buffer1 = stream.buffer1
buffer2 = stream.buffer2
if mode == newmode
# mode does not change
return
Expand Down Expand Up @@ -770,7 +775,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
return
end
elseif mode == :write
if newmode == :close || newmode == :stop
if newmode == :close || newmode == :done
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
Expand All @@ -784,6 +789,11 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
state.mode = newmode
return
end
elseif mode == :done
if newmode == :close
state.mode = newmode
return
end
elseif mode == :panic
throw_panic_error()
end
Expand Down
5 changes: 2 additions & 3 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
stream = TranscodingStream(DoubleFrameDecoder(), sink, stop_on_end=true)
write(stream, "[ yy ]sdfsadfasdfdf")
flush(stream)
@test_broken eof(stream)
# TODO This is broken.
# @test_throws ArgumentError read(stream, UInt8)
@test eof(stream)
@test_throws ArgumentError read(stream, UInt8)
@test take!(sink) == b"y"
close(stream)
end
Expand Down

0 comments on commit f2916c9

Please sign in to comment.