MB-54279: Pause / Resume: Unlock vb_mutexes from locking thread
As part of EPBucket::prepareForPause(), all of the vb_mutexes are
lock()ed - and left locked:

a) To ensure that any in-flight VBucket writes have completed and
b) To inhibit any new writes from occurring.

Then, when the EPBucket is resumed all the vb_mutexes are unlock()ed
which allows VBucket writes to resume.

However, EPBucket::prepareForResume() is not called on the same thread
which called prepareForPause() - prepareForPause() runs on a
background NonIO thread whereas resume runs synchronously in the
front-end thread. As such, we are incorrectly unlocking a mutex from a
different thread than the one which locked it - which is Undefined
Behaviour - from cppreference.com[1]:

    void unlock();

    Unlocks the mutex.

    The mutex must be locked by the current thread of execution,
    otherwise, the behavior is undefined.

This is helpfully reported by ThreadSanitizer:

    WARNING: ThreadSanitizer: unlock of an unlocked mutex (or by a wrong thread) (pid=58528)
        #0 pthread_mutex_unlock <null> (libtsan.so.0+0x3bf9a)
        #1 __gthread_mutex_unlock(pthread_mutex_t*) c++/10.2.0/x86_64-pc-linux-gnu/bits/gthr-default.h:779 (memcached+0x5b594f)
        #2 std::mutex::unlock() c++/10.2.0/bits/std_mutex.h:118 (memcached+0x602555)
        #3 EPBucket::prepareForResume() kv_engine/engines/ep/src/ep_bucket.cc:2575 (memcached+0x84b94f)
        #4 EventuallyPersistentEngine::resume() kv_engine/engines/ep/src/ep_engine.cc:7002 (memcached+0x7d52d3)
        ...
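
The rule quoted above is easiest to satisfy when each lock is owned by a
scoped std::unique_lock, so that lock() and unlock() always run on the same
thread. A minimal sketch, assuming a hypothetical helper
waitForCurrentHolders() that is not part of kv_engine:

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical helper (not part of kv_engine): acquires every mutex in
// `mutexes`, which blocks until any current holders have released them,
// and then releases them all again before returning.
//
// Each std::unique_lock unlocks in its destructor, so every unlock() runs
// on the same thread that called lock().
inline void waitForCurrentHolders(std::vector<std::mutex>& mutexes) {
    std::vector<std::unique_lock<std::mutex>> held;
    held.reserve(mutexes.size());
    for (auto& m : mutexes) {
        // Locks m; blocks while another thread holds it.
        held.emplace_back(m);
    }
    assert(held.size() == mutexes.size());
    // `held` is destroyed here, unlocking every mutex on this thread.
}
```

Once the helper returns, no mutex is left locked, so no other thread is ever
required to unlock on the caller's behalf.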

Fix by changing how we achieve inhibition of future VBucket writes:

- Introduce an EPBucket::paused flag which is set in prepareForPause()
  after all vb_mutexes have been acquired (and hence all in-flight
  VBucket writes have finished); all vb_mutexes are then unlocked
  before returning from prepareForPause().

- When attempting to acquire a locked VBucket, check the new paused
  flag before attempting to acquire the vb_mutex - if paused is set
  then block / return early (for the std::try_to_lock variant).

This keeps the required pause behaviour but avoids keeping vb_mutexes
locked and having to later unlock (on a different thread).
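
The scheme described above can be sketched in isolation roughly as follows.
This is an illustrative stand-in only: the class PausableMutexes and its
method names are hypothetical and are not the kv_engine API; only the
overall shape (an atomic flag checked before locking) follows this change.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the vb_mutexes / paused pair; names are
// illustrative, not the kv_engine API.
class PausableMutexes {
public:
    explicit PausableMutexes(std::size_t count) : mutexes(count) {
    }

    // Waits for in-flight holders by acquiring every mutex, sets the
    // paused flag while they are all held, then releases them on this
    // same thread when `held` goes out of scope.
    void pause() {
        std::vector<std::unique_lock<std::mutex>> held;
        held.reserve(mutexes.size());
        for (auto& m : mutexes) {
            held.emplace_back(m);
        }
        paused.store(true);
    }

    // Only touches the atomic flag, so it may run on any thread.
    void resume() {
        paused.store(false);
    }

    // Blocking variant: waits until not paused, then locks.
    std::unique_lock<std::mutex> lock(std::size_t index) {
        assert(index < mutexes.size());
        while (paused.load()) {
            std::this_thread::yield();
        }
        return std::unique_lock<std::mutex>(mutexes[index]);
    }

    // Non-blocking variant: returns a lock that owns nothing if the
    // object is paused or the mutex is already held.
    std::unique_lock<std::mutex> tryLock(std::size_t index) {
        assert(index < mutexes.size());
        if (paused.load()) {
            return {};
        }
        return std::unique_lock<std::mutex>(mutexes[index],
                                            std::try_to_lock);
    }

private:
    std::vector<std::mutex> mutexes;
    std::atomic_bool paused{false};
};
```

With this shape, pause() can run on a background thread and resume() on a
different one, because resume() never calls unlock(). Note that in this
sketch the flag check and the mutex acquisition are separate steps, so a
caller that passes the check just before pause() sets the flag could still
acquire its mutex afterwards; the sketch does not attempt to close that
window.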

[1]: https://en.cppreference.com/w/cpp/thread/mutex

Change-Id: I062583951a101a866866b79dfd6329672bb4ff42
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/182099
Tested-by: Build Bot <[email protected]>
Reviewed-by: Jim Walker <[email protected]>
daverigby committed Nov 3, 2022
1 parent 09969cd commit d8a842f
Showing 4 changed files with 41 additions and 8 deletions.
34 changes: 26 additions & 8 deletions engines/ep/src/ep_bucket.cc
@@ -324,6 +324,12 @@ void EPBucket::initializeShards() {
 }

 void EPBucket::deinitialize() {
+    // If Bucket is currently paused; need to resume to allow flushers
+    // etc to complete.
+    if (paused) {
+        prepareForResume();
+    }
+
     stopFlusher();

     allVbucketsDeinitialize();
@@ -2498,11 +2504,16 @@ cb::engine_errc EPBucket::prepareForPause() {
     // acquire _all_ of them here, which means any of the above in-flight
     // operations will have to complete before we continue - and that no
     // more can begin.
+    // Once all preparations have been completed; we set EPBucket::paused
+    // to true and unlock the mutexes - any future attempts to lock them
+    // will be blocked until EPBucket::paused is cleared
+    // (via prepareForResume()).
     EP_LOG_DEBUG_RAW(
             "EPBucket::prepareForPause: waiting for in-flight Flusher, "
             "Rollback, DeleteVB tasks to complete");
+    std::vector<std::unique_lock<std::mutex>> vb_locks;
     for (auto& mutex : vb_mutexes) {
-        mutex.lock();
+        vb_locks.emplace_back(mutex);
     }

     // b) Compaction - This only requires that the appropriate vb_mutexes is
@@ -2551,7 +2562,15 @@ cb::engine_errc EPBucket::prepareForPause() {
         }
         allSuccess &= success;
     });
-    return allSuccess ? cb::engine_errc::success : cb::engine_errc::failed;
+
+    if (allSuccess) {
+        // Successfully prepared for pausing; set paused flag to true before
+        // we unlock all the vb_mutexes; that will inhibit anyone from acquiring
+        // the mutexes again until paused is set to false.
+        paused.store(true);
+        return cb::engine_errc::success;
+    }
+    return cb::engine_errc::failed;
 }

 cb::engine_errc EPBucket::prepareForResume() {
@@ -2567,17 +2586,16 @@ cb::engine_errc EPBucket::prepareForResume() {
     });

     // 2. Resume ep-engine operations.
-    // a) Unblock disk writing operations from ep-engine.
+    // a) Clear EPBucket::paused so disk writing operations can
+    // resume.
     EP_LOG_DEBUG_RAW(
-            "EPBucket::prepareForPause: unblocking all Flusher, "
+            "EPBucket::prepareForResume: unblocking all Flusher, "
             "Rollback, DeleteVB tasks.");
-    for (auto& mutex : vb_mutexes) {
-        mutex.unlock();
-    }
+    paused.store(false);

     // b) Reset compaction concurrency
     EP_LOG_DEBUG_RAW(
-            "EPBucket::prepareForPause: resuming all Compaction tasks");
+            "EPBucket::prepareForResume: resuming all Compaction tasks");
     compactionSemaphore->release();
     updateCompactionConcurrency();

1 change: 1 addition & 0 deletions engines/ep/src/kv_bucket.cc
@@ -291,6 +291,7 @@ KVBucket::KVBucket(EventuallyPersistentEngine& theEngine)
       itemCompressorTask(nullptr),
       itemFreqDecayerTask(nullptr),
       vb_mutexes(engine.getConfiguration().getMaxVbuckets()),
+      paused(false),
       backfillMemoryThreshold(0.95),
       lastTransTimePerItem(0),
       collectionsManager(std::make_shared<Collections::Manager>()),
12 changes: 12 additions & 0 deletions engines/ep/src/kv_bucket.h
@@ -254,6 +254,10 @@ class KVBucket : public KVBucketIface {
      * alongside a shared pointer to the requested VBucket.
      */
     LockedVBucketPtr getLockedVBucket(Vbid vbid) {
+        // While the Bucket is paused, no Locked VBuckets can be acquired.
+        while (paused.load()) {
+            std::this_thread::yield();
+        }
         std::unique_lock<std::mutex> lock(vb_mutexes[vbid.get()]);
         return {vbMap.getBucket(vbid), std::move(lock)};
     }
@@ -270,6 +274,10 @@ class KVBucket : public KVBucketIface {
      * successfully acquired a locked VBucket.
      */
     LockedVBucketPtr getLockedVBucket(Vbid vbid, std::try_to_lock_t) {
+        // While the Bucket is paused, no Locked VBuckets can be acquired.
+        if (paused.load()) {
+            return {};
+        }
         std::unique_lock<std::mutex> lock(vb_mutexes[vbid.get()],
                                           std::try_to_lock);
         if (!lock) {
@@ -1178,6 +1186,10 @@ class KVBucket : public KVBucketIface {
      * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
     std::vector<std::mutex> vb_mutexes;

+    /// Is this Bucket currently paused? If true, inhibits any of the vb_mutexes
+    /// from being acquired.
+    std::atomic_bool paused;
+
     std::mutex vbsetMutex;
     double backfillMemoryThreshold;

2 changes: 2 additions & 0 deletions engines/ep/src/locked_vbucket_ptr.h
@@ -29,6 +29,8 @@
  */
 class LockedVBucketPtr {
 public:
+    LockedVBucketPtr() = default;
+
     LockedVBucketPtr(VBucketPtr vb, std::unique_lock<std::mutex>&& lock)
         : vb(std::move(vb)), lock(std::move(lock)) {
     }