Skip to content

Commit

Permalink
Prevent canceling uncancelable generic callbacks (#303)
Browse files Browse the repository at this point in the history
Generic callbacks should not be canceled once they start running. The guarantee provided by `ucxx::Worker` is that once the function returns it should be safe to destroy the callback and all its associated resources, which becomes invalid if the callback is scheduled for cancellation but it is already running, therefore, it's a requirement to check whether the callback is already executing and block it until it's finished. If the callback never completes this may cause an irrecoverable hang which cannot be dealt with from UCXX since it's impossible to stop a callback from executing once it has started, it's the user's responsibility to guarantee the callback must return. A warning is raised after multiples of 10 attempts have been tried to cancel a callback that is being executed and canceling did not succeed, so that the user is informed of what is happening.

The most notable issue is somewhat frequently observable in CI, where the Python async test `test_from_worker_address_multinode` would segfault, in particular with larger amount of endpoints. This was observable in those tests more frequently because there's a large amount of endpoints being created simultaneously by multiple processes, putting more pressure in the resources and causing endpoint creation to take several seconds to complete. In those cases the generic callback executing `ucp_ep_create` would take longer than the default timeout of 3 seconds and in some cases that would be interpreted as the callback timed out, since `ucp_ep_create` itself took longer than 3 seconds, causing the worker to attempt to cancel the callback while it was still executing. With this change, the callback will still timeout but only if it didn't start executing yet, if `ucp_ep_create` ends up never returning, this will cause a deadlock in the application but there's no way for UCXX to recover on its own and warnings are raised, although those hypothetical deadlocks have not been observed in local tests so far. Segfaults should not occur in this situation anymore.

Additionally, unit tests for generic callbacks are now included, which previously were a gap in the testing suite.

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #303
  • Loading branch information
pentschev authored Oct 23, 2024
1 parent 6697d0d commit b94958d
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 31 deletions.
21 changes: 15 additions & 6 deletions cpp/include/ucxx/delayed_submission.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class BaseDelayedSubmissionCollection {
std::string _name{"undefined"}; ///< The human-readable name of the collection, used for logging
bool _enabled{true}; ///< Whether the resource required to process the collection is enabled.
ItemIdType _itemId{0}; ///< The item ID counter, used to allow cancelation.
std::optional<ItemIdType> _processing{
std::nullopt}; ///< The ID of the item being processed, if any.
std::deque<std::pair<ItemIdType, T>> _collection{}; ///< The collection.
std::set<ItemIdType> _canceled{}; ///< IDs of canceled items.
std::mutex _mutex{}; ///< Mutex to provide access to `_collection`.
Expand Down Expand Up @@ -150,10 +152,17 @@ class BaseDelayedSubmissionCollection {
item = std::move(_collection.front());
_collection.pop_front();
if (_canceled.erase(item.first)) continue;
_processing = std::optional<ItemIdType>{item.first};
}

processItem(item.first, item.second);
}

{
// Clear the value of `_processing` as no more requests will be processed.
std::lock_guard<std::mutex> lock(_mutex);
_processing = std::nullopt;
}
}

/**
Expand All @@ -162,17 +171,17 @@ class BaseDelayedSubmissionCollection {
* Cancel a pending callback and thus do not execute it, unless the execution has
* already begun, in which case cancelation cannot be done.
*
* @throws std::runtime_error if the item is being processed and canceling is not
* possible anymore.
*
* @param[in] id the ID of the scheduled item, as returned by `schedule()`.
*/
void cancel(ItemIdType id)
{
std::lock_guard<std::mutex> lock(_mutex);
// TODO: Check if not cancellable anymore? Will likely need a separate set to keep
// track of registered items.
//
// If the callback is already running
// and the user has no way of knowing that but still destroys it, undefined
// behavior may occur.
if (_processing.has_value() && _processing.value() == id)
throw std::runtime_error("Cannot cancel, item is being processed.");

_canceled.insert(id);
ucxx_trace_req("Canceled item: %lu", id);
}
Expand Down
16 changes: 12 additions & 4 deletions cpp/include/ucxx/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,12 @@ class Worker : public Component {
* If `period` is `0` this is a blocking call that only returns when the callback has been
* executed and will always return `true`, and if `period` is a positive integer the time
* in nanoseconds will be waited for the callback to complete and return `true` in the
* successful case or `false` otherwise. `period` only applies if the worker progress
* thread is running, otherwise the callback is immediately executed.
* successful case or `false` otherwise. However, if the callback is not cancelable
* anymore (i.e., it has already started), this method will keep retrying and may never
* return if the callback never completes, it is unsafe to return as this would allow the
* caller to destroy the callback and its resources causing undefined behavior. `period`
* only applies if the worker progress thread is running, otherwise the callback is
* immediately executed.
*
* @param[in] callback the callback to execute before progressing the worker.
* @param[in] period the time in nanoseconds to wait for the callback to complete.
Expand All @@ -462,8 +466,12 @@ class Worker : public Component {
* If `period` is `0` this is a blocking call that only returns when the callback has been
* executed and will always return `true`, and if `period` is a positive integer the time
* in nanoseconds will be waited for the callback to complete and return `true` in the
* successful case or `false` otherwise. `period` only applies if the worker progress
* thread is running, otherwise the callback is immediately executed.
* successful case or `false` otherwise. However, if the callback is not cancelable
* anymore (i.e., it has already started), this method will keep retrying and may never
* return if the callback never completes, it is unsafe to return as this would allow the
* caller to destroy the callback and its resources causing undefined behavior. `period`
* only applies if the worker progress thread is running, otherwise the callback is
* immediately executed.
*
* @param[in] callback the callback to execute before progressing the worker.
* @param[in] period the time in nanoseconds to wait for the callback to complete.
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/delayed_submission.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ ItemIdType DelayedSubmissionCollection::registerGenericPost(DelayedSubmissionCal

void DelayedSubmissionCollection::cancelGenericPre(ItemIdType id) { _genericPre.cancel(id); }

void DelayedSubmissionCollection::cancelGenericPost(ItemIdType id) { _genericPre.cancel(id); }
void DelayedSubmissionCollection::cancelGenericPost(ItemIdType id) { _genericPost.cancel(id); }

} // namespace ucxx
6 changes: 4 additions & 2 deletions cpp/src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ void Endpoint::create(ucp_ep_params_t* params)
3000000000 /* 3s */))
break;

if (i == maxAttempts - 1)
if (i == maxAttempts - 1) {
status = UCS_ERR_TIMED_OUT;
ucxx_error("Timeout waiting for ucp_ep_create, all attempts failed");
else
} else {
ucxx_warn("Timeout waiting for ucp_ep_create, retrying");
}
}
utils::ucsErrorThrow(status);
} else {
Expand Down
40 changes: 30 additions & 10 deletions cpp/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,21 @@ bool Worker::registerGenericPre(DelayedSubmissionCallbackType callback, uint64_t
}
signalWorkerFunction();

auto ret = callbackNotifier.wait(period, signalWorkerFunction);

if (!ret) _delayedSubmissionCollection->cancelGenericPre(id);

return ret;
size_t retryCount = 0;
while (true) {
auto ret = callbackNotifier.wait(period, signalWorkerFunction);

try {
if (!ret) _delayedSubmissionCollection->cancelGenericPre(id);
return ret;
} catch (const std::runtime_error& e) {
if (++retryCount % 10 == 0)
ucxx_warn(
"Could not cancel after %lu attempts, the callback has not returned and the process "
"may stop responding.",
retryCount);
}
}
}
}

Expand Down Expand Up @@ -384,11 +394,21 @@ bool Worker::registerGenericPost(DelayedSubmissionCallbackType callback, uint64_
}
signalWorkerFunction();

auto ret = callbackNotifier.wait(period, signalWorkerFunction);

if (!ret) _delayedSubmissionCollection->cancelGenericPost(id);

return ret;
size_t retryCount = 0;
while (true) {
auto ret = callbackNotifier.wait(period, signalWorkerFunction);

try {
if (!ret) _delayedSubmissionCollection->cancelGenericPost(id);
return ret;
} catch (const std::runtime_error& e) {
if (++retryCount % 10 == 0)
ucxx_warn(
"Could not cancel after %lu attempts, the callback has not returned and the process "
"may stop responding.",
retryCount);
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/worker_progress_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void WorkerProgressThread::stop()
});
_signalWorkerFunction();
if (!callbackNotifierPost.wait(3000000000)) {
_delayedSubmissionCollection->cancelGenericPre(idPost);
_delayedSubmissionCollection->cancelGenericPost(idPost);
}

_thread.join();
Expand Down
Loading

0 comments on commit b94958d

Please sign in to comment.