Skip to content

Commit 75b7120

Browse files
committed
Fixes gearpump#467 Expose codahale metrics by rest service
1 parent 16da7ce commit 75b7120

File tree

19 files changed

+394
-81
lines changed

19 files changed

+394
-81
lines changed

conf/gear.conf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,16 @@ gearpump {
4545

4646
### Flag to enable metrics
4747
metrics {
48-
enabled = false
48+
enabled = true
4949

5050
# We will take one metric out of ${sample.rate}
5151
sample-rate = 10
5252

5353
report-interval-ms = 15000
5454

5555
# reporter = "logfile"
56-
reporter = "graphite"
56+
# reporter = "graphite"
57+
reporter = "akka"
5758

5859
graphite {
5960
## Graphite host settings

core/src/main/resources/reference.conf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ gearpump {
2727

2828
report-interval-ms = 15000
2929

30-
# reporter = "logfile"
31-
reporter = "graphite"
30+
# reporter = "graphite"
31+
# reporter = "akka"
32+
reporter = "logfile"
3233

3334
graphite {
3435
## Graphite host settings

core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.gearpump.cluster.master.Master.{MasterInfo, MasterDescription}
2424
import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
2525
import org.apache.gearpump.cluster.worker.WorkerDescription
2626

27+
import scala.reflect.ClassTag
2728
import scala.util.Try
2829

2930
/**
@@ -106,11 +107,13 @@ object MasterToAppMaster {
106107
val AppMasterActive: AppMasterStatus = "active"
107108
val AppMasterInActive: AppMasterStatus = "inactive"
108109

110+
sealed trait StreamingType
109111
case class AppMasterData(appId: Int, appName: String, appMasterPath: String, workerPath: String, status: AppMasterStatus)
110112
case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
111113
case class AppMastersData(appMasters: List[AppMasterData])
112114
case object AppMastersDataRequest
113115
case class AppMasterDataDetailRequest(appId: Int)
116+
case class AppMasterMetricsRequest(appId: Int) extends StreamingType
114117

115118
case class ReplayFromTimestampWindowTrailingEdge(appId: Int)
116119

@@ -128,3 +131,4 @@ object WorkerToAppMaster {
128131
case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null)
129132
}
130133

134+

core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,15 @@ private[cluster] class AppManager(masterHA : ActorRef, kvService: ActorRef, laun
185185
case None =>
186186
sender ! AppMasterDataDetail(appId)
187187
}
188+
case appMasterMetricsRequest: AppMasterMetricsRequest =>
189+
val appId = appMasterMetricsRequest.appId
190+
val (appMaster, info) = appMasterRegistry.getOrElse(appId, (null, null))
191+
Option(appMaster) match {
192+
case Some(_appMaster) =>
193+
_appMaster forward appMasterMetricsRequest
194+
case None =>
195+
}
196+
188197
}
189198

190199
def workerMessage: Receive = {

core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ private[cluster] class Master extends Actor with Stash {
165165
case app : AppMasterDataDetailRequest =>
166166
LOG.info(s"Receive from client, forwarding to AppManager")
167167
appManager.forward(app)
168+
case app : AppMasterMetricsRequest =>
169+
LOG.info(s"AppMasterMetricsRequestFromActor Receive from client, forwarding to AppManager")
170+
appManager.forward(app)
168171

169172
}
170173

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.gearpump.metrics
20+
21+
import java.util.concurrent.TimeUnit
22+
23+
import akka.actor.ActorSystem
24+
import com.codahale.metrics._
25+
import org.slf4j.Marker
26+
;
27+
28+
/**
29+
* A reporter class for logging metrics values to a remote actor periodically
30+
*/
31+
class AkkaReporter(system: ActorSystem, registry: MetricRegistry, marker: Marker, rateUnit:TimeUnit, durationUnit:TimeUnit, filter: MetricFilter) extends ScheduledReporter(registry, "akka-reporter", filter, rateUnit, durationUnit) {
32+
override def report(gauges: java.util.SortedMap[String, com.codahale.metrics.Gauge[_]],
33+
counters: java.util.SortedMap[String, com.codahale.metrics.Counter],
34+
histograms: java.util.SortedMap[String, com.codahale.metrics.Histogram],
35+
meters: java.util.SortedMap[String, com.codahale.metrics.Meter],
36+
timers: java.util.SortedMap[String, com.codahale.metrics.Timer]): Unit = {
37+
import scala.collection.JavaConversions._
38+
39+
val sgauges = collection.SortedMap(gauges.toSeq: _*)
40+
sgauges.foreach(pair => {
41+
import org.apache.gearpump.metrics.Metrics._
42+
val (key, value) = pair
43+
system.eventStream.publish(Gauge(key, value))
44+
})
45+
46+
val scounters = collection.SortedMap(counters.toSeq: _*)
47+
scounters.foreach(pair => {
48+
import org.apache.gearpump.metrics.Metrics._
49+
val (key, value: com.codahale.metrics.Counter) = pair
50+
system.eventStream.publish(Counter(key, value.getCount))
51+
})
52+
53+
val shistograms = collection.SortedMap(histograms.toSeq: _*)
54+
shistograms.foreach(pair => {
55+
import org.apache.gearpump.metrics.Metrics._
56+
val (_, value: com.codahale.metrics.Histogram) = pair
57+
val s = value.getSnapshot
58+
system.eventStream.publish(Histogram(value.getCount, s.getMin, s.getMax, s.getMean, s.getStdDev,
59+
s.getMedian, s.get75thPercentile, s.get95thPercentile, s.get98thPercentile, s.get99thPercentile, s.get999thPercentile))
60+
})
61+
62+
val smeters = collection.SortedMap(meters.toSeq: _*)
63+
smeters.foreach(pair => {
64+
import org.apache.gearpump.metrics.Metrics._
65+
val (key, value: com.codahale.metrics.Meter) = pair
66+
system.eventStream.publish(Meter(key, value.getCount, convertRate(value.getMeanRate), convertRate(value.getOneMinuteRate), convertRate(value.getFiveMinuteRate),
67+
convertRate(value.getFifteenMinuteRate), getRateUnit))
68+
})
69+
70+
val stimers = collection.SortedMap(timers.toSeq: _*)
71+
stimers.foreach(pair => {
72+
import org.apache.gearpump.metrics.Metrics._
73+
val (key, value: com.codahale.metrics.Timer) = pair
74+
val s = value.getSnapshot
75+
system.eventStream.publish(Timer(key, value.getCount, convertDuration(s.getMin), convertDuration(s.getMax), convertDuration(s.getMean),
76+
convertDuration(s.getStdDev), convertDuration(s.getMedian), convertDuration(s.get75thPercentile), convertDuration(s.get95thPercentile), convertDuration(s.get98thPercentile),
77+
convertDuration(s.get99thPercentile), convertDuration(s.get999thPercentile), convertRate(value.getMeanRate), convertRate(value.getOneMinuteRate),
78+
convertRate(value.getFiveMinuteRate), convertRate(value.getFifteenMinuteRate), getRateUnit, getDurationUnit))
79+
80+
})
81+
}
82+
83+
override def getRateUnit: String = {
84+
"events/" + super.getRateUnit
85+
}
86+
}
87+
88+
object AkkaReporter {
89+
case class Builder(registry: MetricRegistry, marker: Marker, var rateUnit: TimeUnit, var durationUnit: TimeUnit, var flter: MetricFilter) {
90+
91+
def build(system: ActorSystem) = {
92+
new AkkaReporter(system, registry, marker, rateUnit, durationUnit, flter)
93+
}
94+
95+
def convertRatesTo(ru: TimeUnit) = {
96+
this.rateUnit = ru
97+
this
98+
}
99+
100+
def convertDurationsTo(du: TimeUnit) = {
101+
this.durationUnit = du
102+
this
103+
}
104+
105+
def filter(f: MetricFilter) = {
106+
this.flter = f
107+
this
108+
}
109+
}
110+
111+
object Builder {
112+
def apply(registry: MetricRegistry): Builder = Builder(registry, null, TimeUnit.SECONDS, TimeUnit.MILLISECONDS, MetricFilter.ALL)
113+
}
114+
def forRegistry(registry: MetricRegistry): Builder = {
115+
Builder(registry)
116+
}
117+
}
118+

core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@ import java.util.concurrent.TimeUnit
2424
import akka.actor._
2525
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
2626
import com.codahale.metrics.{Slf4jReporter, ConsoleReporter, MetricFilter, MetricRegistry}
27+
import org.apache.gearpump.TimeStamp
2728
import org.apache.gearpump.util.LogUtil
2829
import org.slf4j.Logger
2930

31+
import scala.reflect.ClassTag
32+
3033
class Metrics(sampleRate: Int) extends Extension {
3134

3235
val registry = new MetricRegistry()
@@ -48,6 +51,15 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
4851
val LOG: Logger = LogUtil.getLogger(getClass)
4952
import org.apache.gearpump.util.Constants._
5053

54+
sealed trait MetricType
55+
case class Histogram(count: Long, min: Long, max: Long, mean: Double, stddev: Double, median: Double, p75: Double, p95: Double, p98: Double, p99: Double, p999: Double) extends MetricType
56+
case class Counter(name: String, value: Long) extends MetricType
57+
case class Meter(name: String, count: Long, meanRate: Double, m1: Double, m5: Double, m15: Double, rateUnit: String) extends MetricType
58+
case class Timer(name: String, count: Long, min: Double, max: Double, mean: Double, stddev: Double,
59+
median: Double, p75: Double, p95: Double, p98: Double, p99: Double, p999: Double, meanRate: Double,
60+
m1: Double, m5: Double, m15: Double, rateUnit: String, durationUnit: String) extends MetricType
61+
case class Gauge[T:ClassTag](name: String, value: T) extends MetricType
62+
5163
override def get(system: ActorSystem): Metrics = super.get(system)
5264

5365
override def lookup = Metrics
@@ -103,13 +115,29 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
103115
})
104116
}
105117

118+
def startAkkaReporter = {
119+
120+
val reporter = AkkaReporter.forRegistry(meters.registry)
121+
.convertRatesTo(TimeUnit.SECONDS)
122+
.convertDurationsTo(TimeUnit.MILLISECONDS)
123+
.filter(MetricFilter.ALL)
124+
.build(system)
125+
126+
reporter.start(reportInterval, TimeUnit.MILLISECONDS)
127+
128+
system.registerOnTermination(new Runnable {
129+
override def run = reporter.stop()
130+
})
131+
}
132+
106133
val reporter = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
107134

108135
LOG.info(s"Metrics reporter is enabled, using $reporter reporter")
109136

110137
reporter match {
111138
case "graphite" => startGraphiteReporter
112139
case "logfile" => startSlf4jReporter
140+
case "akka" => startAkkaReporter
113141
case other =>
114142
LOG.error(s"Metrics reporter will be disabled, as we cannot recognize reporter: $other")
115143
}

services/dashboard/index.html

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@
145145

146146
<!-- app -->
147147
<script src="scripts/app.js"></script>
148+
<script src="scripts/services.js"></script>
149+
<script src="scripts/controllers.js"></script>
148150
<script src="scripts/app-01.js"></script>
149151
<script src="scripts/app-02.js"></script>
150152
<script src="scripts/app-03.js"></script>

services/dashboard/scripts/app.js

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,4 @@ angular.module('app', [
4848
controller: 'app03Ctrl'
4949
}).otherwise({redirectTo: '/cluster'});
5050

51-
})
52-
.controller('navigationCtrl', function($scope, $location){
53-
54-
$scope.navCollapsed = true;
55-
56-
$scope.toggleNav = function(){
57-
$scope.navCollapsed = !$scope.navCollapsed;
58-
};
59-
60-
$scope.$on('$routeChangeStart', function() {
61-
$scope.navCollapsed = true;
62-
});
63-
64-
$scope.navClass = function(page) {
65-
var currentRoute = $location.path().substring(1) || 'Applications';
66-
return page === currentRoute || new RegExp(page).test(currentRoute) ? 'active' : '';
67-
};
68-
6951
});
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Copyright (c) 2013, Sebastian Sdorra
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in
14+
* all copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
'use strict';
25+
26+
angular.module('app')
27+
.controller('navigationCtrl', function($scope, $location){
28+
$scope.navCollapsed = true;
29+
30+
$scope.toggleNav = function(){
31+
$scope.navCollapsed = !$scope.navCollapsed;
32+
};
33+
34+
$scope.$on('$routeChangeStart', function() {
35+
$scope.navCollapsed = true;
36+
});
37+
38+
$scope.navClass = function(page) {
39+
var currentRoute = $location.path().substring(1) || 'Applications';
40+
return page === currentRoute || new RegExp(page).test(currentRoute) ? 'active' : '';
41+
};
42+
});

0 commit comments

Comments
 (0)