Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
devel
-----

* Fix unique _key generation in two cases: AQL INSERT operations in a loop
without a user-specified _key in collections with exactly one shard, or with
multiple shards, non-standard sharding and `forceOneShardAttributeValue`.

* Fix BTS-1732 and BTS-2177: Possible crashes during shutdown, and maintainer
mode (i.e. non-production) assertions during tests.

Expand Down
20 changes: 20 additions & 0 deletions arangod/Cluster/ClusterInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,11 @@ uint64_t ClusterInfo::uniqid(uint64_t count) {
return idCounter.fetch_add(1);
}

TRI_IF_FAILURE("always-fetch-new-cluster-wide-uniqid") {
uint64_t result = _agency.uniqid(count, 0.0);
return result;
}

std::lock_guard mutexLocker{_idLock};

if (_uniqid._currentValue + count - 1 <= _uniqid._upperValue) {
Expand Down Expand Up @@ -661,6 +666,21 @@ uint64_t ClusterInfo::uniqid(uint64_t count) {
return result;
}

//////////////////////////////////////////////////////////////////////////////
/// @brief reserve <number> cluster-wide unique IDs and return the first one.
/// The request is forwarded straight to the agency via _agency.
/// Any exception raised on the way is swallowed and reported to the caller
/// as an empty optional.
//////////////////////////////////////////////////////////////////////////////

std::optional<uint64_t> ClusterInfo::uniqidFromAgency(uint64_t number) {
  std::optional<uint64_t> firstId;
  try {
    firstId = _agency.uniqid(number, 0.0);
  } catch (...) {
    // whatever went wrong, the caller only learns that no IDs were obtained
    firstId.reset();
  }
  return firstId;
}

////////////////////////////////////////////////////////////////////////////////
/// @brief flush the caches (used for testing)
////////////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 9 additions & 0 deletions arangod/Cluster/ClusterInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ class ClusterInfo final {

uint64_t uniqid(uint64_t = 1);

//////////////////////////////////////////////////////////////////////////////
/// @brief get a number of cluster-wide unique IDs, returns the first
/// one and guarantees that <number> are reserved for the caller.
/// This variant uses _agency to directly get things from the agency.
/// If the optional value is empty, an error occurred.
//////////////////////////////////////////////////////////////////////////////

std::optional<uint64_t> uniqidFromAgency(uint64_t number);

/**
* @brief Agency dump including replicated log and compaction
* @param body Builder to fill with dump
Expand Down
167 changes: 167 additions & 0 deletions arangod/RestHandler/RestAdminClusterHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <velocypack/Buffer.h>
#include <velocypack/Iterator.h>

#include "Agency/AgencyComm.h"
#include "Agency/AgencyPaths.h"
#include "Agency/AsyncAgencyComm.h"
#include "Agency/Supervision.h"
Expand Down Expand Up @@ -329,6 +330,7 @@ RestAdminClusterHandler::RestAdminClusterHandler(ArangodServer& server,

std::string const RestAdminClusterHandler::Health = "health";
std::string const RestAdminClusterHandler::NumberOfServers = "numberOfServers";
std::string const RestAdminClusterHandler::UniqId = "uniqId";
std::string const RestAdminClusterHandler::Maintenance = "maintenance";
std::string const RestAdminClusterHandler::NodeVersion = "nodeVersion";
std::string const RestAdminClusterHandler::NodeEngine = "nodeEngine";
Expand Down Expand Up @@ -399,6 +401,9 @@ auto RestAdminClusterHandler::executeAsync() -> futures::Future<futures::Unit> {
} else if (command == NumberOfServers) {
co_await handleNumberOfServers();
co_return;
} else if (command == UniqId) {
co_await handleUniqId();
co_return;
} else if (command == Maintenance) {
co_await handleMaintenance();
co_return;
Expand Down Expand Up @@ -2070,6 +2075,168 @@ async<void> RestAdminClusterHandler::handleNumberOfServers() {
}
}

// Entry point for the "uniqId" command: checks the preconditions (server
// role, privileges, HTTP method) and then delegates to handlePutUniqId().
async<void> RestAdminClusterHandler::handleUniqId() {
  // the command is only served by coordinators
  bool const onCoordinator = ServerState::instance()->isCoordinator();
  if (!onCoordinator) {
    generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN,
                  "only allowed on coordinators");
    co_return;
  }

  // admin privileges are required, independent of the HTTP method used
  bool const isAdmin = ExecContext::current().isAdminUser();
  if (!isAdmin) {
    generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN);
    co_return;
  }

  // PUT is the only supported method, everything else is rejected
  if (request()->requestType() != rest::RequestType::PUT) {
    generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
                  TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
    co_return;
  }

  co_await handlePutUniqId();
  co_return;
}

namespace {

// Strictly parses `text` as an unsigned decimal integer: only the characters
// 0-9 are accepted. Returns an empty optional for empty input, for any other
// character (sign, whitespace, trailing garbage) and for values that do not
// fit into 64 bits.
// A bare std::stoull is not sufficient here: it skips leading whitespace,
// stops silently at the first non-digit ("10abc" -> 10) and accepts a minus
// sign, in which case the value wraps around ("-18446744073709551615" -> 1).
std::optional<uint64_t> parseUniqIdParameter(std::string const& text) {
  if (text.empty() ||
      text.find_first_not_of("0123456789") != std::string::npos) {
    return std::nullopt;
  }
  try {
    return static_cast<uint64_t>(std::stoull(text));
  } catch (std::exception const&) {
    // digits only, but the value is out of range
    return std::nullopt;
  }
}

}  // namespace

// The following RestHandler method has two purposes:
//  (1) Fetch and reserve a certain range of globally-unique IDs from the
//      cluster. This is the case when the `number` query parameter is given.
//  (2) Make sure that subsequent ranges which anybody will reserve will
//      always be ranges whose members are larger than the given value.
//      This is the case when the `minimum` query parameter is given.
// Exactly one of the two parameters must be given.
// In the first case, there is a limitation to at most 1000000 keys at
// a time. In the second case the minimum must be at most 2^60; if the
// ID handed out by the agency is already at least as large as the minimum,
// nothing further is reserved.
// Both cases return a range of reserved keys as an object with the
// attributes `smallest` and `largest`, both encoded as decimal strings.
// Errors are reported as HTTP 400 (bad parameters), 403 (no agency
// communication available), 503 (agency did not hand out IDs) or 500
// (unexpected exception).

async<void> RestAdminClusterHandler::handlePutUniqId() {
  if (AsyncAgencyCommManager::INSTANCE == nullptr) {
    generateError(rest::ResponseCode::FORBIDDEN, TRI_ERROR_HTTP_FORBIDDEN,
                  "not allowed on single servers");
    co_return;
  }

  // Parse query parameters
  auto& req = *request();

  std::string numberParam = req.value("number");
  std::string minimumParam = req.value("minimum");

  // Validate that exactly one parameter is provided
  if (numberParam.empty() && minimumParam.empty()) {
    generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
                  "either 'number' or 'minimum' parameter must be provided");
    co_return;
  }

  if (!numberParam.empty() && !minimumParam.empty()) {
    generateError(
        rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
        "only one of 'number' or 'minimum' parameter can be provided");
    co_return;
  }

  // Shared error response for every failed attempt to get IDs from the
  // agency.
  auto reportAgencyUnavailable = [this]() {
    generateError(rest::ResponseCode::SERVICE_UNAVAILABLE,
                  TRI_ERROR_HTTP_SERVICE_UNAVAILABLE,
                  "could not talk to agency");
  };

  try {
    uint64_t smallest = 0;
    uint64_t largest = 0;

    auto& ci = server().getFeature<ClusterFeature>().clusterInfo();

    if (!numberParam.empty()) {
      // Mode 1: Get a specific number of unique IDs
      auto const number = parseUniqIdParameter(numberParam);
      if (!number.has_value()) {
        generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
                      "parameter 'number' must be a valid positive integer");
        co_return;
      }

      if (*number < 1 || *number > 1000000) {
        generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
                      "parameter 'number' must be between 1 and 1000000");
        co_return;
      }

      // Use AgencyComm via ClusterInfo to get unique IDs:
      auto const first = ci.uniqidFromAgency(*number);
      if (!first.has_value()) {
        reportAgencyUnavailable();
        co_return;
      }

      smallest = *first;
      largest = smallest + *number - 1;

    } else {
      // Mode 2: Ensure that IDs handed out later are larger than the minimum
      auto const minimum = parseUniqIdParameter(minimumParam);
      if (!minimum.has_value()) {
        generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
                      "parameter 'minimum' must be a valid positive integer");
        co_return;
      }

      // Limit to 2^60
      constexpr uint64_t maxValue = 1ULL << 60;
      if (*minimum > maxValue) {
        generateError(rest::ResponseCode::BAD, TRI_ERROR_BAD_PARAMETER,
                      "parameter 'minimum' must not exceed 2^60");
        co_return;
      }

      // Reserve a single ID to learn where the cluster-wide counter
      // currently stands.
      auto const probe = ci.uniqidFromAgency(1);
      if (!probe.has_value()) {
        reportAgencyUnavailable();
        co_return;
      }
      uint64_t const currentValue = *probe;

      if (currentValue >= *minimum) {
        // Counter is already at or beyond the minimum: report the single ID
        // that was just reserved.
        smallest = currentValue;
        largest = currentValue;
      } else {
        // Counter is still below the minimum: reserve as many IDs as are
        // needed to move it past the minimum. diff cannot underflow because
        // currentValue < *minimum holds in this branch.
        uint64_t const diff = *minimum - currentValue;
        auto const first = ci.uniqidFromAgency(diff);
        if (!first.has_value()) {
          reportAgencyUnavailable();
          co_return;
        }
        smallest = *first;
        largest = smallest + diff - 1;
      }
    }

    // Generate response JSON; the values are sent as strings
    VPackBuilder builder;
    {
      VPackObjectBuilder ob(&builder);
      builder.add("smallest", VPackValue(std::to_string(smallest)));
      builder.add("largest", VPackValue(std::to_string(largest)));
    }

    generateOk(rest::ResponseCode::OK, builder);

  } catch (basics::Exception const& e) {
    generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
                  e.what());
  } catch (std::exception const& e) {
    generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
                  e.what());
  }
}

async<void> RestAdminClusterHandler::handleHealth() {
// We allow this API whenever one is authenticated in some way. There used
// to be a check for isAdminUser here. However, we want that the UI with
Expand Down
4 changes: 4 additions & 0 deletions arangod/RestHandler/RestAdminClusterHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RestAdminClusterHandler : public RestVocbaseBaseHandler {
static std::string const CancelJob;
static std::string const Health;
static std::string const NumberOfServers;
static std::string const UniqId;
static std::string const Maintenance;
static std::string const NodeVersion;
static std::string const NodeStatistics;
Expand Down Expand Up @@ -89,6 +90,9 @@ class RestAdminClusterHandler : public RestVocbaseBaseHandler {
async<void> handleGetNumberOfServers();
async<void> handlePutNumberOfServers();

async<void> handleUniqId();
async<void> handlePutUniqId();

async<void> handleNodeVersion();
async<void> handleNodeStatistics();
async<void> handleNodeEngine();
Expand Down
49 changes: 38 additions & 11 deletions arangod/VocBase/KeyGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ class TraditionalKeyGeneratorSingle final : public TraditionalKeyGenerator {
bool allowUserKeys, uint64_t lastValue)
: TraditionalKeyGenerator(collection, allowUserKeys),
_lastValue(lastValue) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
TRI_ASSERT(!ServerState::instance()->isCoordinator() &&
!ServerState::instance()->isDBServer());
}

/// @brief build a VelocyPack representation of the generator in the builder
Expand Down Expand Up @@ -368,14 +369,15 @@ class TraditionalKeyGeneratorCoordinator final
explicit TraditionalKeyGeneratorCoordinator(
ClusterInfo& ci, LogicalCollection const& collection, bool allowUserKeys)
: TraditionalKeyGenerator(collection, allowUserKeys), _ci(ci) {
TRI_ASSERT(ServerState::instance()->isCoordinator());
TRI_ASSERT(ServerState::instance()->isCoordinator() ||
ServerState::instance()->isDBServer());
}

private:
/// @brief generate a key value (internal)
uint64_t generateValue() override {
TRI_ASSERT(ServerState::instance()->isCoordinator());
TRI_ASSERT(_collection.numberOfShards() != 1 || _collection.isSmart());
TRI_ASSERT(ServerState::instance()->isCoordinator() ||
ServerState::instance()->isDBServer());
TRI_IF_FAILURE("KeyGenerator::generateOnCoordinator") {
// for testing purposes only
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
Expand Down Expand Up @@ -496,8 +498,6 @@ class PaddedKeyGeneratorSingle final : public PaddedKeyGenerator {
private:
/// @brief generate a key
uint64_t generateValue() override {
TRI_ASSERT(ServerState::instance()->isSingleServer() ||
_collection.numberOfShards() == 1);
TRI_IF_FAILURE("KeyGenerator::generateOnSingleServer") {
// for testing purposes only
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
Expand All @@ -519,14 +519,15 @@ class PaddedKeyGeneratorCoordinator final : public PaddedKeyGenerator {
LogicalCollection const& collection,
bool allowUserKeys, uint64_t lastValue)
: PaddedKeyGenerator(collection, allowUserKeys, lastValue), _ci(ci) {
TRI_ASSERT(ServerState::instance()->isCoordinator());
TRI_ASSERT(ServerState::instance()->isCoordinator() ||
ServerState::instance()->isDBServer());
}

private:
/// @brief generate a key value (internal)
uint64_t generateValue() override {
TRI_ASSERT(ServerState::instance()->isCoordinator());
TRI_ASSERT(_collection.numberOfShards() != 1 || _collection.isSmart());
TRI_ASSERT(ServerState::instance()->isCoordinator() ||
ServerState::instance()->isDBServer());
TRI_IF_FAILURE("KeyGenerator::generateOnCoordinator") {
// for testing purposes only
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
Expand Down Expand Up @@ -717,7 +718,20 @@ std::unordered_map<GeneratorMapType,
bool allowUserKeys = basics::VelocyPackHelper::getBooleanValue(
options, StaticStrings::AllowUserKeys, true);

if (ServerState::instance()->isCoordinator()) {
if (ServerState::instance()->isCoordinator() ||
ServerState::instance()->isDBServer()) {
// We are in a cluster where keys need to be globally unique. Under
// normal circumstances, coordinators create the keys.
// However, there are cases, in which this can happen on a DBServer,
// amongst them are:
// - collections with a single shard and an AQL query which does
// INSERT in a loop, no _key given by query
// - collections with multiple shards, non-standard sharding, an
// AQL query which does INSERT in a loop, no key given by query
// and usage of the option `forceOneShardAttributeValue`
// In these cases we have to generate a key using a cluster-wide
// unique identifier, so we use the "Coordinator" key generator
// also on DBServers.
auto& ci = collection.vocbase()
.server()
.getFeature<ClusterFeature>()
Expand Down Expand Up @@ -802,7 +816,20 @@ std::unordered_map<GeneratorMapType,
bool allowUserKeys = basics::VelocyPackHelper::getBooleanValue(
options, StaticStrings::AllowUserKeys, true);

if (ServerState::instance()->isCoordinator()) {
if (ServerState::instance()->isCoordinator() ||
ServerState::instance()->isDBServer()) {
// We are in a cluster where keys need to be globally unique. Under
// normal circumstances, coordinators create the keys.
// However, there are cases, in which this can happen on a DBServer,
// amongst them are:
// - collections with a single shard and an AQL query which does
// INSERT in a loop, no _key given by query
// - collections with multiple shards, non-standard sharding, an
// AQL query which does INSERT in a loop, no key given by query
// and usage of the option `forceOneShardAttributeValue`
// In these cases we have to generate a key using a cluster-wide
// unique identifier, so we use the "Coordinator" key generator
// also on DBServers.
auto& ci = collection.vocbase()
.server()
.getFeature<ClusterFeature>()
Expand Down
3 changes: 2 additions & 1 deletion tests/Transaction/ManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ TEST_F(TransactionManagerTest, simple_transaction_and_commit_is_follower) {
ASSERT_TRUE(
trx.state()->hasHint(transaction::Hints::Hint::IS_FOLLOWER_TRX));

auto doc = arangodb::velocypack::Parser::fromJson("{ \"abc\": 1}");
auto doc = arangodb::velocypack::Parser::fromJson(
"{ \"_key\": \"blablabla\", \"abc\": 1}");

OperationOptions opts;
auto opRes = trx.insert(coll->name(), doc->slice(), opts);
Expand Down
Loading