Skip to content

Commit 6b312cd

Browse files
committed
Rework push gateway code.
1 parent a9ae378 commit 6b312cd

File tree

4 files changed

+136
-60
lines changed

4 files changed

+136
-60
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ from prometheus_client import CollectorRegistry,Gauge,push_to_gateway
256256
registry = CollectorRegistry()
257257
g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry)
258258
g.set_to_current_time()
259-
push_to_gateway(registry, job='batchA')
259+
push_to_gateway('localhost:9091', job='batchA', registry=registry)
260260
```
261261

262262
A separate registry is used, as the default registry may contain other metrics

prometheus_client/__init__.py

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,6 @@
44
from . import exposition
55
from . import process_collector
66

7-
try:
8-
from urllib2 import urlopen, quote
9-
except ImportError:
10-
# Python 3
11-
from urllib.request import urlopen
12-
from urllib.parse import quote
13-
14-
157
__all__ = ['Counter', 'Gauge', 'Summary', 'Histogram']
168
# http://stackoverflow.com/questions/19913653/no-unicode-in-all-for-a-packages-init
179
__all__ = [n.encode('ascii') for n in __all__]
@@ -29,38 +21,9 @@
2921
MetricsHandler = exposition.MetricsHandler
3022
start_http_server = exposition.start_http_server
3123
write_to_textfile = exposition.write_to_textfile
32-
33-
34-
def build_pushgateway_url(job, instance=None, host='localhost', port=9091):
35-
'''
36-
Build a valid pushgateway url
37-
'''
38-
39-
if instance:
40-
instancestr = '/instances/{}'.format(instance)
41-
else:
42-
instancestr = ''
43-
44-
url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port,
45-
quote(job),
46-
quote(instancestr))
47-
return url
48-
49-
50-
def push_to_gateway_url(url, registry, timeout=None):
51-
'''Push metrics to the given url'''
52-
53-
resp = urlopen(url, data=generate_latest(registry), timeout=timeout)
54-
if resp.code >= 400:
55-
raise IOError("error pushing to pushgateway: {0} {1}".format(
56-
resp.code, resp.msg))
57-
58-
59-
def push_to_gateway(registry, job, instance=None, host='localhost', port=9091, timeout=None):
60-
'''Push metrics to a pushgateway'''
61-
62-
url = build_pushgateway_url(job, instance, host, port)
63-
push_to_gateway_url(url, registry, timeout)
24+
push_to_gateway = exposition.push_to_gateway
25+
pushadd_to_gateway = exposition.pushadd_to_gateway
26+
delete_from_gateway = exposition.delete_from_gateway
6427

6528
ProcessCollector = process_collector.ProcessCollector
6629
PROCESS_COLLECTOR = process_collector.PROCESS_COLLECTOR

prometheus_client/exposition.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@
1010
try:
1111
from BaseHTTPServer import BaseHTTPRequestHandler
1212
from BaseHTTPServer import HTTPServer
13+
from urllib2 import build_opener, Request, HTTPHandler
14+
from urllib import quote_plus
1315
except ImportError:
1416
# Python 3
1517
unicode = str
1618
from http.server import BaseHTTPRequestHandler
1719
from http.server import HTTPServer
20+
from urllib.request import build_opener, Request, HTTPHandler
21+
from urllib.parse import quote_plus
1822

1923

2024
CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8'
@@ -72,3 +76,48 @@ def write_to_textfile(path, registry):
7276
f.write(generate_latest(registry))
7377
# rename(2) is atomic.
7478
os.rename(tmppath, path)
79+
80+
81+
def push_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None):
82+
'''Push metrics to the given pushgateway.
83+
84+
This overwrites all metrics with the same job and grouping_key.
85+
This uses the PUT HTTP method.'''
86+
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout)
87+
88+
89+
def pushadd_to_gateway(gateway, job, registry=core.REGISTRY, grouping_key=None, timeout=None):
90+
'''PushAdd metrics to the given pushgateway.
91+
92+
This replaces metrics with the same name, job and grouping_key.
93+
This uses the POST HTTP method.'''
94+
_use_gateway('POST', gateway, job, registry, grouping_key, timeout)
95+
96+
97+
def delete_from_gateway(gateway, job, grouping_key=None, timeout=None):
98+
'''Delete metrics from the given pushgateway.
99+
100+
This deletes metrics with the given job and grouping_key.
101+
This uses the DELETE HTTP method.'''
102+
_use_gateway('DELETE', gateway, job, None, grouping_key, timeout)
103+
104+
105+
def _use_gateway(method, gateway, job, registry, grouping_key, timeout):
106+
url = 'http://{0}/job/{1}'.format(gateway, quote_plus(job))
107+
108+
data = b''
109+
if method != 'DELETE':
110+
data = generate_latest(registry)
111+
112+
if grouping_key is None:
113+
grouping_key = {}
114+
url = url + ''.join(['/{0}/{1}'.format(quote_plus(str(k)), quote_plus(str(v)))
115+
for k, v in sorted(grouping_key.items())])
116+
117+
request = Request(url, data=data)
118+
request.add_header('Content-Type', CONTENT_TYPE_LATEST)
119+
request.get_method = lambda: method
120+
resp = build_opener(HTTPHandler).open(request, timeout=timeout)
121+
if resp.code >= 400:
122+
raise IOError("error talking to pushgateway: {0} {1}".format(
123+
resp.code, resp.msg))

tests/test_client.py

Lines changed: 83 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
from __future__ import unicode_literals
22
import os
3+
import threading
34
import unittest
45

56

67
from prometheus_client import Gauge, Counter, Summary, Histogram, Metric
78
from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector
8-
from prometheus_client import build_pushgateway_url
9+
from prometheus_client import push_to_gateway, pushadd_to_gateway, delete_from_gateway
10+
from prometheus_client import CONTENT_TYPE_LATEST
11+
12+
try:
13+
from BaseHTTPServer import BaseHTTPRequestHandler
14+
from BaseHTTPServer import HTTPServer
15+
except ImportError:
16+
# Python 3
17+
from http.server import BaseHTTPRequestHandler
18+
from http.server import HTTPServer
19+
920

1021

1122
class TestCounter(unittest.TestCase):
@@ -373,24 +384,77 @@ def test_working_fake_pid(self):
373384
self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace'))
374385

375386

376-
class TestBuildPushgatewayUrl(unittest.TestCase):
377-
def test_job_instance(self):
378-
expected = 'http://localhost:9091/metrics/jobs/foojob/instances/fooinstance'
379-
380-
url = build_pushgateway_url('foojob', 'fooinstance')
381-
self.assertEqual(url, expected)
382-
383-
def test_host_port(self):
384-
expected = 'http://foohost:9092/metrics/jobs/foojob'
385-
386-
url = build_pushgateway_url('foojob', host='foohost', port=9092)
387-
self.assertEqual(url, expected)
388-
389-
def test_url_escaping(self):
390-
expected = 'http://localhost:9091/metrics/jobs/foo%20job'
391-
392-
url = build_pushgateway_url('foo job')
393-
self.assertEqual(url, expected)
387+
class TestPushGateway(unittest.TestCase):
388+
def setUp(self):
389+
self.registry = CollectorRegistry()
390+
self.counter = Gauge('g', 'help', registry=self.registry)
391+
self.requests = requests = []
392+
class TestHandler(BaseHTTPRequestHandler):
393+
def do_PUT(self):
394+
self.send_response(201)
395+
length = int(self.headers['content-length'])
396+
requests.append((self, self.rfile.read(length)))
397+
398+
do_POST = do_PUT
399+
do_DELETE = do_PUT
400+
401+
httpd = HTTPServer(('', 0), TestHandler)
402+
self.address = ':'.join([str(x) for x in httpd.server_address])
403+
class TestServer(threading.Thread):
404+
def run(self):
405+
httpd.handle_request()
406+
self.server = TestServer()
407+
self.server.daemon = True
408+
self.server.start()
409+
410+
def test_push(self):
411+
push_to_gateway(self.address, "my_job", self.registry)
412+
self.assertEqual(self.requests[0][0].command, 'PUT')
413+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
414+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
415+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
416+
417+
def test_push_with_groupingkey(self):
418+
push_to_gateway(self.address, "my_job", self.registry, {'a': 9})
419+
self.assertEqual(self.requests[0][0].command, 'PUT')
420+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
421+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
422+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
423+
424+
def test_push_with_complex_groupingkey(self):
425+
push_to_gateway(self.address, "my_job", self.registry, {'a': 9, 'b': 'a/ z'})
426+
self.assertEqual(self.requests[0][0].command, 'PUT')
427+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9/b/a%2F+z')
428+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
429+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
430+
431+
def test_pushadd(self):
432+
pushadd_to_gateway(self.address, "my_job", self.registry)
433+
self.assertEqual(self.requests[0][0].command, 'POST')
434+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
435+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
436+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
437+
438+
def test_pushadd_with_groupingkey(self):
439+
pushadd_to_gateway(self.address, "my_job", self.registry, {'a': 9})
440+
self.assertEqual(self.requests[0][0].command, 'POST')
441+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
442+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
443+
self.assertEqual(self.requests[0][1], b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
444+
445+
def test_delete(self):
446+
delete_from_gateway(self.address, "my_job")
447+
self.assertEqual(self.requests[0][0].command, 'DELETE')
448+
self.assertEqual(self.requests[0][0].path, '/job/my_job')
449+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
450+
self.assertEqual(self.requests[0][1], b'')
451+
452+
def test_pushadd_with_groupingkey(self):
453+
delete_from_gateway(self.address, "my_job", {'a': 9})
454+
self.assertEqual(self.requests[0][0].command, 'DELETE')
455+
self.assertEqual(self.requests[0][0].path, '/job/my_job/a/9')
456+
self.assertEqual(self.requests[0][0].headers.get('content-type'), CONTENT_TYPE_LATEST)
457+
self.assertEqual(self.requests[0][1], b'')
394458

395459

396460
if __name__ == '__main__':

0 commit comments

Comments
 (0)