Skip to content

Commit

Permalink
Performance: skip background status sync throttle during reactive run (
Browse files Browse the repository at this point in the history
  • Loading branch information
fonsp authored Aug 6, 2024
1 parent 74e9801 commit 674c322
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 44 deletions.
10 changes: 7 additions & 3 deletions src/evaluation/Run.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import REPL: ends_with_semicolon
import .Configuration
import .Throttled
import ExpressionExplorer: is_joined_funcname

"""
Expand Down Expand Up @@ -119,10 +120,13 @@ function run_reactive_core!(

# Save the notebook. In most cases, this is the only time that we save the notebook, so any state changes that influence the file contents (like `depends_on_disabled_cells`) should be behind this point. (More saves might happen if a macro expansion or package using happens.)
save && save_notebook(session, notebook)

# Send intermediate updates to the clients at most 20 times / second during a reactive run. (The effective speed of a slider is still unbounded, because the last update is not throttled.)
# flush_send_notebook_changes_throttled,
send_notebook_changes_throttled, flush_notebook_changes = throttled(1.0 / 20; runtime_multiplier=2.0) do
send_notebook_changes_throttled = Throttled.throttled(1.0 / 20; runtime_multiplier=2.0) do
# We will do a state sync now, so that means that we can delay the status_tree state sync loop, see https://github.com/fonsp/Pluto.jl/issues/2978
Throttled.force_throttle_without_run(notebook.status_tree.update_listener_ref[])
# State sync:
send_notebook_changes!(ClientRequest(; session, notebook))
end
send_notebook_changes_throttled()
Expand Down Expand Up @@ -229,8 +233,8 @@ function run_reactive_core!(
end

notebook.wants_to_interrupt = false
flush_notebook_changes()
Status.report_business_finished!(run_status)
flush(send_notebook_changes_throttled)
return new_order
end

Expand Down
101 changes: 71 additions & 30 deletions src/evaluation/Throttled.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,55 @@
module Throttled

import Base.Threads


struct ThrottledFunction
f::Function
timeout::Real
runtime_multiplier::Float64
tlock::ReentrantLock
iscoolnow::Ref{Bool}
run_later::Ref{Bool}
last_runtime::Ref{Float64}
end

"Run the function now"
function Base.flush(tf::ThrottledFunction)
lock(tf.tlock) do
tf.run_later[] = false
tf.last_runtime[] = @elapsed result = tf.f()
result
end
end

"Start the cooldown period. If at the end, a run_later[] is set, then we run the function and schedule the next cooldown period."
function schedule(tf::ThrottledFunction)
# if the last runtime was quite long, increase the sleep period to match.
Timer(tf.timeout + tf.last_runtime[] * tf.runtime_multiplier) do _t
if tf.run_later[]
flush(tf)
schedule(tf)
else
tf.iscoolnow[] = true
end
end
end

function (tf::ThrottledFunction)()
if tf.iscoolnow[]
tf.iscoolnow[] = false
flush(tf)
schedule(tf)
else
tf.run_later[] = true
end
nothing
end





"""
throttled(f::Function, timeout::Real)
Expand All @@ -19,41 +69,30 @@ function throttled(f::Function, timeout::Real; runtime_multiplier::Float64=0.0)
run_later = Ref(false)
last_runtime = Ref(0.0)

function flush()
lock(tlock) do
run_later[] = false
last_runtime[] = @elapsed result = f()
result
end
end

function schedule()
# if the last runtime was quite long, increase the sleep period to match.
Timer(timeout + last_runtime[] * runtime_multiplier) do _t
if run_later[]
flush()
schedule()
else
iscoolnow[] = true
end
end
end
tf = ThrottledFunction(f, timeout, runtime_multiplier, tlock, iscoolnow, run_later, last_runtime)

# we initialize hot, and start the cooldown period immediately
schedule()
schedule(tf)

return tf
end

function throttled_f()
if iscoolnow[]
iscoolnow[] = false
flush()
schedule()
else
run_later[] = true
end
end
"""
Given a throttled function, skip any pending run if hot (but let the cooldown period continue), or start the cooldown period if cool. This forces the throttled function to not fire for a little while.
return throttled_f, flush
Argument should be the first function returned by `throttled`.
"""
function force_throttle_without_run(tf::ThrottledFunction)
# (we can access variables from the function closure hihi)
tf.run_later[] = false
if tf.iscoolnow[]
tf.iscoolnow[] = false
schedule(tf)
end
end

force_throttle_without_run(::Function) = nothing


"""
simple_leading_throttle(f, delay::Real)
Expand All @@ -74,4 +113,6 @@ function simple_leading_throttle(f, delay::Real)
f(args...;kwargs...)
end
end
end

end
2 changes: 1 addition & 1 deletion src/evaluation/WorkspaceManager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ function start_relaying_self_updates((session, notebook)::SN, run_channel)
end

function start_relaying_logs((session, notebook)::SN, log_channel)
update_throttled, flush_throttled = Pluto.throttled(0.1) do
update_throttled = Pluto.Throttled.throttled(0.1) do
Pluto.send_notebook_changes!(Pluto.ClientRequest(; session, notebook))
end

Expand Down
3 changes: 2 additions & 1 deletion src/webserver/Authentication.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import .Throttled

"""
Return whether the `request` was authenticated in one of two ways:
Expand Down Expand Up @@ -29,7 +30,7 @@ function is_authenticated(session::ServerSession, request::HTTP.Request)
end

# Function to log the url with secret on the Julia CLI when a request comes to the server without the secret. Executes at most once every 5 seconds
const log_secret_throttled = simple_leading_throttle(5) do session::ServerSession, request::HTTP.Request
const log_secret_throttled = Throttled.simple_leading_throttle(5) do session::ServerSession, request::HTTP.Request
host = HTTP.header(request, "Host")
target = request.target
url = Text(string(HTTP.URI(HTTP.URI("http://$host/"); query=Dict("secret" => session.secret))))
Expand Down
2 changes: 1 addition & 1 deletion src/webserver/Dynamic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ end

responses[:reset_shared_state] = function response_reset_shared_state(🙋::ClientRequest)
delete!(current_state_for_clients, 🙋.initiator.client)
send_notebook_changes!(🙋; commentary=Dict(:from_reset => true))
send_notebook_changes!(🙋; commentary=Dict(:from_reset => true))
end

responses[:run_multiple_cells] = function response_run_multiple_cells(🙋::ClientRequest)
Expand Down
7 changes: 3 additions & 4 deletions src/webserver/SessionActions.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module SessionActions

import ..Pluto: Pluto, Status, ServerSession, Notebook, Cell, emptynotebook, tamepath, new_notebooks_directory, without_pluto_file_extension, numbered_until_new, cutename, readwrite, update_save_run!, update_nbpkg_cache!, update_from_file, wait_until_file_unchanged, putnotebookupdates!, putplutoupdates!, load_notebook, clientupdate_notebook_list, WorkspaceManager, try_event_call, NewNotebookEvent, OpenNotebookEvent, ShutdownNotebookEvent, @asynclog, ProcessStatus, maybe_convert_path_to_wsl, move_notebook!, throttled
import ..Pluto: Pluto, Status, ServerSession, Notebook, Cell, emptynotebook, tamepath, new_notebooks_directory, without_pluto_file_extension, numbered_until_new, cutename, readwrite, update_save_run!, update_nbpkg_cache!, update_from_file, wait_until_file_unchanged, putnotebookupdates!, putplutoupdates!, load_notebook, clientupdate_notebook_list, WorkspaceManager, try_event_call, NewNotebookEvent, OpenNotebookEvent, ShutdownNotebookEvent, @asynclog, ProcessStatus, maybe_convert_path_to_wsl, move_notebook!, Throttled
using FileWatching
import ..Pluto.DownloadCool: download_cool
import HTTP
Expand Down Expand Up @@ -186,10 +186,9 @@ function add(session::ServerSession, notebook::Notebook; run_async::Bool=true)
end
end

notebook.status_tree.update_listener_ref[] = first(throttled(1.0 / 5; runtime_multiplier=2.0) do
# TODO: this throttle should be trailing
notebook.status_tree.update_listener_ref[] = Throttled.throttled(1.0 / 8; runtime_multiplier=4.0) do
Pluto.send_notebook_changes!(Pluto.ClientRequest(; session, notebook))
end)
end

return notebook
end
Expand Down
2 changes: 1 addition & 1 deletion src/webserver/Status.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Base.@kwdef mutable struct Business
started_at::Union{Nothing,Float64}=nothing
finished_at::Union{Nothing,Float64}=nothing
subtasks::Dict{Symbol,Business}=Dict{Symbol,Business}()
update_listener_ref::Ref{Function}=Ref{Function}(_default_update_listener)
update_listener_ref::Ref{Any}=Ref{Any}(_default_update_listener)
lock::Threads.SpinLock=Threads.SpinLock()
end

Expand Down
31 changes: 28 additions & 3 deletions test/Throttled.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Pluto:throttled
import Pluto: Throttled
using Pluto.WorkspaceManager: poll

@testset "Throttled" begin
Expand All @@ -12,7 +12,7 @@ using Pluto.WorkspaceManager: poll
@test x[] == 1

dt = 4 / 100
ft, flush = throttled(f, dt)
ft = Throttled.throttled(f, dt)

for x in 1:10
ft()
Expand Down Expand Up @@ -82,7 +82,7 @@ using Pluto.WorkspaceManager: poll
ft()
ft()
@test x[] == 12
flush()
flush(ft)
@test x[] == 13
sleep(2dt)
@test x[] == 13
Expand All @@ -108,6 +108,31 @@ using Pluto.WorkspaceManager: poll
sleep(2dt)

####
x[] = 0
Throttled.force_throttle_without_run(ft)
@test x[] == 0
ft()
@test x[] == 0
sleep(.1dt)
@test x[] == 0
sleep(2dt)
@test x[] == 1


ft()
@test x[] == 2
sleep(.1dt)
ft()
Throttled.force_throttle_without_run(ft)
@test x[] == 2
sleep(2dt)
@test x[] == 2


ft()
@test x[] == 3
sleep(2dt)

####

end

0 comments on commit 674c322

Please sign in to comment.