Skip to content

Commit 322b109

Browse files
authored
KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219 (apache#8567)
After KIP-219, responses are sent immediately and we rely on a combination of clients and muting of the channel to throttle. The result of this is that we need to track `apiThrottleTimeMs` as an explicit value instead of inferring it. On the other hand, we no longer need `apiRemoteCompleteTimeNanos`. Extend `BaseQuotaTest` to verify that the throttle time in the request channel metrics is being set. Given the nature of the throttling numbers, the test is not particularly precise. I included a few clean-ups: * Pass KafkaMetric to QuotaViolationException so that the caller doesn't have to retrieve it from the metrics registry. * Inline Supplier in SocketServer (use SAM). * Reduce redundant `time.milliseconds` and `time.nanoseconds` calls. * Use monotonic clock in ThrottledChannel and simplify `compareTo` method. * Simplify `TimerTaskList.compareTo`. * Consolidate the number of places where we update `apiLocalCompleteTimeNanos` and `responseCompleteTimeNanos`. * Added `toString` to `ByteBufferSend` and `MultiRecordsSend`. * Restrict access to methods in `QuotaTestClients` to expose only what we need to. Reviewers: Jun Rao <junrao@gmail.com>
1 parent 8a83025 commit 322b109

File tree

15 files changed

+156
-113
lines changed

15 files changed

+156
-113
lines changed

clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,25 @@
1717
package org.apache.kafka.common.metrics;
1818

1919
import org.apache.kafka.common.KafkaException;
20-
import org.apache.kafka.common.MetricName;
2120

2221
/**
2322
* Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
2423
*/
2524
public class QuotaViolationException extends KafkaException {
2625

2726
private static final long serialVersionUID = 1L;
28-
private final MetricName metricName;
27+
private final KafkaMetric metric;
2928
private final double value;
3029
private final double bound;
3130

32-
public QuotaViolationException(MetricName metricName, double value, double bound) {
33-
this.metricName = metricName;
31+
public QuotaViolationException(KafkaMetric metric, double value, double bound) {
32+
this.metric = metric;
3433
this.value = value;
3534
this.bound = bound;
3635
}
3736

38-
public MetricName metricName() {
39-
return metricName;
37+
public KafkaMetric metric() {
38+
return metric;
4039
}
4140

4241
public double value() {
@@ -51,7 +50,7 @@ public double bound() {
5150
public String toString() {
5251
return getClass().getName()
5352
+ ": '"
54-
+ metricName
53+
+ metric.metricName()
5554
+ "' violated quota. Actual: "
5655
+ value
5756
+ ", Threshold: "

clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,7 @@ public void checkQuotas(long timeMs) {
209209
if (quota != null) {
210210
double value = metric.measurableValue(timeMs);
211211
if (!quota.acceptable(value)) {
212-
throw new QuotaViolationException(metric.metricName(), value,
213-
quota.bound());
212+
throw new QuotaViolationException(metric, value, quota.bound());
214213
}
215214
}
216215
}

clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,14 @@ public long writeTo(GatheringByteChannel channel) throws IOException {
6868
public long remaining() {
6969
return remaining;
7070
}
71+
72+
@Override
73+
public String toString() {
74+
return "ByteBufferSend(" +
75+
"destination='" + destination + "'" +
76+
", size=" + size +
77+
", remaining=" + remaining +
78+
", pending=" + pending +
79+
')';
80+
}
7181
}

clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,15 @@ public Map<TopicPartition, RecordConversionStats> recordConversionStats() {
118118
return recordConversionStats;
119119
}
120120

121+
@Override
122+
public String toString() {
123+
return "MultiRecordsSend(" +
124+
"dest='" + dest + "'" +
125+
", size=" + size +
126+
", totalWritten=" + totalWritten +
127+
')';
128+
}
129+
121130
private void updateRecordConversionStats(Send completedSend) {
122131
// The underlying send might have accumulated statistics that need to be recorded. For example,
123132
// LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount

core/src/main/scala/kafka/network/RequestChannel.scala

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ object RequestChannel extends Logging {
8787
@volatile var apiLocalCompleteTimeNanos = -1L
8888
@volatile var responseCompleteTimeNanos = -1L
8989
@volatile var responseDequeueTimeNanos = -1L
90-
@volatile var apiRemoteCompleteTimeNanos = -1L
9190
@volatile var messageConversionsTimeNanos = 0L
91+
@volatile var apiThrottleTimeMs = 0L
9292
@volatile var temporaryMemoryBytes = 0L
9393
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
9494

@@ -170,16 +170,6 @@ object RequestChannel extends Logging {
170170

171171
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
172172
val endTimeNanos = Time.SYSTEM.nanoseconds
173-
// In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
174-
// processing time is really small. This value is set in KafkaApis from a request handling thread.
175-
// This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
176-
// see a negative value here. In that case, use responseCompleteTimeNanos as apiLocalCompleteTimeNanos.
177-
if (apiLocalCompleteTimeNanos < 0)
178-
apiLocalCompleteTimeNanos = responseCompleteTimeNanos
179-
// If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is
180-
// the same as responseCompleteTimeNanos.
181-
if (apiRemoteCompleteTimeNanos < 0)
182-
apiRemoteCompleteTimeNanos = responseCompleteTimeNanos
183173

184174
/**
185175
* Converts nanos to millis with micros precision as additional decimal places in the request log have low
@@ -193,8 +183,7 @@ object RequestChannel extends Logging {
193183

194184
val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
195185
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
196-
val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
197-
val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
186+
val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos)
198187
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
199188
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
200189
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
@@ -215,7 +204,7 @@ object RequestChannel extends Logging {
215204
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
216205
m.localTimeHist.update(Math.round(apiLocalTimeMs))
217206
m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
218-
m.throttleTimeHist.update(Math.round(apiThrottleTimeMs))
207+
m.throttleTimeHist.update(apiThrottleTimeMs)
219208
m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
220209
m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
221210
m.totalTimeHist.update(Math.round(totalTimeMs))
@@ -276,12 +265,6 @@ object RequestChannel extends Logging {
276265
}
277266

278267
abstract class Response(val request: Request) {
279-
locally {
280-
val nowNs = Time.SYSTEM.nanoseconds
281-
request.responseCompleteTimeNanos = nowNs
282-
if (request.apiLocalCompleteTimeNanos == -1L)
283-
request.apiLocalCompleteTimeNanos = nowNs
284-
}
285268

286269
def processor: Int = request.processor
287270

@@ -326,7 +309,7 @@ object RequestChannel extends Logging {
326309
}
327310
}
328311

329-
class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
312+
class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup {
330313
import RequestChannel._
331314
val metrics = new RequestChannel.Metrics
332315
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
@@ -362,6 +345,7 @@ class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends
362345

363346
/** Send a response back to the socket server to be sent over the network */
364347
def sendResponse(response: RequestChannel.Response): Unit = {
348+
365349
if (isTraceEnabled) {
366350
val requestHeader = response.request.header
367351
val message = response match {
@@ -379,6 +363,18 @@ class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends
379363
trace(message)
380364
}
381365

366+
response match {
367+
// We should only send one of the following per request
368+
case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
369+
val request = response.request
370+
val timeNanos = time.nanoseconds()
371+
request.responseCompleteTimeNanos = timeNanos
372+
if (request.apiLocalCompleteTimeNanos == -1L)
373+
request.apiLocalCompleteTimeNanos = timeNanos
374+
// For a given request, these may happen in addition to one in the previous section, skip updating the metrics
375+
case _: StartThrottlingResponse | _: EndThrottlingResponse => ()
376+
}
377+
382378
val processor = processors.get(response.processor)
383379
// The processor may be null if it was shutdown. In this case, the connections
384380
// are closed, so the response is dropped.
@@ -444,7 +440,8 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
444440
val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
445441
// time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
446442
val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
447-
// time a request is throttled
443+
// time a request is throttled, not part of the request processing time (throttling is done at the client level
444+
// for clients that support KIP-219 and by muting the channel for the rest)
448445
val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
449446
// time a response spent in a response queue
450447
val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import java.util
2525
import java.util.Optional
2626
import java.util.concurrent._
2727
import java.util.concurrent.atomic._
28-
import java.util.function.Supplier
2928

3029
import kafka.cluster.{BrokerEndPoint, EndPoint}
3130
import kafka.metrics.KafkaMetricsGroup
@@ -92,11 +91,12 @@ class SocketServer(val config: KafkaConfig,
9291
// data-plane
9392
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
9493
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
95-
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
94+
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
9695
// control-plane
9796
private var controlPlaneProcessorOpt : Option[Processor] = None
9897
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
99-
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))
98+
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
99+
new RequestChannel(20, ControlPlaneMetricPrefix, time))
100100

101101
private var nextProcessorId = 0
102102
private var connectionQuotas: ConnectionQuotas = _
@@ -908,10 +908,6 @@ private[kafka] class Processor(val id: Int,
908908
}
909909
}
910910

911-
private def nowNanosSupplier = new Supplier[java.lang.Long] {
912-
override def get(): java.lang.Long = time.nanoseconds()
913-
}
914-
915911
private def poll(): Unit = {
916912
val pollTimeout = if (newConnections.isEmpty) 300 else 0
917913
try selector.poll(pollTimeout)
@@ -929,7 +925,8 @@ private[kafka] class Processor(val id: Int,
929925
openOrClosingChannel(receive.source) match {
930926
case Some(channel) =>
931927
val header = RequestHeader.parse(receive.payload)
932-
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
928+
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
929+
() => time.nanoseconds()))
933930
trace(s"Begin re-authentication: $channel")
934931
else {
935932
val nowNanos = time.nanoseconds()

core/src/main/scala/kafka/server/ClientQuotaManager.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -250,18 +250,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
250250
}
251251

252252
def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = {
253-
var throttleTimeMs = 0
254253
val clientSensors = getOrCreateQuotaSensors(session, clientId)
255254
try {
256255
clientSensors.quotaSensor.record(value, timeMs)
256+
0
257257
} catch {
258-
case _: QuotaViolationException =>
259-
// Compute the delay
260-
val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
261-
throttleTimeMs = throttleTime(clientMetric).toInt
262-
debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
258+
case e: QuotaViolationException =>
259+
val throttleTimeMs = throttleTime(e.value, e.bound, windowSize(e.metric, timeMs)).toInt
260+
debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)")
261+
throttleTimeMs
263262
}
264-
throttleTimeMs
265263
}
266264

267265
/** "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value
@@ -337,16 +335,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
337335
* we need to add a delay of X to W such that O * W / (W + X) = T.
338336
* Solving for X, we get X = (O - T)/T * W.
339337
*/
340-
protected def throttleTime(clientMetric: KafkaMetric): Long = {
341-
val config = clientMetric.config
342-
val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
343-
val quota = config.quota()
344-
val difference = clientMetric.metricValue.asInstanceOf[Double] - quota.bound
338+
protected def throttleTime(quotaValue: Double, quotaBound: Double, windowSize: Long): Long = {
339+
val difference = quotaValue - quotaBound
345340
// Use the precise window used by the rate calculation
346-
val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
347-
throttleTimeMs.round
341+
val throttleTimeMs = difference / quotaBound * windowSize
342+
Math.round(throttleTimeMs)
348343
}
349344

345+
private def windowSize(metric: KafkaMetric, timeMs: Long): Long =
346+
measurableAsRate(metric.metricName, metric.measurable).windowSize(metric.config, timeMs)
347+
350348
// Casting to Rate because we only use Rate in Quota computation
351349
private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = {
352350
measurable match {

core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,12 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
4646
* @param request client request
4747
* @return Number of milliseconds to throttle in case of quota violation. Zero otherwise
4848
*/
49-
def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request): Int = {
50-
if (request.apiRemoteCompleteTimeNanos == -1) {
51-
// When this callback is triggered, the remote API call has completed
52-
request.apiRemoteCompleteTimeNanos = time.nanoseconds
53-
}
54-
49+
def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request, timeMs: Long): Int = {
5550
if (quotasEnabled) {
5651
request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(
5752
getOrCreateQuotaSensors(request.session, request.header.clientId), nanosToPercentage(timeNanos)))
5853
recordAndGetThrottleTimeMs(request.session, request.header.clientId,
59-
nanosToPercentage(request.requestThreadTimeNanos), time.milliseconds())
54+
nanosToPercentage(request.requestThreadTimeNanos), timeMs)
6055
} else {
6156
0
6257
}
@@ -69,8 +64,8 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
6964
}
7065
}
7166

72-
override protected def throttleTime(clientMetric: KafkaMetric): Long = {
73-
math.min(super.throttleTime(clientMetric), maxThrottleTimeMs)
67+
override protected def throttleTime(quotaValue: Double, quotaBound: Double, windowSize: Long): Long = {
68+
math.min(super.throttleTime(quotaValue, quotaBound, windowSize), maxThrottleTimeMs)
7469
}
7570

7671
override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {

0 commit comments

Comments
 (0)