Commit ed842801 by Abseil Team Committed by Copybara-Service

Rollback Mutex relative timeout support because of internal incompatibility

PiperOrigin-RevId: 515427893
Change-Id: I89e8756fcf400459b0226d14785c6511ad3e380b
parent 761f04dc
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
#include "absl/base/config.h" #include "absl/base/config.h"
#ifndef _WIN32 #ifdef _WIN32
#include <windows.h>
#else
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
#endif #endif
...@@ -83,60 +85,34 @@ namespace synchronization_internal { ...@@ -83,60 +85,34 @@ namespace synchronization_internal {
class FutexImpl { class FutexImpl {
public: public:
// Atomically check that `*v == val`, and if it is, then sleep until the static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
// timeout `t` has been reached, or until woken by `Wake()`.
static int WaitUntil(std::atomic<int32_t>* v, int32_t val,
KernelTimeout t) { KernelTimeout t) {
if (!t.has_timeout()) { long err = 0; // NOLINT(runtime/int)
return Wait(v, val); if (t.has_timeout()) {
} else if (t.is_absolute_timeout()) {
auto abs_timespec = t.MakeAbsTimespec();
return WaitAbsoluteTimeout(v, val, &abs_timespec);
} else {
auto rel_timespec = t.MakeRelativeTimespec();
return WaitRelativeTimeout(v, val, &rel_timespec);
}
}
// Atomically check that `*v == val`, and if it is, then sleep until the until
// woken by `Wake()`.
static int Wait(std::atomic<int32_t>* v, int32_t val) {
return WaitAbsoluteTimeout(v, val, nullptr);
}
// Atomically check that `*v == val`, and if it is, then sleep until
// CLOCK_REALTIME reaches `*abs_timeout`, or until woken by `Wake()`.
static int WaitAbsoluteTimeout(std::atomic<int32_t>* v, int32_t val,
const struct timespec* abs_timeout) {
// https://locklessinc.com/articles/futex_cheat_sheet/ // https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time. // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
auto err = struct timespec abs_timeout = t.MakeAbsTimespec();
syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME,
val, abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
if (err != 0) {
return -errno;
}
return 0;
}
// Atomically check that `*v == val`, and if it is, then sleep until
// `*rel_timeout` has elapsed, or until woken by `Wake()`.
static int WaitRelativeTimeout(std::atomic<int32_t>* v, int32_t val,
const struct timespec* rel_timeout) {
// Atomically check that the futex value is still 0, and if it // Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE. // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), err = syscall(
FUTEX_PRIVATE_FLAG, val, rel_timeout); SYS_futex, reinterpret_cast<int32_t *>(v),
if (err != 0) { FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
&abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
} else {
// Atomically check that the futex value is still 0, and if it
// is, sleep until woken by FUTEX_WAKE.
err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
}
if (ABSL_PREDICT_FALSE(err != 0)) {
return -errno; return -errno;
} }
return 0; return 0;
} }
// Wakes at most `count` waiters that have entered the sleep state on `v`. static int Wake(std::atomic<int32_t> *v, int32_t count) {
static int Wake(std::atomic<int32_t>* v, int32_t count) { // NOLINTNEXTLINE(runtime/int)
auto err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v), long err = syscall(SYS_futex, reinterpret_cast<int32_t*>(v),
FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count); FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
if (ABSL_PREDICT_FALSE(err < 0)) { if (ABSL_PREDICT_FALSE(err < 0)) {
return -errno; return -errno;
......
...@@ -67,74 +67,11 @@ static void MaybeBecomeIdle() { ...@@ -67,74 +67,11 @@ static void MaybeBecomeIdle() {
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
Waiter::Waiter() : futex_(0) {} Waiter::Waiter() {
futex_.store(0, std::memory_order_relaxed);
bool Waiter::WaitAbsoluteTimeout(KernelTimeout t) {
// Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero.
// Note that, since the thread ticker is just reset, we don't need to check
// whether the thread is idle on the very first pass of the loop.
bool first_pass = true;
while (true) {
int32_t x = futex_.load(std::memory_order_relaxed);
while (x != 0) {
if (!futex_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
return true; // Consumed a wakeup, we are done.
}
if (!first_pass) MaybeBecomeIdle();
auto abs_timeout = t.MakeAbsTimespec();
const int err = Futex::WaitAbsoluteTimeout(&futex_, 0, &abs_timeout);
if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) {
// Do nothing, the loop will retry.
} else if (err == -ETIMEDOUT) {
return false;
} else {
ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
}
}
first_pass = false;
}
}
#if defined(CLOCK_MONOTONIC)
// Subtracts the timespec `sub` from `in` if the result would not be negative,
// and returns true. Returns false if the result would be negative, and leaves
// `in` unchanged.
static bool TimespecSubtract(struct timespec& in, const struct timespec& sub) {
if (in.tv_sec < sub.tv_sec) {
return false;
}
if (in.tv_nsec < sub.tv_nsec) {
if (in.tv_sec == sub.tv_sec) {
return false;
}
// Borrow from tv_sec.
in.tv_sec -= 1;
in.tv_nsec += 1'000'000'000;
}
in.tv_sec -= sub.tv_sec;
in.tv_nsec -= sub.tv_nsec;
return true;
} }
// On some platforms a background thread periodically calls `Poke()` to briefly bool Waiter::Wait(KernelTimeout t) {
// wake waiter threads so that they may call `MaybeBecomeIdle()`. This means
// that `WaitRelativeTimeout()` differs slightly from `WaitAbsoluteTimeout()`
// because it must adjust the timeout by the amount of time that it has already
// slept.
bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
struct timespec start;
ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &start) == 0,
"clock_gettime() failed");
// Loop until we can atomically decrement futex from a positive // Loop until we can atomically decrement futex from a positive
// value, waiting on a futex while we believe it is zero. // value, waiting on a futex while we believe it is zero.
// Note that, since the thread ticker is just reset, we don't need to check // Note that, since the thread ticker is just reset, we don't need to check
...@@ -152,24 +89,8 @@ bool Waiter::WaitRelativeTimeout(KernelTimeout t) { ...@@ -152,24 +89,8 @@ bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
return true; // Consumed a wakeup, we are done. return true; // Consumed a wakeup, we are done.
} }
auto relative_timeout = t.MakeRelativeTimespec(); if (!first_pass) MaybeBecomeIdle();
if (!first_pass) { const int err = Futex::WaitUntil(&futex_, 0, t);
MaybeBecomeIdle();
// Adjust relative_timeout for `Poke()`s.
struct timespec now;
ABSL_RAW_CHECK(clock_gettime(CLOCK_MONOTONIC, &now) == 0,
"clock_gettime() failed");
// If TimespecSubstract(now, start) returns false, then the clock isn't
// truly monotonic.
if (TimespecSubtract(now, start)) {
if (!TimespecSubtract(relative_timeout, now)) {
return false; // Timeout.
}
}
}
const int err = Futex::WaitRelativeTimeout(&futex_, 0, &relative_timeout);
if (err != 0) { if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) { if (err == -EINTR || err == -EWOULDBLOCK) {
// Do nothing, the loop will retry. // Do nothing, the loop will retry.
...@@ -183,23 +104,6 @@ bool Waiter::WaitRelativeTimeout(KernelTimeout t) { ...@@ -183,23 +104,6 @@ bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
} }
} }
#else // CLOCK_MONOTONIC
// No support for CLOCK_MONOTONIC.
// KernelTimeout will automatically convert to an absolute timeout.
bool Waiter::WaitRelativeTimeout(KernelTimeout t) {
return WaitAbsoluteTimeout(t);
}
#endif // CLOCK_MONOTONIC
bool Waiter::Wait(KernelTimeout t) {
if (t.is_absolute_timeout()) {
return WaitAbsoluteTimeout(t);
}
return WaitRelativeTimeout(t);
}
void Waiter::Post() { void Waiter::Post() {
if (futex_.fetch_add(1, std::memory_order_release) == 0) { if (futex_.fetch_add(1, std::memory_order_release) == 0) {
// We incremented from 0, need to wake a potential waiter. // We incremented from 0, need to wake a potential waiter.
......
...@@ -110,9 +110,6 @@ class Waiter { ...@@ -110,9 +110,6 @@ class Waiter {
~Waiter() = delete; ~Waiter() = delete;
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX #if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
bool WaitAbsoluteTimeout(KernelTimeout t);
bool WaitRelativeTimeout(KernelTimeout t);
// Futexes are defined by specification to be 32-bits. // Futexes are defined by specification to be 32-bits.
// Thus std::atomic<int32_t> must be just an int32_t with lockfree methods. // Thus std::atomic<int32_t> must be just an int32_t with lockfree methods.
std::atomic<int32_t> futex_; std::atomic<int32_t> futex_;
......
...@@ -635,6 +635,21 @@ void Mutex::InternalAttemptToUseMutexInFatalSignalHandler() { ...@@ -635,6 +635,21 @@ void Mutex::InternalAttemptToUseMutexInFatalSignalHandler() {
std::memory_order_release); std::memory_order_release);
} }
// --------------------------time support
// Return the current time plus the timeout. Use the same clock as
// PerThreadSem::Wait() for consistency. Unfortunately, we don't have
// such a choice when a deadline is given directly.
static absl::Time DeadlineFromTimeout(absl::Duration timeout) {
#ifndef _WIN32
struct timeval tv;
gettimeofday(&tv, nullptr);
return absl::TimeFromTimeval(tv) + timeout;
#else
return absl::Now() + timeout;
#endif
}
// --------------------------Mutexes // --------------------------Mutexes
// In the layout below, the msb of the bottom byte is currently unused. Also, // In the layout below, the msb of the bottom byte is currently unused. Also,
...@@ -1534,13 +1549,7 @@ void Mutex::LockWhen(const Condition &cond) { ...@@ -1534,13 +1549,7 @@ void Mutex::LockWhen(const Condition &cond) {
} }
bool Mutex::LockWhenWithTimeout(const Condition &cond, absl::Duration timeout) { bool Mutex::LockWhenWithTimeout(const Condition &cond, absl::Duration timeout) {
ABSL_TSAN_MUTEX_PRE_LOCK(this, 0); return LockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
GraphId id = DebugOnlyDeadlockCheck(this);
bool res = LockSlowWithDeadline(kExclusive, &cond,
KernelTimeout(timeout), 0);
DebugOnlyLockEnter(this, id);
ABSL_TSAN_MUTEX_POST_LOCK(this, 0, 0);
return res;
} }
bool Mutex::LockWhenWithDeadline(const Condition &cond, absl::Time deadline) { bool Mutex::LockWhenWithDeadline(const Condition &cond, absl::Time deadline) {
...@@ -1563,12 +1572,7 @@ void Mutex::ReaderLockWhen(const Condition &cond) { ...@@ -1563,12 +1572,7 @@ void Mutex::ReaderLockWhen(const Condition &cond) {
bool Mutex::ReaderLockWhenWithTimeout(const Condition &cond, bool Mutex::ReaderLockWhenWithTimeout(const Condition &cond,
absl::Duration timeout) { absl::Duration timeout) {
ABSL_TSAN_MUTEX_PRE_LOCK(this, __tsan_mutex_read_lock); return ReaderLockWhenWithDeadline(cond, DeadlineFromTimeout(timeout));
GraphId id = DebugOnlyDeadlockCheck(this);
bool res = LockSlowWithDeadline(kShared, &cond, KernelTimeout(timeout), 0);
DebugOnlyLockEnter(this, id);
ABSL_TSAN_MUTEX_POST_LOCK(this, __tsan_mutex_read_lock, 0);
return res;
} }
bool Mutex::ReaderLockWhenWithDeadline(const Condition &cond, bool Mutex::ReaderLockWhenWithDeadline(const Condition &cond,
...@@ -1593,18 +1597,7 @@ void Mutex::Await(const Condition &cond) { ...@@ -1593,18 +1597,7 @@ void Mutex::Await(const Condition &cond) {
} }
bool Mutex::AwaitWithTimeout(const Condition &cond, absl::Duration timeout) { bool Mutex::AwaitWithTimeout(const Condition &cond, absl::Duration timeout) {
if (cond.Eval()) { // condition already true; nothing to do return AwaitWithDeadline(cond, DeadlineFromTimeout(timeout));
if (kDebugMode) {
this->AssertReaderHeld();
}
return true;
}
KernelTimeout t{timeout};
bool res = this->AwaitCommon(cond, t);
ABSL_RAW_CHECK(res || t.has_timeout(),
"condition untrue on return from Await");
return res;
} }
bool Mutex::AwaitWithDeadline(const Condition &cond, absl::Time deadline) { bool Mutex::AwaitWithDeadline(const Condition &cond, absl::Time deadline) {
...@@ -2670,7 +2663,7 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) { ...@@ -2670,7 +2663,7 @@ bool CondVar::WaitCommon(Mutex *mutex, KernelTimeout t) {
} }
bool CondVar::WaitWithTimeout(Mutex *mu, absl::Duration timeout) { bool CondVar::WaitWithTimeout(Mutex *mu, absl::Duration timeout) {
return WaitCommon(mu, KernelTimeout(timeout)); return WaitWithDeadline(mu, DeadlineFromTimeout(timeout));
} }
bool CondVar::WaitWithDeadline(Mutex *mu, absl::Time deadline) { bool CondVar::WaitWithDeadline(Mutex *mu, absl::Time deadline) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment