[NO-OOM] Exploration of cudf-spilling + UVM #1504

Draft: wants to merge 3 commits into branch-24.06
Changes from 2 commits
55 changes: 43 additions & 12 deletions include/rmm/mr/device/detail/stream_ordered_memory_resource.hpp
@@ -32,6 +32,8 @@

namespace rmm::mr::detail {

using expand_callback_t = std::function<bool(std::size_t, void*)>;

/**
* @brief A CRTP helper function
*
@@ -81,6 +83,12 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
stream_ordered_memory_resource& operator=(stream_ordered_memory_resource const&) = delete;
stream_ordered_memory_resource& operator=(stream_ordered_memory_resource&&) = delete;

void set_expand_callback(expand_callback_t func, void* args)
{
expand_callback_ = func;
expand_callback_args_ = args;
}

protected:
using free_list = FreeListType;
using block_type = typename free_list::block_type;
@@ -203,7 +211,7 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_

if (size <= 0) { return nullptr; }

lock_guard lock(mtx_);
// lock_guard lock(mtx_);
Member: question: why comment out the lock?

Member Author:
That is because of a deadlock that would otherwise trigger when the allocation results in cudf-spilling. In this case, cudf-spilling will find another buffer to spill and de-allocate its memory, which also requires the lock.

I haven't given this too much thought, but I think this could be handled with a reentrant lock.
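
For illustration, a minimal sketch of the reentrant-lock idea (not part of this PR): `pool_like_resource` and its method bodies are hypothetical, and only the `mtx_` member name mirrors the diff. The point is that `std::recursive_mutex` lets a spill callback deallocate through the same resource on the same thread without deadlocking.

```cpp
#include <cstddef>
#include <mutex>

// Hypothetical stand-in for a pool resource; not RMM code.
class pool_like_resource {
 public:
  void* allocate(std::size_t /*bytes*/)
  {
    // Re-entrant: if the spill callback below calls deallocate() on this
    // thread, the second lock acquisition succeeds instead of deadlocking.
    std::lock_guard<std::recursive_mutex> lock(mtx_);
    // ... try to satisfy the request; on failure invoke the spill callback,
    // which may free other buffers via deallocate() ...
    return nullptr;
  }

  void deallocate(void* /*ptr*/, std::size_t /*bytes*/)
  {
    std::lock_guard<std::recursive_mutex> lock(mtx_);  // same thread may already hold it
    // ... return the block to the free list ...
  }

 private:
  std::recursive_mutex mtx_;  // instead of std::mutex
};
```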


auto stream_event = get_event(stream);

@@ -236,7 +244,7 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_

if (size <= 0 || ptr == nullptr) { return; }

lock_guard lock(mtx_);
// lock_guard lock(mtx_);
auto stream_event = get_event(stream);

size = rmm::align_up(size, rmm::CUDA_ALLOCATION_ALIGNMENT);
@@ -318,14 +326,7 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
return allocated;
}

/**
* @brief Get an available memory block of at least `size` bytes
*
* @param size The number of bytes to allocate
* @param stream_event The stream and associated event on which the allocation will be used.
* @return block_type A block of memory of at least `size` bytes
*/
block_type get_block(std::size_t size, stream_event_pair stream_event)
block_type get_block_no_expand(std::size_t size, stream_event_pair stream_event)
{
// Try to find a satisfactory block in free list for the same stream (no sync required)
auto iter = stream_free_blocks_.find(stream_event);
@@ -344,13 +345,39 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
}

// no large enough blocks available on other streams, so sync and merge until we find one
return get_block_from_other_stream(size, stream_event, blocks, true);
}

/**
* @brief Get an available memory block of at least `size` bytes
*
* @param size The number of bytes to allocate
* @param stream_event The stream and associated event on which the allocation will be used.
* @return block_type A block of memory of at least `size` bytes
*/
block_type get_block(std::size_t size, stream_event_pair stream_event)
{
{
block_type const block = get_block_from_other_stream(size, stream_event, blocks, true);
block_type const block = get_block_no_expand(size, stream_event);
if (block.is_valid()) { return block; }
}

log_summary_trace();
// std::cout << "get_block(no free blocks) - size: " << size << std::endl;
if (expand_callback_.has_value()) {
while (expand_callback_.value()(size, expand_callback_args_)) {
// Let's try one more time
// std::cout << "get_block_no_expand() - size: " << size << std::endl;
block_type const block = get_block_no_expand(size, stream_event);
if (block.is_valid()) { return block; }
}
}

auto iter = stream_free_blocks_.find(stream_event);
free_list& blocks =
(iter != stream_free_blocks_.end()) ? iter->second : stream_free_blocks_[stream_event];

log_summary_trace();
// std::cout << "get_block(spilling didn't help) - size: " << size << std::endl;
// no large enough blocks available after merging, so grow the pool
block_type const block =
this->underlying().expand_pool(size, blocks, cuda_stream_view{stream_event.stream});
@@ -488,6 +515,10 @@ class stream_ordered_memory_resource : public crtp<PoolResource>, public device_
std::mutex mtx_; // mutex for thread-safe access

rmm::cuda_device_id device_id_{rmm::get_current_cuda_device()};

std::optional<expand_callback_t> expand_callback_;
void* expand_callback_args_ = nullptr;

}; // namespace detail

} // namespace rmm::mr::detail
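
To show how the new hook is meant to be wired up, here is a hedged C++ sketch: the `set_expand_callback()` signature and the `bool(std::size_t, void*)` contract come from this diff, while `try_spill()` is a hypothetical stand-in for a cuDF-style spilling routine and the pool sizes are arbitrary. Returning `true` tells the pool to retry `get_block_no_expand()`; returning `false` makes it fall back to growing the pool as before.

```cpp
#include <cstddef>
#include <iostream>

#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

// Hypothetical spiller: would ask cuDF (or any other cache) to free at least
// `bytes` of device memory and report whether anything was actually freed.
bool try_spill(std::size_t bytes, void* /*args*/)
{
  std::cout << "expand callback: need " << bytes << " bytes, nothing left to spill\n";
  return false;  // give up; the pool will call expand_pool() as before
}

int main()
{
  rmm::mr::cuda_memory_resource upstream;
  rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource> pool{&upstream, 1u << 28};

  // New in this PR: the pool consults the callback whenever no free block
  // satisfies a request, before it grows the pool.
  pool.set_expand_callback(try_spill, nullptr);

  void* ptr = pool.allocate(1u << 20);
  pool.deallocate(ptr, 1u << 20);
  return 0;
}
```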
8 changes: 8 additions & 0 deletions include/rmm/mr/device/managed_memory_resource.hpp
@@ -62,6 +62,14 @@ class managed_memory_resource final : public device_memory_resource {

void* ptr{nullptr};
RMM_CUDA_TRY_ALLOC(cudaMallocManaged(&ptr, bytes));

static bool has_not_prefetched_once_before = true;
if (has_not_prefetched_once_before) {
has_not_prefetched_once_before = false;
std::cout << "do_allocate(managed) - prefetched to device bytes: " << bytes << std::endl;
RMM_CUDA_TRY_ALLOC(
cudaMemPrefetchAsync(ptr, bytes, get_current_cuda_device().value(), stream));
}
return ptr;
}

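The `managed_memory_resource` change above prefetches the first managed allocation to the current device. For context, here is a standalone sketch of the same CUDA pattern using only the runtime API (no RMM); the size and use of the default stream are arbitrary choices for illustration.

```cpp
#include <cstddef>
#include <cstdio>

#include <cuda_runtime_api.h>

int main()
{
  std::size_t const bytes = 1 << 20;
  void* ptr = nullptr;

  // Unified (managed) memory: accessible from both host and device,
  // migrated on demand by the driver.
  if (cudaMallocManaged(&ptr, bytes) != cudaSuccess) { return 1; }

  int device = 0;
  cudaGetDevice(&device);

  // Hint the driver to migrate the pages to the GPU up front so the first
  // kernel touching them does not pay page-fault costs.
  cudaMemPrefetchAsync(ptr, bytes, device, cudaStreamDefault);
  cudaStreamSynchronize(cudaStreamDefault);

  std::printf("allocated and prefetched %zu bytes of managed memory\n", bytes);
  cudaFree(ptr);
  return 0;
}
```
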
1 change: 1 addition & 0 deletions python/rmm/_lib/memory_resource.pxd
@@ -51,6 +51,7 @@ cdef class CudaAsyncMemoryResource(DeviceMemoryResource):
pass

cdef class PoolMemoryResource(UpstreamResourceAdaptor):
cdef object _callback
pass

cdef class FixedSizeMemoryResource(UpstreamResourceAdaptor):
43 changes: 29 additions & 14 deletions python/rmm/_lib/memory_resource.pyx
@@ -112,6 +112,10 @@ cdef extern from "rmm/mr/device/cuda_async_memory_resource.hpp" \
cdef extern from "rmm/cuda_device.hpp" namespace "rmm" nogil:
size_t percent_of_free_device_memory(int percent) except +

cdef extern from "rmm/mr/device/pool_memory_resource.hpp" \
namespace "rmm::mr::detail" nogil:
ctypedef bool (*expand_callback_t)(size_t, void*)

cdef extern from "rmm/mr/device/pool_memory_resource.hpp" \
namespace "rmm::mr" nogil:
cdef cppclass pool_memory_resource[Upstream](device_memory_resource):
@@ -120,6 +124,7 @@ cdef extern from "rmm/mr/device/pool_memory_resource.hpp" \
size_t initial_pool_size,
optional[size_t] maximum_pool_size) except +
size_t pool_size()
void set_expand_callback(expand_callback_t func, void* args) except +

cdef extern from "rmm/mr/device/fixed_size_memory_resource.hpp" \
namespace "rmm::mr" nogil:
@@ -216,6 +221,19 @@ cdef extern from "rmm/mr/device/failure_callback_resource_adaptor.hpp" \
) except +


# Note that this function is specifically designed to rethrow Python exceptions
# as C++ exceptions when called as a callback from C++, so it is noexcept from
# Cython's perspective.
cdef bool _retry_callback_function(size_t nbytes, void *callback_arg) noexcept nogil:
cdef CppExcept err
with gil:
try:
return (<object>callback_arg)(nbytes)
except BaseException as e:
err = translate_python_except_to_cpp(e)
throw_cpp_except(err)


cdef class DeviceMemoryResource:

cdef device_memory_resource* get_mr(self):
@@ -414,6 +432,16 @@ cdef class PoolMemoryResource(UpstreamResourceAdaptor):
)
return c_mr.pool_size()

def set_expand_callback(self, object callback):
cdef pool_memory_resource[device_memory_resource]* c_mr = (
<pool_memory_resource[device_memory_resource]*>(self.get_mr())
)
self._callback = callback
c_mr.set_expand_callback(
<expand_callback_t>_retry_callback_function,
<void*>callback
)
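
For reference, a hypothetical end-to-end use of this binding from Python. The callback contract (return `True` if memory was freed and the allocation should be retried, `False` to let the pool grow) follows the C++ changes above; the spilling logic itself is a stand-in.

```python
import rmm

# Pool on top of plain device memory; 256 MiB initial size is arbitrary.
pool = rmm.mr.PoolMemoryResource(rmm.mr.CudaMemoryResource(), initial_pool_size=2**28)
rmm.mr.set_current_device_resource(pool)


def on_out_of_blocks(nbytes: int) -> bool:
    # Try to free `nbytes` of device memory, e.g. by asking cuDF to spill
    # buffers to host. Return False once nothing more can be spilled so the
    # pool falls back to growing itself.
    print(f"pool needs {nbytes} more bytes; attempting to spill")
    return False


pool.set_expand_callback(on_out_of_blocks)
```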

cdef class FixedSizeMemoryResource(UpstreamResourceAdaptor):
def __cinit__(
self,
@@ -892,19 +920,6 @@ cdef class TrackingResourceAdaptor(UpstreamResourceAdaptor):
self.c_obj.get()))[0].log_outstanding_allocations()


# Note that this function is specifically designed to rethrow Python exceptions
# as C++ exceptions when called as a callback from C++, so it is noexcept from
# Cython's perspective.
cdef bool _oom_callback_function(size_t bytes, void *callback_arg) noexcept nogil:
cdef CppExcept err
with gil:
try:
return (<object>callback_arg)(bytes)
except BaseException as e:
err = translate_python_except_to_cpp(e)
throw_cpp_except(err)


cdef class FailureCallbackResourceAdaptor(UpstreamResourceAdaptor):

def __cinit__(
@@ -916,7 +931,7 @@ cdef class FailureCallbackResourceAdaptor(UpstreamResourceAdaptor):
self.c_obj.reset(
new failure_callback_resource_adaptor[device_memory_resource](
upstream_mr.get_mr(),
<failure_callback_t>_oom_callback_function,
<failure_callback_t>_retry_callback_function,
<void*>callback
)
)