Skip to content

Commit

Permalink
Fix hashtable locking (#415)
Browse files Browse the repository at this point in the history
This PR fixes a sneaky issue in the hashtable locking, the construct
used to get the worker index only once per thread was actually being
called only the very first time by only one thread causing all the other
threads to have a worker_index of 0.

As consequence the locking was potentially generating overlappy
transaction ids leading to a potential clash.

This behaviour was put in evidence by redis-benchmark which uses only 1
single key for all the requests unless the `--threads` option is passed.

The PR also changes 2 other aspects:
- drops the pthread_once, as it's necessary to check per thread a simple
boolean can do the job and simplify the checks and the logic, and also
providing a minor but nice speed improvement.
- reduce the amount of spinlock traced by defaylt to reduce the amount
of memory allocated, infact the vast majority of cases need just 1
spinlock tracked therefore the size of the list initialized by default
has been set to the size of the smallest object that can be initialized
by FFMA, 16 bytes.
  • Loading branch information
danielealbano authored May 27, 2023
1 parent 3eb32e4 commit 5ef5bf7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 22 deletions.
16 changes: 11 additions & 5 deletions src/transaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

thread_local uint32_t transaction_manager_worker_index = 0;
thread_local uint16_t transaction_manager_transaction_index = 0;
pthread_once_t transaction_manager_init_once_control = PTHREAD_ONCE_INIT;
thread_local bool_volatile_t transaction_manager_inited = false;

void transaction_set_worker_index(
uint32_t worker_index) {
Expand Down Expand Up @@ -59,20 +59,26 @@ uint16_t transaction_peek_current_thread_index() {

bool transaction_acquire(
transaction_t *transaction) {
pthread_once(&transaction_manager_init_once_control, transaction_manager_init);
if (unlikely(transaction_manager_inited == false)) {
transaction_manager_init();
transaction_manager_inited = true;
}

transaction_manager_transaction_index++;

if (unlikely(transaction_manager_worker_index == 0 &&
transaction_manager_transaction_index == TRANSACTION_ID_NOT_ACQUIRED)) {
// Having both the worker index and the transaction index to 0 matches the transaction id not acquired value
// therefore we need to increment the transaction index by 1 to avoid it
if (unlikely(transaction_manager_worker_index == 0 && transaction_manager_transaction_index == 0)) {
transaction_manager_transaction_index++;
}

transaction->transaction_id.worker_index = transaction_manager_worker_index;
transaction->transaction_id.transaction_index = transaction_manager_transaction_index;

// FFMA_OBJECT_SIZE_MIN should be 16 bytes, therefore the size of the locks list should be 16 / 8 = 2 by default
// and it should be enough for most of the cases
transaction->locks.size = FFMA_OBJECT_SIZE_MIN / sizeof(transaction_spinlock_lock_volatile_t*);
transaction->locks.count = 0;
transaction->locks.size = 8;
transaction->locks.list = ffma_mem_alloc(
sizeof(transaction_spinlock_lock_volatile_t*) * transaction->locks.size);

Expand Down
7 changes: 4 additions & 3 deletions src/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ typedef struct transaction_spinlock_lock transaction_spinlock_lock_t;
typedef _Volatile(transaction_spinlock_lock_t) transaction_spinlock_lock_volatile_t;

typedef struct transaction_id transaction_id_t;
typedef _Volatile(transaction_id_t) transaction_id_volatile_t;
struct transaction_id {
union {
struct {
uint16_t worker_index;
uint16_t transaction_index;
uint16_volatile_t transaction_index;
uint16_volatile_t worker_index;
};
uint32_volatile_t id;
};
};

typedef struct transaction transaction_t;
struct transaction {
transaction_id_t transaction_id;
transaction_id_volatile_t transaction_id;
struct {
uint32_t count;
uint32_t size;
Expand Down
23 changes: 15 additions & 8 deletions tests/unit_tests/test-transaction-spinlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,20 @@ using namespace std;
#include "worker/worker_context.h"
#include "worker/worker.h"

std::atomic<uint32_t> worker_index_counter(1);

// Returns 1 if it can do the initial lock, 2 instead if it's able to reach the point in which has
// to wait for the spinlock to become free
void* test_transaction_spinlock_lock_lock_retry_try_lock_thread_func(void* rawdata) {
worker_context_t worker_context = { 0 };
worker_context.worker_index = worker_index_counter.fetch_add(1);
worker_context_set(&worker_context);
transaction_set_worker_index(worker_context.worker_index);

transaction_t transaction = { .transaction_id = { }, .locks = { .count = 0 , .size = 0, .list = nullptr, } };
transaction_id_t *transaction_id = &transaction.transaction_id;
transaction_id_volatile_t *transaction_id = &transaction.transaction_id;
transaction_id->worker_index = 0; transaction_id->transaction_index = 0;
transaction_spinlock_lock_t* lock = (transaction_spinlock_lock_t*)rawdata;
auto* lock = (transaction_spinlock_lock_t*)rawdata;

transaction_acquire(&transaction);
if (transaction_spinlock_try_lock(lock, &transaction)) {
Expand Down Expand Up @@ -95,7 +102,7 @@ void* test_transaction_spinlock_lock_counter_thread_func(void* rawdata) {

for(uint64_t i = 0; i < data->increments; i++) {
transaction_t transaction = { .transaction_id = { }, .locks = { .count = 0 , .size = 0, .list = nullptr, } };
transaction_id_t *transaction_id = &transaction.transaction_id;
transaction_id_volatile_t *transaction_id = &transaction.transaction_id;
transaction_id->worker_index = 0; transaction_id->transaction_index = 0;

transaction_acquire(&transaction);
Expand Down Expand Up @@ -138,7 +145,7 @@ TEST_CASE("transaction_spinlock.c", "[transaction_spinlock]") {
SECTION("transaction_spinlock_try_lock") {
SECTION("lock") {
transaction_t transaction = { .transaction_id = { }, .locks = { .count = 0 , .size = 0, .list = nullptr, } };
transaction_id_t *transaction_id = &transaction.transaction_id;
transaction_id_volatile_t *transaction_id = &transaction.transaction_id;
transaction_id->worker_index = 0; transaction_id->transaction_index = 0;
transaction_spinlock_lock_t lock = {0};
transaction_spinlock_init(&lock);
Expand All @@ -148,7 +155,7 @@ TEST_CASE("transaction_spinlock.c", "[transaction_spinlock]") {
REQUIRE(transaction_spinlock_try_lock(&lock, &transaction));

REQUIRE(lock.transaction_id != TRANSACTION_SPINLOCK_UNLOCKED);
REQUIRE(transaction.locks.size == 8);
REQUIRE(transaction.locks.size == 2);
REQUIRE(transaction.locks.count == 1);

transaction_release(&transaction);
Expand All @@ -169,7 +176,7 @@ TEST_CASE("transaction_spinlock.c", "[transaction_spinlock]") {
SECTION("transaction_spinlock_unlock") {
SECTION("unlock") {
transaction_t transaction = { .transaction_id = { }, .locks = { .count = 0 , .size = 0, .list = nullptr, } };
transaction_id_t *transaction_id = &transaction.transaction_id;
transaction_id_volatile_t *transaction_id = &transaction.transaction_id;
transaction_id->worker_index = 0; transaction_id->transaction_index = 1;
transaction_spinlock_lock_t lock = {0};
transaction_spinlock_init(&lock);
Expand All @@ -188,7 +195,7 @@ TEST_CASE("transaction_spinlock.c", "[transaction_spinlock]") {
SECTION("transaction_spinlock_lock") {
SECTION("fail already locked") {
transaction_t transaction = { .transaction_id = { }, .locks = { .count = 0 , .size = 0, .list = nullptr, } };
transaction_id_t *transaction_id = &transaction.transaction_id;
transaction_id_volatile_t *transaction_id = &transaction.transaction_id;
transaction_id->worker_index = 0; transaction_id->transaction_index = 0;
transaction_spinlock_lock_t lock = {0};
transaction_spinlock_init(&lock);
Expand All @@ -205,7 +212,7 @@ TEST_CASE("transaction_spinlock.c", "[transaction_spinlock]") {
SECTION("lock") {
int res, pthread_return_val, *pthread_return = nullptr;
transaction_t transaction = { .transaction_id = { }, .locks = { .count = 0 , .size = 0, .list = nullptr, } };
transaction_id_t *transaction_id = &transaction.transaction_id;
transaction_id_volatile_t *transaction_id = &transaction.transaction_id;
transaction_id->worker_index = 0; transaction_id->transaction_index = 0;
transaction_spinlock_lock_t lock = {0};
pthread_t pthread;
Expand Down
26 changes: 20 additions & 6 deletions tests/unit_tests/test-transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ TEST_CASE("transaction.c", "[transaction]") {
}

SECTION("transaction_locks_list_add") {
uint32_t locks_size = FFMA_OBJECT_SIZE_MIN / sizeof(transaction_spinlock_lock_volatile_t*);

transaction_t transaction = { };
transaction.locks.count = 0;
transaction.locks.size = 8;
transaction.locks.size = locks_size;
transaction.locks.list = (transaction_spinlock_lock_volatile_t**)ffma_mem_alloc(
sizeof(void*) * transaction.locks.size);

Expand All @@ -100,24 +102,36 @@ TEST_CASE("transaction.c", "[transaction]") {

REQUIRE(transaction_locks_list_add(&transaction, &lock));
REQUIRE(transaction.locks.count == 1);
REQUIRE(transaction.locks.size == 8);
REQUIRE(transaction.locks.size == locks_size);
REQUIRE(transaction.locks.list[0] == &lock);
}

SECTION("Add two lock") {
transaction_spinlock_lock_volatile_t lock[2] = { 0 };
REQUIRE(transaction_locks_list_add(&transaction, &lock[0]));
REQUIRE(transaction.locks.count == 1);
REQUIRE(transaction.locks.size == 8);
REQUIRE(transaction.locks.size == locks_size);
REQUIRE(transaction.locks.list[0] == &lock[0]);

REQUIRE(transaction_locks_list_add(&transaction, &lock[1]));
REQUIRE(transaction.locks.count == 2);
REQUIRE(transaction.locks.size == 8);
REQUIRE(transaction.locks.size == locks_size);
REQUIRE(transaction.locks.list[1] == &lock[1]);
}

SECTION("Force expansions") {
SECTION("Trigger an expansion expansion") {
transaction_spinlock_lock_volatile_t lock[3] = { 0 };

for(int index = 0; index < ARRAY_SIZE(lock); index++) {
REQUIRE(transaction_locks_list_add(&transaction, &lock[index]));
}

REQUIRE(transaction.locks.count == ARRAY_SIZE(lock));
REQUIRE(transaction.locks.size == locks_size * 2);
REQUIRE(transaction.locks.list[ARRAY_SIZE(lock) - 1] == &lock[ARRAY_SIZE(lock) - 1]);
}

SECTION("Trigger multiple expansion") {
transaction_spinlock_lock_volatile_t lock[10] = { 0 };

for(int index = 0; index < ARRAY_SIZE(lock); index++) {
Expand All @@ -144,7 +158,7 @@ TEST_CASE("transaction.c", "[transaction]") {
REQUIRE(transaction.transaction_id.transaction_index == current_transaction_index + 1);

REQUIRE(transaction.locks.count == 0);
REQUIRE(transaction.locks.size == 8);
REQUIRE(transaction.locks.size == FFMA_OBJECT_SIZE_MIN / sizeof(transaction_spinlock_lock_volatile_t*));
REQUIRE(transaction.locks.list != nullptr);

ffma_mem_free(transaction.locks.list);
Expand Down

0 comments on commit 5ef5bf7

Please sign in to comment.