Commit b06ab1f3 by Abseil Team Committed by Copybara-Service

absl: fix a priority bug in CondVar wait morphing

Enqueue updates priority of the queued thread.
It was assumed that the queued thread is the current thread.
But it's not the case in CondVar wait morhping,
where we requeue an existing CondVar waiter on the Mutex.
As the result one thread can falsely get priority of another thread.

Fix this by not updating priority in this case.
And make the assumption explicit and checked.

PiperOrigin-RevId: 561249402
Change-Id: I9476c047757090b893a88a2839b795b85fe220ad
parent f6fc4efa
...@@ -682,6 +682,7 @@ static const intptr_t kMuOne = 0x0100; // a count of one reader ...@@ -682,6 +682,7 @@ static const intptr_t kMuOne = 0x0100; // a count of one reader
// flags passed to Enqueue and LockSlow{,WithTimeout,Loop} // flags passed to Enqueue and LockSlow{,WithTimeout,Loop}
static const int kMuHasBlocked = 0x01; // already blocked (MUST == 1) static const int kMuHasBlocked = 0x01; // already blocked (MUST == 1)
static const int kMuIsCond = 0x02; // conditional waiter (CV or Condition) static const int kMuIsCond = 0x02; // conditional waiter (CV or Condition)
static const int kMuIsFer = 0x04; // wait morphing from a CondVar
static_assert(PerThreadSynch::kAlignment > kMuLow, static_assert(PerThreadSynch::kAlignment > kMuLow,
"PerThreadSynch::kAlignment must be greater than kMuLow"); "PerThreadSynch::kAlignment must be greater than kMuLow");
...@@ -920,20 +921,23 @@ static PerThreadSynch* Enqueue(PerThreadSynch* head, SynchWaitParams* waitp, ...@@ -920,20 +921,23 @@ static PerThreadSynch* Enqueue(PerThreadSynch* head, SynchWaitParams* waitp,
s->wake = false; // not being woken s->wake = false; // not being woken
s->cond_waiter = ((flags & kMuIsCond) != 0); s->cond_waiter = ((flags & kMuIsCond) != 0);
#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM #ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
int64_t now_cycles = CycleClock::Now(); if ((flags & kMuIsFer) == 0) {
if (s->next_priority_read_cycles < now_cycles) { assert(s == Synch_GetPerThread());
// Every so often, update our idea of the thread's priority. int64_t now_cycles = CycleClock::Now();
// pthread_getschedparam() is 5% of the block/wakeup time; if (s->next_priority_read_cycles < now_cycles) {
// CycleClock::Now() is 0.5%. // Every so often, update our idea of the thread's priority.
int policy; // pthread_getschedparam() is 5% of the block/wakeup time;
struct sched_param param; // CycleClock::Now() is 0.5%.
const int err = pthread_getschedparam(pthread_self(), &policy, &param); int policy;
if (err != 0) { struct sched_param param;
ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err); const int err = pthread_getschedparam(pthread_self(), &policy, &param);
} else { if (err != 0) {
s->priority = param.sched_priority; ABSL_RAW_LOG(ERROR, "pthread_getschedparam failed: %d", err);
s->next_priority_read_cycles = } else {
now_cycles + static_cast<int64_t>(CycleClock::Frequency()); s->priority = param.sched_priority;
s->next_priority_read_cycles =
now_cycles + static_cast<int64_t>(CycleClock::Frequency());
}
} }
} }
#endif #endif
...@@ -2436,7 +2440,8 @@ void Mutex::Fer(PerThreadSynch* w) { ...@@ -2436,7 +2440,8 @@ void Mutex::Fer(PerThreadSynch* w) {
} else { } else {
if ((v & (kMuSpin | kMuWait)) == 0) { // no waiters if ((v & (kMuSpin | kMuWait)) == 0) { // no waiters
// This thread tries to become the one and only waiter. // This thread tries to become the one and only waiter.
PerThreadSynch* new_h = Enqueue(nullptr, w->waitp, v, kMuIsCond); PerThreadSynch* new_h =
Enqueue(nullptr, w->waitp, v, kMuIsCond | kMuIsFer);
ABSL_RAW_CHECK(new_h != nullptr, ABSL_RAW_CHECK(new_h != nullptr,
"Enqueue failed"); // we must queue ourselves "Enqueue failed"); // we must queue ourselves
if (mu_.compare_exchange_strong( if (mu_.compare_exchange_strong(
...@@ -2447,7 +2452,7 @@ void Mutex::Fer(PerThreadSynch* w) { ...@@ -2447,7 +2452,7 @@ void Mutex::Fer(PerThreadSynch* w) {
} else if ((v & kMuSpin) == 0 && } else if ((v & kMuSpin) == 0 &&
mu_.compare_exchange_strong(v, v | kMuSpin | kMuWait)) { mu_.compare_exchange_strong(v, v | kMuSpin | kMuWait)) {
PerThreadSynch* h = GetPerThreadSynch(v); PerThreadSynch* h = GetPerThreadSynch(v);
PerThreadSynch* new_h = Enqueue(h, w->waitp, v, kMuIsCond); PerThreadSynch* new_h = Enqueue(h, w->waitp, v, kMuIsCond | kMuIsFer);
ABSL_RAW_CHECK(new_h != nullptr, ABSL_RAW_CHECK(new_h != nullptr,
"Enqueue failed"); // we must queue ourselves "Enqueue failed"); // we must queue ourselves
do { do {
......
...@@ -36,10 +36,16 @@ ...@@ -36,10 +36,16 @@
#include "absl/log/check.h" #include "absl/log/check.h"
#include "absl/log/log.h" #include "absl/log/log.h"
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
#include "absl/synchronization/internal/create_thread_identity.h"
#include "absl/synchronization/internal/thread_pool.h" #include "absl/synchronization/internal/thread_pool.h"
#include "absl/time/clock.h" #include "absl/time/clock.h"
#include "absl/time/time.h" #include "absl/time/time.h"
#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
#include <pthread.h>
#include <string.h>
#endif
namespace { namespace {
// TODO(dmauro): Replace with a commandline flag. // TODO(dmauro): Replace with a commandline flag.
...@@ -1868,6 +1874,60 @@ TEST(Mutex, WriterPriority) { ...@@ -1868,6 +1874,60 @@ TEST(Mutex, WriterPriority) {
EXPECT_TRUE(saw_wrote.load()); EXPECT_TRUE(saw_wrote.load());
} }
#ifdef ABSL_HAVE_PTHREAD_GETSCHEDPARAM
TEST(Mutex, CondVarPriority) {
// A regression test for a bug in condition variable wait morphing,
// which resulted in the waiting thread getting priority of the waking thread.
int err = 0;
sched_param param;
param.sched_priority = 7;
std::thread test([&]() {
err = pthread_setschedparam(pthread_self(), SCHED_FIFO, &param);
});
test.join();
if (err) {
// Setting priority usually requires special privileges.
GTEST_SKIP() << "failed to set priority: " << strerror(err);
}
absl::Mutex mu;
absl::CondVar cv;
bool locked = false;
bool notified = false;
bool waiting = false;
bool morph = false;
std::thread th([&]() {
EXPECT_EQ(0, pthread_setschedparam(pthread_self(), SCHED_FIFO, &param));
mu.Lock();
locked = true;
mu.Await(absl::Condition(&notified));
mu.Unlock();
EXPECT_EQ(absl::synchronization_internal::GetOrCreateCurrentThreadIdentity()
->per_thread_synch.priority,
param.sched_priority);
mu.Lock();
mu.Await(absl::Condition(&waiting));
morph = true;
absl::SleepFor(absl::Seconds(1));
cv.Signal();
mu.Unlock();
});
mu.Lock();
mu.Await(absl::Condition(&locked));
notified = true;
mu.Unlock();
mu.Lock();
waiting = true;
while (!morph) {
cv.Wait(&mu);
}
mu.Unlock();
th.join();
EXPECT_NE(absl::synchronization_internal::GetOrCreateCurrentThreadIdentity()
->per_thread_synch.priority,
param.sched_priority);
}
#endif
TEST(Mutex, LockWhenWithTimeoutResult) { TEST(Mutex, LockWhenWithTimeoutResult) {
// Check various corner cases for Await/LockWhen return value // Check various corner cases for Await/LockWhen return value
// with always true/always false conditions. // with always true/always false conditions.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment