feat(bin): don't allocate in server UDP recv path #2202
base: main
Conversation
Benchmark results

Performance differences relative to 8e36f63.

- `coalesce_acked_from_zero` 1+1 entries: No change in performance detected. time: [98.981 ns 99.347 ns 99.713 ns]; change: [-0.6200% -0.1765% +0.3186%] (p = 0.46 > 0.05)
- `coalesce_acked_from_zero` 3+1 entries: No change in performance detected. time: [117.33 ns 117.84 ns 118.49 ns]; change: [-0.2259% +0.2445% +0.8181%] (p = 0.36 > 0.05)
- `coalesce_acked_from_zero` 10+1 entries: No change in performance detected. time: [116.93 ns 117.44 ns 118.02 ns]; change: [-0.7469% +0.0499% +0.8901%] (p = 0.91 > 0.05)
- `coalesce_acked_from_zero` 1000+1 entries: No change in performance detected. time: [96.825 ns 99.825 ns 106.74 ns]; change: [-0.8824% +3.4394% +11.285%] (p = 0.60 > 0.05)
- `RxStreamOrderer::inbound_frame()`: Change within noise threshold. time: [112.37 ms 112.42 ms 112.47 ms]; change: [+0.6095% +0.6744% +0.7378%] (p = 0.00 < 0.05)
- transfer/pacing-false/varying-seeds: No change in performance detected. time: [26.461 ms 27.555 ms 28.653 ms]; change: [-9.8255% -4.6029% +0.5740%] (p = 0.10 > 0.05)
- transfer/pacing-true/varying-seeds: Change within noise threshold. time: [33.783 ms 35.364 ms 36.962 ms]; change: [-12.746% -6.7312% -0.6669%] (p = 0.03 < 0.05)
- transfer/pacing-false/same-seed: 💚 Performance has improved. time: [24.635 ms 25.311 ms 25.978 ms]; change: [-13.346% -9.5925% -5.4715%] (p = 0.00 < 0.05)
- transfer/pacing-true/same-seed: No change in performance detected. time: [41.390 ms 43.471 ms 45.573 ms]; change: [-11.808% -5.4065% +1.3004%] (p = 0.12 > 0.05)
- 1-conn/1-100mb-resp/mtu-1500 (aka. Download)/client: No change in performance detected. time: [895.50 ms 904.04 ms 912.77 ms]; thrpt: [109.56 MiB/s 110.61 MiB/s 111.67 MiB/s]; change: time [-0.8172% +0.5809% +1.9620%] (p = 0.41 > 0.05), thrpt [-1.9242% -0.5775% +0.8239%]
- 1-conn/10_000-parallel-1b-resp/mtu-1500 (aka. RPS)/client: No change in performance detected. time: [318.88 ms 321.96 ms 325.06 ms]; thrpt: [30.764 Kelem/s 31.059 Kelem/s 31.360 Kelem/s]; change: time [-0.8657% +0.5677% +2.0015%] (p = 0.44 > 0.05), thrpt [-1.9622% -0.5645% +0.8733%]
- 1-conn/1-1b-resp/mtu-1500 (aka. HPS)/client: No change in performance detected. time: [34.013 ms 34.186 ms 34.377 ms]; thrpt: [29.089 elem/s 29.252 elem/s 29.401 elem/s]; change: time [-0.5946% +0.2934% +1.1611%] (p = 0.51 > 0.05), thrpt [-1.1478% -0.2925% +0.5982%]
- 1-conn/1-100mb-resp/mtu-1500 (aka. Upload)/client: 💚 Performance has improved. time: [1.6919 s 1.7079 s 1.7240 s]; thrpt: [58.005 MiB/s 58.553 MiB/s 59.106 MiB/s]; change: time [-5.2315% -3.9512% -2.5887%] (p = 0.00 < 0.05), thrpt [+2.6575% +4.1137% +5.5203%]
- 1-conn/1-100mb-resp/mtu-65536 (aka. Download)/client: 💔 Performance has regressed. time: [111.81 ms 112.05 ms 112.29 ms]; thrpt: [890.55 MiB/s 892.47 MiB/s 894.37 MiB/s]; change: time [+1.7079% +2.0413% +2.3807%] (p = 0.00 < 0.05), thrpt [-2.3254% -2.0005% -1.6792%]
- 1-conn/10_000-parallel-1b-resp/mtu-65536 (aka. RPS)/client: No change in performance detected. time: [318.06 ms 321.39 ms 324.67 ms]; thrpt: [30.801 Kelem/s 31.115 Kelem/s 31.440 Kelem/s]; change: time [-0.2001% +1.4594% +2.9933%] (p = 0.06 > 0.05), thrpt [-2.9063% -1.4384% +0.2005%]
- 1-conn/1-1b-resp/mtu-65536 (aka. HPS)/client: Change within noise threshold. time: [33.714 ms 33.909 ms 34.121 ms]; thrpt: [29.307 elem/s 29.491 elem/s 29.661 elem/s]; change: time [-1.9609% -1.0950% -0.2330%] (p = 0.01 < 0.05), thrpt [+0.2336% +1.1071% +2.0001%]
- 1-conn/1-100mb-resp/mtu-65536 (aka. Upload)/client: No change in performance detected. time: [268.59 ms 284.75 ms 307.00 ms]; thrpt: [325.73 MiB/s 351.19 MiB/s 372.32 MiB/s]; change: time [-19.367% -8.2376% +3.5985%] (p = 0.22 > 0.05), thrpt [-3.4735% +8.9771% +24.018%]

Client/server transfer results

Transfer of 33554432 bytes over loopback.
Force-pushed from 02fe570 to 32ce4ae.
Codecov Report

All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #2202      +/-   ##
==========================================
- Coverage   95.39%   95.36%   -0.03%
==========================================
  Files         112      112
  Lines       36447    36447
==========================================
- Hits        34768    34759       -9
- Misses       1679     1688       +9
```

☔ View full report in Codecov by Sentry.
Failed Interop Tests

QUIC Interop Runner, client vs. server:
- neqo-latest as client
- neqo-latest as server

All results

Succeeded Interop Tests

QUIC Interop Runner, client vs. server:
- neqo-latest as client
- neqo-latest as server

Unsupported Interop Tests

QUIC Interop Runner, client vs. server:
- neqo-latest as client
- neqo-latest as server
Force-pushed from 32ce4ae to bdb5da9.
Previously the `neqo-bin` server would read a set of datagrams from the socket and allocate them:

```rust
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
```

This was done out of convenience, as handling `Datagram<&[u8]>`s, each borrowing from `self.recv_buf`, is hard to get right across multiple `&mut self` functions (here `self.run`, `self.process` and `self.find_socket`). This commit combines `self.process` and `self.find_socket` and passes a socket index, instead of the read `Datagram`s, from `self.run` to `self.process`, thus allowing the Rust borrow checker to accept borrowed `Datagram<&[u8]>`s instead of owned `Datagram`s.
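The borrow-checker tension this commit message describes can be sketched with a minimal, self-contained toy (the `Runner` struct and `process_socket` names below are hypothetical stand-ins, not the actual neqo-bin types): by passing an index instead of borrowed datagrams, the borrow of `recv_buf` starts and ends inside a single function and never has to cross a `&mut self` call boundary.

```rust
// Hypothetical stand-in for neqo-bin's ServerRunner.
struct Runner {
    recv_buf: Vec<u8>,
    processed: usize,
}

impl Runner {
    // Borrow-friendly shape: the caller passes an index, and the borrow of
    // `self.recv_buf` both starts and ends within this one function.
    fn process_socket(&mut self, _socket_index: usize) {
        // Slices borrowing from `recv_buf` (stand-in for `Datagram<&[u8]>`s).
        let dgrams: Vec<&[u8]> = self.recv_buf.chunks(4).collect();
        let n = dgrams.len();
        // The borrow has ended by here, so mutating other state is fine.
        self.processed += n;
    }
}

fn main() {
    let mut r = Runner { recv_buf: vec![0u8; 12], processed: 0 };
    r.process_socket(0);
    assert_eq!(r.processed, 3); // 12 bytes in chunks of 4
}
```

Returning `dgrams` from one `&mut self` method and feeding it to another would instead keep `self` borrowed across both calls, which the borrow checker rejects.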
Force-pushed from bdb5da9 to 12bb957.
neqo-bin/src/server/mod.rs
Outdated
```rust
loop {
match self.server.process(dgram.take(), (self.now)()) {
let input_dgram = if let Some(d) = input_dgrams.iter_mut().flatten().next() {
```
I found this code to be a little unintuitive. You are taking a mutable iterator over the option, then flattening it. It's not clear that you are mutating the underlying iterator as a result of calling `next()`.

Taking a step back, I think that this code is fairly simple: you read from the indicated socket, process every datagram that it produces, and stop when the socket stops producing datagrams.

Would this structure work?
```rust
async fn whatever(&mut self, ready_socket_index: Option<usize>) -> Res<()> {
    let Some(inx) = ready_socket_index else {
        return Ok(());
    };
    let (host, socket) = self.sockets.get_mut(inx).unwrap();
    while let Some(input_dgrams) = socket.recv(*host, &mut self.recv_buf)? {
        for input in input_dgrams {
            match self.server.process(input, (self.now)()) {
                // see below for a note about sending.
                Output::Datagram(output) => Self::send(&mut self.sockets, &output).await?,
                Output::Callback(new_timeout) => {
                    self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
                }
                Output::None => {}
            }
        }
    }
    Ok(())
}
```
I think that's functionally equivalent to what you have, but a lot easier to read, at least for me.
I'm not a borrow checker, so I couldn't say if this works. There are a lot of references being held here. I can't see any obvious overlap.
> I found this code to be a little unintuitive.

Agreed. The complexity stems from the fact that `neqo_transport::Server` does not have a `process_multiple_input` function, thus having to handle individual output `Datagram`s while still buffering input `Datagram`s.

`neqo_transport::Server` does not have a `process_multiple_input` function because the set of input `Datagram`s provided might each be for a different `neqo_transport::Connection`, and thus each result in an `Output::Datagram`; `process_multiple_input` would therefore need to return a set of output `Datagram`s, not just one `Datagram`.

If you think it is worth it, I can explore this pathway, i.e. adding `process_multiple_input` to `neqo_transport::Server`. Preferably in a follow-up pull request.
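To illustrate why such a function would need a multi-datagram return type, here is a purely hypothetical sketch (neqo_transport has no such API today; the `Server`, `Output`, and `process_multiple_input` below are toy stand-ins): since each input may belong to a different connection, each may produce its own output.

```rust
// Toy stand-ins, not neqo_transport's real types.
enum Output {
    Datagram(Vec<u8>),
    None,
}

struct Server;

impl Server {
    // One input, at most one output: the shape of today's `process`.
    fn process(&mut self, input: Option<&[u8]>) -> Output {
        match input {
            Some(d) => Output::Datagram(d.to_vec()), // echo as a stand-in
            None => Output::None,
        }
    }

    // Hypothetical: many inputs, each possibly for a different connection,
    // so the return type must hold many output datagrams.
    fn process_multiple_input<'a>(
        &mut self,
        inputs: impl Iterator<Item = &'a [u8]>,
    ) -> Vec<Vec<u8>> {
        inputs
            .filter_map(|d| match self.process(Some(d)) {
                Output::Datagram(out) => Some(out),
                Output::None => None,
            })
            .collect()
    }
}

fn main() {
    let mut server = Server;
    let outputs = server.process_multiple_input([&b"a"[..], &b"b"[..]].into_iter());
    assert_eq!(outputs.len(), 2); // one output per input connection
}
```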
Addressing the concrete suggestion above:

```rust
let Some(inx) = ready_socket_index else {
    return Ok(());
};
```

`process` (or `whatever` above) might be called with `ready_socket_index` being `None`, in which case it is expected to drive the output path only, i.e. not just return early.
```rust
while let Some(input_dgrams) = socket.recv(*host, &mut self.recv_buf)? {
    for input in input_dgrams {
        match self.server.process(input, (self.now)()) {
            // see below for a note about sending.
            Output::Datagram(output) => Self::send(&mut self.sockets, &output).await?,
            Output::Callback(new_timeout) => {
                self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
            }
            Output::None => {}
        }
    }
}
```
If `self.server.process` returns `Output::Datagram`, one has to call it again, until it returns `Output::Callback` or `Output::None`. In the above suggestion, `self.server.process` is only called if more input datagrams are available.
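That drain requirement can be sketched with simplified stand-in types (the `Server` and `drive` below are hypothetical, not neqo's actual API): after feeding one input, `process` must keep being called with `None` until it stops producing datagrams.

```rust
use std::collections::VecDeque;

// Toy stand-ins for neqo's Output and Server.
enum Output {
    Datagram(Vec<u8>),
    Callback(u64), // stand-in for a timeout duration
    None,
}

struct Server {
    pending: VecDeque<Vec<u8>>,
}

impl Server {
    fn process(&mut self, input: Option<Vec<u8>>) -> Output {
        if let Some(d) = input {
            // One input may queue more than one output datagram.
            self.pending.push_back(d.clone());
            self.pending.push_back(d);
        }
        match self.pending.pop_front() {
            Some(d) => Output::Datagram(d),
            None => Output::None,
        }
    }
}

// Drain loop: call `process` again (with `None`) until it returns
// `Output::Callback` or `Output::None`.
fn drive(server: &mut Server, mut input: Option<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut sent = Vec::new();
    loop {
        match server.process(input.take()) {
            Output::Datagram(d) => sent.push(d),
            Output::Callback(_) | Output::None => return sent,
        }
    }
}

fn main() {
    let mut server = Server { pending: VecDeque::new() };
    let sent = drive(&mut server, Some(b"hello".to_vec()));
    assert_eq!(sent.len(), 2); // both queued outputs were drained
}
```

Calling `process` only while more inputs are available, as in the quoted suggestion, would leave the second queued datagram unsent.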
It seems like there are two functions here then:

- When `ready_socket_index` is `None`, where you just call `process_output` and send out what comes.
- When `ready_socket_index` is `Some`, where you do what I outlined.
That leads to:

```rust
async fn drive_sending(&mut self, mut input: Option<Datagram>) -> Res<()> {
    loop {
        match self.server.process(input.take(), (self.now)()) {
            // see below for a note about sending.
            Output::Datagram(output) => Self::send(&mut self.sockets, &output).await?,
            Output::Callback(new_timeout) => {
                self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
                return Ok(());
            }
            Output::None => return Ok(()),
        }
    }
}
```
```rust
let Some(inx) = ready_socket_index else {
    self.drive_sending(None).await?;
    return Ok(());
};
let (host, socket) = self.sockets.get_mut(inx).unwrap();
while let Some(input_dgrams) = socket.recv(*host, &mut self.recv_buf)? {
    for input in input_dgrams {
        self.drive_sending(Some(input)).await?;
    }
}
```
If I correctly understand the above, 91c7920 implements your suggestion. Please take a look @martinthomson.
```rust
fn drive_sending(&mut self, mut input: Option<Datagram>) {
```

Note that when calling `drive_sending` with an input datagram, one cannot pass `&mut self` to `drive_sending`, as `self.recv_buf` is borrowed already.
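The workaround for that conflict is borrow splitting: while a value borrows one field of `self`, a call taking `&mut self` is rejected, but a function taking only the *other* fields is fine. A minimal sketch with hypothetical names (`Runner`, `drive_sending` here are illustrative, not the actual code):

```rust
// Hypothetical stand-in for neqo-bin's ServerRunner.
struct Runner {
    recv_buf: Vec<u8>,
    sent: usize,
}

// Takes only the fields it needs, not `&mut self`, so it can be called
// while `recv_buf` is borrowed elsewhere.
fn drive_sending(sent: &mut usize, input: Option<&[u8]>) {
    if input.is_some() {
        *sent += 1;
    }
}

impl Runner {
    fn process(&mut self) {
        // `dgram` borrows `self.recv_buf`...
        let dgram: &[u8] = &self.recv_buf[..4];
        // ...so we pass the remaining fields explicitly instead of `&mut self`.
        // The borrow checker accepts this because the field borrows are disjoint.
        drive_sending(&mut self.sent, Some(dgram));
    }
}

fn main() {
    let mut r = Runner { recv_buf: vec![0; 8], sent: 0 };
    r.process();
    assert_eq!(r.sent, 1);
}
```

Replacing the `drive_sending` call with a `&mut self` method call would fail to compile while `dgram` is alive.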
neqo-bin/src/server/mod.rs
Outdated
```rust
/// Tries to find a socket, but then just falls back to sending from the first.
fn find_socket(&mut self, addr: SocketAddr) -> &mut crate::udp::Socket {
    let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
    rest.iter_mut()
        .map(|(_host, socket)| socket)
        .find(|socket| {
            socket
                .local_addr()
                .ok()
                .map_or(false, |socket_addr| socket_addr == addr)
        })
        .unwrap_or(first_socket)
}
```
Moved outside of `impl ServerRunner {}`, i.e. below, as it no longer takes `&mut self` but instead `socket: &mut [(SocketAddr, crate::udp::Socket)]`.
You can move that back into `ServerRunner`. Because it is dealing with internals of that struct, it might make sense to do that. The only cost is that you invoke with `Self::process` rather than just `process`.
Sounds good. Done in 1b7ce8f.
neqo-bin/src/server/mod.rs
Outdated
```rust
async fn process(&mut self, mut dgram: Option<Datagram>) -> Result<(), io::Error> {
    loop {
        match self.server.process(dgram.take(), (self.now)()) {
            Output::Datagram(dgram) => {
                let socket = self.find_socket(dgram.source());
                socket.writable().await?;
                socket.send(&dgram)?;
            }
            Output::Callback(new_timeout) => {
                qdebug!("Setting timeout of {:?}", new_timeout);
                self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
                break;
            }
            Output::None => {
                break;
            }
        }
    }
    Ok(())
}
```
As with `find_socket`, moved below into a free function.
Yep, this looks much better and is more or less what I suggested. Nice. Consider the free function choice as I suggest though: I tend to prefer some amount of encapsulation for these sorts of functions, particularly when they are touching all of the internal pieces of the struct.
Previously the `neqo-bin` server would read a set of datagrams from the socket and allocate them.

This was done out of convenience, as handling `Datagram<&[u8]>`s, each borrowing from `self.recv_buf`, is hard to get right across multiple `&mut self` functions, that is here `self.run`, `self.process` and `self.find_socket`.

This commit combines `self.process` and `self.find_socket` and passes a socket index, instead of the read `Datagram`s, from `self.run` to `self.process`, thus making the Rust borrow checker happy to handle borrowed `Datagram<&[u8]>`s instead of owned `Datagram`s.

Follow-up to #2184.

Fixes #2190.

Hopefully speeds up #2199.