diff --git a/influxdb/graphite.py b/influxdb/graphite.py
new file mode 100644
index 00000000..2bc76f44
--- /dev/null
+++ b/influxdb/graphite.py
@@ -0,0 +1,440 @@
+# -*- python -*-
+
+import time
+# from . import util
+# from util import tree
+# from . import metadata  # XXX add metadata
+# from . import reader
+from collections import deque
+import fnmatch
+import re
+
+# TODO
+# XXX move away from pyKairosDB metadata, reader, util.
+
+RETENTION_TAG = "gr-ret"  # terse in order to save space on-disk
+SECONDS_PER_MINUTE = 60
+SECONDS_PER_HOUR = SECONDS_PER_MINUTE * 60
+SECONDS_PER_DAY = SECONDS_PER_HOUR * 24
+SECONDS_PER_WEEK = SECONDS_PER_DAY * 7
+SECONDS_PER_MONTH = SECONDS_PER_DAY * 30  # OK, I'm approximating
+SECONDS_PER_YEAR = SECONDS_PER_DAY * 365
+
+INVALID_CHARS = re.compile(r'[^A-Za-z0-9-_.]')  # anything not alphanumeric, dash, underscore or period
+RET_SEPERATOR_CHAR = "_"  # character we use to separate the retentions
+RET_GRAPHITE_CHAR = ":"  # character graphite uses to separate the retentions
+
+
+def _graphite_metric_list_retentions(metric_list, storage_schemas):
+    """
+    :type metric_list: list
+    :param metric_list: a list of lists/tuples, each one in the standard graphite metric format
+
+    :type storage_schemas: list
+    :param storage_schemas: a list of carbon.storage.Schema objects which will be matched
+        against each of the graphite metrics and used to tag each metric
+
+    :rtype: list
+    :return: one retention tag string per metric, with the graphite ':' separator
+        replaced by RET_SEPERATOR_CHAR
+    """
+    def get_retentions(metric_name):
+        for s in storage_schemas:
+            if s.test(metric_name):
+                return _input_retention_resolution(s.options['retentions'].split(','))
+    return [get_retentions(m[0])[1].replace(RET_GRAPHITE_CHAR, RET_SEPERATOR_CHAR)
+            for m in metric_list]
+
+
+# how carbon writes to influxdb
+def graphite_metric_list_to_influxdb_list(metric_list, tags):
+    """The term "tags" here comes from KairosDB. I don't believe that
+    influxdb uses the term tag, but we're going to use the moral
+    equivalent of a tag.
+
+    :type metric_list: list
+    :param metric_list: a list of lists/tuples, each one in the standard graphite metric format
+
+    :type tags: dict
+    :param tags: a dict of name: value pairs that will be recorded as tags
+
+    :rtype: list
+    :return: a list of dicts formatted appropriately for influxdb
+
+    Use this for entering a large set of metrics that all share the same
+    set of tags (e.g. the same retentions).
+    """
+    return [graphite_metric_to_influxdb(m, tags=tags) for m in metric_list]
+
+
+# how graphite will write lots of points to influxdb
+def graphite_metric_list_with_retentions_to_influxdb_list(metric_list, storage_schemas, pervasive_tags=None):
+    """
+    :type metric_list: list
+    :param metric_list: a list of line-formatted lists or tuples, each one in the standard graphite metric format
+
+    :type storage_schemas: list
+    :param storage_schemas: a list of carbon.storage.Schema objects to be matched against
+
+    :type pervasive_tags: dict
+    :param pervasive_tags: tags that will be applied to (almost) all metrics. These won't
+        override the retentions configuration.
+
+    :rtype: generator
+    :return: a generator of dicts formatted appropriately for influxdb
+
+    Use this for entering a large set of metrics that have disparate
+    retentions; the matching retention for each metric is resolved via
+    _graphite_metric_list_retentions().
+
+    XXX this API is getting messy - it should be simpler.
+    -PN
+    """
+    # resolve the retention tag that matches each metric
+    retentions = _graphite_metric_list_retentions(metric_list, storage_schemas)
+    for m, r in zip(metric_list, retentions):  # XXX use itertools' izip?
+        tags = {}
+        if pervasive_tags:
+            tags.update(pervasive_tags)
+        tags[RETENTION_TAG] = r
+        yield graphite_metric_to_influxdb(m, tags=tags)
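+
+# A minimal usage sketch for the generator above, not part of the API.
+# ``schemas`` would be carbon.storage.Schema objects (the tests use a
+# PatternSchema work-alike); ``client`` is a hypothetical InfluxDBClient.
+#
+#     metric_list = [("web.requests", 1387998050.0, 200.0)]
+#     for point in graphite_metric_list_with_retentions_to_influxdb_list(metric_list, schemas):
+#         client.write_points([point])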
+
+
+# XXX metadata
+# def _fnmatch_expand_graphite_wildcard_metric_name(conn, name, cache_ttl=60):
+#     """
+#     :type conn: influxdb.InfluxDBClient
+#     :param conn: the connection to the database
+#
+#     :type name: string
+#     :param name: the graphite-like name, which can include ".*." to provide wildcard expansion
+#
+#     :type cache_ttl: int
+#     :param cache_ttl: how often to update the cache of metric names, in seconds
+#
+#     :rtype: list
+#     :return: a list of unicode strings, each one an expanded metric name
+#
+#     KairosDB doesn't currently support wildcards, so get all metric
+#     names and expand them.
+#
+#     Currently only ".*." or "*." or ".*" expansions are supported.
+#     Substring expansions aren't supported at this time.
+#
+#     Graphite-web uses fnmatch or something similar; perhaps this
+#     should provide a list and re-use the same functionality.
+#     """
+#     all_metric_name_list = metadata.get_all_metric_names(conn)
+#     return [n for n in all_metric_name_list if fnmatch.fnmatch(n, name)]
+
+# XXX until I've fixed the reliance on util.
+# def expand_graphite_wildcard_metric_name(conn, name, cache_ttl=60):
+#     """
+#     Same contract as _fnmatch_expand_graphite_wildcard_metric_name above,
+#     except that conn is a pyKairosDB.KairosDBConnection and the expansion
+#     walks a cached tree of all metric names. The tree is cached for
+#     cache_ttl seconds and refreshed once it has aged beyond that.
+#     """
+#     if "*" not in name:
+#         return [u'{0}'.format(name)]
+#     if "." in name:
+#         name_list = [u'{0}'.format(n) for n in name.split(".")]
+#     else:
+#         name_list = [name]
+#     ts = expand_graphite_wildcard_metric_name.cache_timestamp
+#     cache_tree = expand_graphite_wildcard_metric_name.cache_tree
+#     if ts == 0 or (time.time() - ts > cache_ttl):
+#         all_metric_name_list = metadata.get_all_metric_names(conn)
+#         cache_tree = tree()
+#         _make_graphite_name_cache(cache_tree, all_metric_name_list)
+#         expand_graphite_wildcard_metric_name.cache_tree = cache_tree
+#         expand_graphite_wildcard_metric_name.cache_timestamp = time.time()
+#     if name == "*":  # special case for the root of the tree
+#         return cache_tree.keys()
+#     if '*' in name and '.' not in name:
+#         return [ctk for ctk in cache_tree.keys() if fnmatch.fnmatch(ctk, name)]
+#     expanded_name_list = util.metric_name_wildcard_expansion(cache_tree, name_list)
+#     return_list = [".".join(en) for en in expanded_name_list]
+#     return list(set(return_list))
+#
+# expand_graphite_wildcard_metric_name.cache_tree = tree()
+# expand_graphite_wildcard_metric_name.cache_timestamp = 0
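+
+# A short sketch of the fnmatch-style matching the commented-out expansion
+# above relies on; the metric names here are made up:
+#
+#     import fnmatch
+#     names = ["servers.web01.cpu", "servers.web02.cpu", "servers.web01.mem"]
+#     fnmatch.filter(names, "servers.*.cpu")
+#     # -> ['servers.web01.cpu', 'servers.web02.cpu']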
+
+
+def leaf_or_branch(conn, name):
+    """
+    :type conn: pyKairosDB.KairosDBConnection
+    :param conn: the connection to KairosDB
+
+    :type name: string
+    :param name: the metric name, or part of a name, that we're checking
+
+    :rtype: str
+    :return: either the string "leaf" or "branch"
+
+    Graphite wants to know whether a name is a "leaf" or a "branch" in
+    its ultimate storage location; that is, whether or not it can be
+    traversed further.
+    """
+    # XXX depends on expand_graphite_wildcard_metric_name, which is
+    # commented out above until the util/metadata reliance is fixed.
+    if name.endswith('*'):
+        wildcard_expansion = expand_graphite_wildcard_metric_name(conn, name)
+    else:
+        wildcard_expansion = expand_graphite_wildcard_metric_name(conn, name + ".*")
+
+    if len(wildcard_expansion) == 1 and wildcard_expansion[0] == name:
+        return "leaf"
+    elif len(wildcard_expansion) > 0:
+        return "branch"
+    else:
+        return "leaf"
+
+
+# XXX fix along with util
+# def _make_graphite_name_cache(cache_tree, list_of_names):
+#     """
+#     :type cache_tree: defaultdict
+#     :param cache_tree: a defaultdict initialized with the tree() function. Contains names
+#         of entries in the database, separated by "." per the graphite convention.
+#
+#     :type list_of_names: list
+#     :param list_of_names: a list of strings, in order, that will be sought in the cache tree
+#
+#     Given a list of names - all the names that the database has - make
+#     a tree of all those names.
+#     """
+#     for n in list_of_names:
+#         util._add_to_cache(cache_tree, n.split('.'))
+
+
+def graphite_metric_to_influxdb(metric, tags):
+    """
+    :type metric: tuple
+    :param metric: a tuple of ("metric_name", timestamp, value)
+
+    :type tags: dict
+    :param tags: a dict of {name: value} strings that will be recorded as tags
+
+    :rtype: dict
+    :return: a re-formatted dict appropriate for influxdb
+
+    Write graphite metrics to influxdb.
+
+    Graphite metrics are a tuple with a metric name, a timestamp, and a
+    value, and they have a storage schema attached which specifies the
+    time period that should be used for that metric. This must be
+    recorded in the columns for graphite querying to work:
+
+    {
+        "name": "some.graphite.metric",
+        "columns": [
+            "time", "value", "gr-ret"
+        ],
+        "points": [
+            [1387998050, 200.0, "60s_90d"]
+        ]
+    }
+
+    Even though influxdb uses a 64-bit long int, the python API here
+    expects a float, as returned by time.time(). This module converts
+    between the two when data is written and read, and doesn't make the
+    user deal with the conversion.
+
+    Influxdb only allows alphanumerics and the following punctuation
+    characters (see http://influxdb.org/docs/api/http.html):
+
+    "-", "_", and ".".
+
+    Graphite is less restrictive, so anything that doesn't match the
+    above is converted to an underscore.
+
+    TODO: keep an index of metric names that already have updates, and
+    update those metrics instead of creating new series.
+
+    For now this is naive.
+    """
+    converted_metric_name = INVALID_CHARS.sub(RET_SEPERATOR_CHAR, metric[0])
+    if len(tags) > 0:
+        (c, p) = zip(*tags.items())
+        columns_list = list(c)
+        points_list = list(p)
+    else:
+        columns_list = []
+        points_list = []
+    columns_list.extend(("time", "value",))
+    points_list.append(int(metric[1] * 1000))  # time as a long int counting ms since the epoch instead of seconds
+    points_list.append(metric[2])
+
+    return {
+        "name": converted_metric_name,
+        "columns": columns_list,
+        "points": [points_list]
+    }
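+
+# A worked example of the conversion above, traced by hand (illustrative):
+#
+#     graphite_metric_to_influxdb(("some.graphite@metric", 1387998050.0, 200.0),
+#                                 tags={"gr-ret": "60s_90d"})
+#     # -> {'name': 'some.graphite_metric',            # '@' -> '_'
+#     #     'columns': ['gr-ret', 'time', 'value'],
+#     #     'points': [['60s_90d', 1387998050000, 200.0]]}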
+ """ + converted_metric_name = INVALID_CHARS.sub(RET_SEPERATOR_CHAR, metric[0]) + if len(tags) > 0: + (c, p) = zip(*tags.items()) + columns_list = list(c) + points_list = list(p) + else: + columns_list = [] + points_list = [] + columns_list.extend(("time", "value",)) + points_list.append(int(metric[1] * 1000)) # Deal with time as a longing counting ms since the epoch instead of seconds + points_list.append(metric[2]) + + return { + "name" : converted_metric_name, + "columns" : columns_list, + "points" : [points_list] + } + + +def seconds_from_retention_tag(tag_value, sep=RET_GRAPHITE_CHAR): + """:type tag_value: str + :param tag_value: the retention info tag + + :rtype: int + :return: Number of seconds for the given tag value + + The retention tag is a colon-separated resolution_retention period + when it's input, taken from the carbon storage-schemas.conf. When + reading from kairosdb, the ':' is not a legal character to have in + a tag, so we input them with an underscore instead. That + separator is configurable so this can be used on tags that are + queried as well. + + For this function we're not worried about the retention period, we + just care about the resolution of the data. + + So get the first part of it, and expand the number of seconds + so we can make a valid comparison. + + """ + resolution, _ = tag_value.split(sep) + if resolution[-1].lower() == "s": + return int(resolution[:-1]) + elif resolution[-1].lower() == "m": + return int(resolution[:-1]) * SECONDS_PER_MINUTE + elif resolution[-1].lower() == "h": + return int(resolution[:-1]) * SECONDS_PER_HOUR + elif resolution[-1].lower() == "d": + return int(resolution[:-1]) * SECONDS_PER_DAY + elif resolution[-1].lower() == "w": + return int(resolution[:-1]) * SECONDS_PER_WEEK + elif resolution[-1].lower() == "y": + return int(resolution[:-1]) * SECONDS_PER_YEAR + else: # Seconds is the default + return int(resolution) + +def _input_retention_resolution(retention_string_list): + """ + :type retention_string_list: list + :param retention_string_list: A list of strings containing retention definitions + + :rtype: tuple + :return: A tuple containing the (number_of_seconds, highest_resoultion_retention_string), + which is an (int, str) + + When inputting data, we want the highest resolution data - the + actual metrics can be summarized later based on the same policy or + a different one if that works better. + """ + all_tags_set = [t for t in retention_string_list] + return min([(seconds_from_retention_tag(tag), tag) for tag in all_tags_set])# return the highest resolution + + +# XXX: util +# def _lowest_resolution_retention(data, name): +# """ +# :type data: requests.response.content +# :param data: Content from the KairosDB query +# +# :type name: str +# :param name: The name of the metric whose retention will be extracted. +# +# :rtype: int +# :return: The number of seconds in the lowest-resolution retention period in the data +# +# Graphite needs data to be divided into even time slices. We +# must store the slice information when writing data so that it can +# be read out here. +# +# The relevant tags are in the README.md for this project. 
+# """ +# values = util.get_content_values_by_name(data, name) +# all_tags_set = set() # easiest case - all tags are the same, otherwise we use the set +# for result in values: +# all_tags_set.update(util.get_matching_tags_from_result(result, RETENTION_TAG)) +# return max([ seconds_from_retention_tag(tag, RET_SEPERATOR_CHAR) for tag in all_tags_set])# return the lowest resolution + +def read_absolute(conn, metric_name, start_time, end_time): + """ + :type conn: pyKairosDB.KairosDBConnection + :param conn: The connection to KairosDB + + :type metric_name: string + :param metric_name: The name of the metric to query (graphite does one at a time, though KairosDB can do more) + + :type start_time: float + :param start_time: The float representing the number of seconds since the epoch that this query starts at. + + :type end_time: float + :param end_time: The float representing the number of seconds since the epoch that this query endsa at. + + :rtype: tuple + :return: 2-element tuple - ((start_time, end_time, interval), list_of_metric_values). Graphite wants evenly-spaced metrics, + and None for any interval that doesn't have data. It infers the time for each update by the order and place of each + value provided. + + This function returns the values being queried, in the format that the graphite-web app requires. + """ + def cache_query(): + def cache_query_closure(query_dict): + reader.cache_time(10, query_dict) + return cache_query_closure + tags = conn.read_absolute([metric_name], start_time, end_time, + query_modifying_function=cache_query(), + only_read_tags=True) + + interval_seconds = _lowest_resolution_retention(tags, metric_name) + def modify_query(): + def modify_query_closure(query_dict): + group_by = reader.default_group_by() + group_by["range_size"] = { "value" : interval_seconds, "unit" : "seconds"} + aggregator = reader.default_aggregator() + aggregator["sampling"] = group_by["range_size"] + reader.group_by([group_by], query_dict["metrics"][0]) + reader.aggregation([aggregator], query_dict["metrics"][0]) + return modify_query_closure + # now that we've gotten the tags and have set the retention time, get data + content = conn.read_absolute([metric_name], start_time, end_time, + query_modifying_function=modify_query()) + return_list = list() + if len(content['queries'][0]['results']) > 0: + # by_interval_dict = dict([(v[1], v[0]) for v in content["queries"][0]["results"][0]["values"] ]) + value_deque = deque(content["queries"][0]["results"][0]["values"]) + slots = list() + for slot_begin in range(start_time, end_time, interval_seconds): + slot_buffer = list() + slot_end = slot_begin + interval_seconds + slots.append((slot_begin, slot_end)) + try: + if slot_end < value_deque[0][0]: # we haven't caught up with the beginning of the deque + return_list.append(None) + continue + if slot_begin > value_deque[-1][0]: # We have nothing more of value + return_list.append(None) + continue + if len(value_deque) == 0: + return_list.append(None) + continue + while slot_begin <= value_deque[0][0] < slot_end: + slot_buffer.append(value_deque.popleft()[1]) + except IndexError: + return_list.append(None) + if len(slot_buffer) < 1: + return_list.append(None) + else: + return_list.append(sum(slot_buffer)/len(slot_buffer)) # take the average of the points for this slot + else: + return_list = [ None for n in range(start_time, end_time, interval_seconds)] + return ((start_time, end_time, interval_seconds), return_list) diff --git a/tests/influxdb/graphite_test.py b/tests/influxdb/graphite_test.py new 
diff --git a/tests/influxdb/graphite_test.py b/tests/influxdb/graphite_test.py
new file mode 100644
index 00000000..b199028b
--- /dev/null
+++ b/tests/influxdb/graphite_test.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+"""
+unit tests
+"""
+import json
+import itertools
+import time
+import requests
+from nose.tools import raises
+from mock import patch
+import re
+import pprint
+
+from influxdb import InfluxDBClient
+import influxdb.graphite as graphite
+
+USER = 'testuser'
+PASS = 'testpassword'
+PORT = 8086
+DBNAME = 'graphite'
+HOST = '10.255.0.2'
+TEST_METRIC_NAME = 'peter.test.metric'
+TEST_METRIC_NAME2 = 'foo.test.metric'
+
+TEST_METRIC_NAME_LIST = [TEST_METRIC_NAME, TEST_METRIC_NAME2]
+
+retention = "60m:1y"
+fixed_ret = "60m_1y"
+month_ret = "5m:30d"
+fixed_month_ret = "5m_30d"
+
+retention_tags = {"gr-ret": fixed_ret}
+
+
+class PatternSchema(object):
+    """A work-alike for the pattern schema in carbon."""
+    def __init__(self, name, pattern, retentions):
+        self.name = name
+        self.pattern = pattern
+        self.regex = re.compile(pattern)
+        self.options = {'retentions': retentions}  # comma-separated string
+
+    def test(self, metric):
+        return self.regex.search(metric)
+
+    def matches(self, metric):
+        return bool(self.test(metric))
+
+
+schema_list = [PatternSchema(retention, "peter.*", retention),
+               PatternSchema(month_ret, "foo.*", month_ret)]
+
+
+def test_single_metric_json():
+    "No connection, just tests the json"
+    now = time.time()
+    test_metric = (TEST_METRIC_NAME, now, now)
+
+    influxdb_metric = graphite.graphite_metric_to_influxdb(test_metric, retention_tags)
+
+    expected_result = {'columns': ['time', 'value', 'gr-ret'],
+                       'name': TEST_METRIC_NAME,
+                       'points': [[int(now * 1000), now, '60m_1y']]}  # * 1000 for ms since the epoch
+    assert set(influxdb_metric['columns']) == set(expected_result['columns'])
+    assert set(influxdb_metric['points'][0]) == set(expected_result['points'][0])
+    assert influxdb_metric['name'] == expected_result['name']
+    assert set(influxdb_metric.keys()) == set(expected_result.keys())
+
+
+def test_send_to_influxdb():
+    conn = InfluxDBClient(HOST, PORT, USER, PASS, DBNAME)
+    now = time.time()
+    test_metric_list = [(TEST_METRIC_NAME, now, now)]
+    influxdb_metric = graphite.graphite_metric_list_to_influxdb_list(test_metric_list, retention_tags)
+
+    print (influxdb_metric)
+    url = "{0}/db/{1}/series?u={2}&p={3}&time_precision={4}".format(conn._baseurl,
+                                                                    conn._database,
+                                                                    conn._username,
+                                                                    conn._password,
+                                                                    'm')
+    print url
+    response = requests.post(url, data=json.dumps(influxdb_metric))
+    print response
+    conn.write_points_with_precision(influxdb_metric, 'm')
+    return response
+
+
+def test_example_send_to_influxdb():
+    conn = InfluxDBClient(HOST, PORT, USER, PASS, DBNAME)
+    conn.switch_db(DBNAME)
+    influxdb_metric = [{"name": "response_times",
+                        "columns": ["time", "value"],
+                        "points": [[1382819388, 234.3],
+                                   [1382819389, 120.1],
+                                   [1382819380, 340.9]]}]
+
+    print (influxdb_metric)
+    url = "{0}/db/{1}/series?u={2}&p={3}&time_precision={4}".format(conn._baseurl,
+                                                                    conn._database,
+                                                                    conn._username,
+                                                                    conn._password,
+                                                                    'm')
+    print url
+    response = requests.post(url, data=json.dumps(influxdb_metric))
+    return response
+
+
+def test_metric_list_with_retentions_json():
+    "No connection, just tests the json"
+    now = time.time()
+    nowlist = [now + n for n in range(5)]
+    retentions = itertools.cycle(schema_list)
+    metrics = itertools.cycle(TEST_METRIC_NAME_LIST)
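+    # zip(*pairs) transposes the [metric, schema] pairs built in the
+    # comprehension below into a ([metrics...], [schemas...]) 2-tuple;
+    # on python 2, e.g.:
+    #     zip(*[['a', 1], ['b', 2]])  # -> [('a', 'b'), (1, 2)]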
+    test_metric_list = zip(*[[(metrics.next(), n, n,), retentions.next()] for n in nowlist])
+
+    influxdb_metric_generator = graphite.graphite_metric_list_with_retentions_to_influxdb_list(
+        test_metric_list[0], test_metric_list[1])
+    pprint.pprint(list(influxdb_metric_generator))
+
+    # expected_result = {'columns': ['time', 'value', 'gr-ret'],
+    #                    'name': TEST_METRIC_NAME,
+    #                    'points': [[int(now * 1000), now, '60m_1y']]}  # * 1000 for ms since the epoch
+    # assert (set(influxdb_metric['columns']) == set(expected_result['columns']))
+    # assert (set(influxdb_metric['points'][0]) == set(expected_result['points'][0]))
+    # assert (set(influxdb_metric['name']) == set(expected_result['name']))
+    # assert (influxdb_metric.keys() == expected_result.keys())
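+
+    # A sketch of assertions this test could grow into; capture the output
+    # once (points = list(influxdb_metric_generator)) and the values below
+    # follow from the cycles above (illustrative only):
+    #     assert points[0]['name'] == 'peter.test.metric'
+    #     assert '60m_1y' in points[0]['points'][0]  # peter.* matches 60m:1y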