Skip to content

Change Shutdown() and ForceFlush() to return bool for traces #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/cpp-metrics-sdk-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ class StdoutExporter: public exporter {
* Shuts down the channel and cleans up resources as required.
*
*/
void Shutdown();
bool Shutdown();
};
```

Expand Down
3 changes: 2 additions & 1 deletion docs/cpp-ostream-exporter-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,10 @@ public:
return sdktrace::ExportResult::kSuccess;
}

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept
{
isShutdown = true;
return true;
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
/**
* @param timeout an optional value containing the timeout of the exporter
* note: passing custom timeout values is not currently supported for this exporter
* @return Returns the status of the operation
*/
void Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override{};
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
return true;
};

/**
* @return Returns a shared pointer to this exporters InMemorySpanData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class OStreamSpanExporter final : public sdktrace::SpanExporter
sdktrace::ExportResult Export(
const nostd::span<std::unique_ptr<sdktrace::Recordable>> &spans) noexcept override;

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override;
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

private:
std::ostream &sout_;
Expand Down
3 changes: 2 additions & 1 deletion exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ sdktrace::ExportResult OStreamSpanExporter::Export(
return sdktrace::ExportResult::kSuccess;
}

void OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
isShutdown_ = true;
return true;
}

} // namespace trace
Expand Down
2 changes: 1 addition & 1 deletion exporters/ostream/test/ostream_span_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEST(OStreamSpanExporter, Shutdown)
// Redirect cout to our stringstream buffer
std::cout.rdbuf(stdoutOutput.rdbuf());

processor->Shutdown();
EXPECT_TRUE(processor->Shutdown());
processor->OnEnd(std::move(recordable));

std::cout.rdbuf(sbuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ class OtlpExporter final : public opentelemetry::sdk::trace::SpanExporter
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
* @return return the status of this operation
*/
void Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override{};
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
return true;
}

private:
// The configuration options associated with this exporter.
Expand Down
20 changes: 13 additions & 7 deletions ext/include/opentelemetry/ext/zpages/tracez_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,28 @@ class TracezSpanProcessor : public opentelemetry::sdk::trace::SpanProcessor
/*
* For now, does nothing. In the future, it
* may send all ended spans that have not yet been sent to the aggregator.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied. Currently, timeout does nothing.
* @param timeout an optional timeout. Currently, timeout does nothing.
* @return return the status of the operation.
*/
void ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{}
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
return true;
}

/*
* Shut down the processor and do any cleanup required, which is none.
* After the call to Shutdown, subsequent calls to OnStart, OnEnd, ForceFlush
* or Shutdown will return immediately without doing anything.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied. Currently, timeout does nothing.
* @return return the status of the operation.
*/
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{}
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
return true;
}

private:
mutable std::mutex mtx_;
Expand Down
4 changes: 2 additions & 2 deletions ext/test/zpages/tracez_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ TEST_F(TracezProcessor, FlushShutdown)
auto pre_running_sz = running.size();
auto pre_completed_sz = completed.size();

processor->ForceFlush();
processor->Shutdown();
EXPECT_TRUE(processor->ForceFlush());
EXPECT_TRUE(processor->Shutdown());

UpdateSpans(processor, completed, running);

Expand Down
7 changes: 4 additions & 3 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class BatchSpanProcessor : public SpanProcessor
*
* NOTE: Timeout functionality not supported yet.
*/
void ForceFlush(
std::chrono::microseconds timeout = std::chrono::milliseconds(0)) noexcept override;
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
Expand All @@ -77,7 +77,8 @@ class BatchSpanProcessor : public SpanProcessor
*
* NOTE: Timeout functionality not supported yet.
*/
void Shutdown(std::chrono::microseconds timeout = std::chrono::milliseconds(0)) noexcept override;
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Class destructor which invokes the Shutdown() method. The Shutdown() method is supposed to be
Expand Down
8 changes: 4 additions & 4 deletions sdk/include/opentelemetry/sdk/trace/exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ class SpanExporter

/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
* @param timeout an optional timeout.
* @return return the status of the operation.
*/
virtual void Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
};
} // namespace trace
} // namespace sdk
Expand Down
8 changes: 4 additions & 4 deletions sdk/include/opentelemetry/sdk/trace/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class SpanProcessor
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
*/
virtual void ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
virtual bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;

/**
* Shut down the processor and do any cleanup required. Ended spans are
Expand All @@ -57,8 +57,8 @@ class SpanProcessor
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
*/
virtual void Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0;
};
} // namespace trace
} // namespace sdk
Expand Down
14 changes: 9 additions & 5 deletions sdk/include/opentelemetry/sdk/trace/simple_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,21 @@ class SimpleSpanProcessor : public SpanProcessor
}
}

void ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{}
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
return true;
}

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
// We only call shutdown ONCE.
if (exporter_ != nullptr && !shutdown_latch_.test_and_set(std::memory_order_acquire))
{
exporter_->Shutdown(timeout);
return exporter_->Shutdown(timeout);
}
return true;
}

~SimpleSpanProcessor() { Shutdown(); }
Expand Down
2 changes: 1 addition & 1 deletion sdk/include/opentelemetry/sdk/trace/tracer_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider
/**
* Shutdown the span processor associated with this tracer provider.
*/
void Shutdown() noexcept;
bool Shutdown() noexcept;

private:
opentelemetry::sdk::AtomicSharedPtr<SpanProcessor> processor_;
Expand Down
12 changes: 8 additions & 4 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ void BatchSpanProcessor::OnEnd(std::unique_ptr<Recordable> &&span) noexcept
}
}

void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
if (is_shutdown_.load() == true)
{
return;
return false;
}

is_force_flush_ = true;
Expand All @@ -77,6 +77,8 @@ void BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept

// Notify the worker thread
is_force_flush_notified_ = false;

return true;
}

void BatchSpanProcessor::DoBackgroundWork()
Expand Down Expand Up @@ -173,16 +175,18 @@ void BatchSpanProcessor::DrainQueue()
}
}

void BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
{
exporter_->Shutdown();
return exporter_->Shutdown();
}

return true;
}

BatchSpanProcessor::~BatchSpanProcessor()
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/trace/tracer_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ std::shared_ptr<Sampler> TracerProvider::GetSampler() const noexcept
return sampler_;
}

void TracerProvider::Shutdown() noexcept
bool TracerProvider::Shutdown() noexcept
{
GetProcessor()->Shutdown();
return GetProcessor()->Shutdown();
}
} // namespace trace
} // namespace sdk
Expand Down
14 changes: 8 additions & 6 deletions sdk/test/trace/batch_span_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ class MockSpanExporter final : public sdk::trace::SpanExporter
return sdk::trace::ExportResult::kSuccess;
}

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
*is_shutdown_ = true;
return true;
}

bool IsExportCompleted() { return is_export_completed_->load(); }
Expand Down Expand Up @@ -135,7 +137,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
batch_processor->OnEnd(std::move(test_spans->at(i)));
}

batch_processor->Shutdown();
EXPECT_TRUE(batch_processor->Shutdown());

EXPECT_EQ(num_spans, spans_received->size());
for (int i = 0; i < num_spans; ++i)
Expand Down Expand Up @@ -165,7 +167,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
// Give some time to export
std::this_thread::sleep_for(std::chrono::milliseconds(50));

batch_processor->ForceFlush();
EXPECT_TRUE(batch_processor->ForceFlush());

EXPECT_EQ(num_spans, spans_received->size());
for (int i = 0; i < num_spans; ++i)
Expand All @@ -183,7 +185,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(50));

batch_processor->ForceFlush();
EXPECT_TRUE(batch_processor->ForceFlush());

EXPECT_EQ(num_spans * 2, spans_received->size());
for (int i = 0; i < num_spans; ++i)
Expand Down Expand Up @@ -215,7 +217,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(700));

batch_processor->ForceFlush();
EXPECT_TRUE(batch_processor->ForceFlush());

// Span should be exported by now
EXPECT_GE(max_queue_size, spans_received->size());
Expand Down Expand Up @@ -243,7 +245,7 @@ TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess)
// Give some time to export the spans
std::this_thread::sleep_for(std::chrono::milliseconds(50));

batch_processor->ForceFlush();
EXPECT_TRUE(batch_processor->ForceFlush());

EXPECT_EQ(num_spans, spans_received->size());
for (int i = 0; i < num_spans; ++i)
Expand Down
6 changes: 4 additions & 2 deletions sdk/test/trace/simple_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TEST(SimpleProcessor, ToInMemorySpanExporter)

ASSERT_EQ(1, span_data->GetSpans().size());

processor.Shutdown();
EXPECT_TRUE(processor.Shutdown());
}

// An exporter that does nothing but record (and give back ) the # of times Shutdown was called.
Expand All @@ -46,9 +46,11 @@ class RecordShutdownExporter final : public SpanExporter
return ExportResult::kSuccess;
}

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
*shutdown_counter_ += 1;
return true;
}

private:
Expand Down
5 changes: 1 addition & 4 deletions sdk/test/trace/tracer_provider_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,5 @@ TEST(TracerProvider, Shutdown)

TracerProvider tp1(processor1);

tp1.Shutdown();

// Verify Shutdown returns.
ASSERT_TRUE(true);
EXPECT_TRUE(tp1.Shutdown());
}