This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Support for 'tag columns' in dataframe client #362

Closed
mdbartos opened this issue Aug 22, 2016 · 3 comments

Comments

@mdbartos (Contributor)

(Looked around and couldn't find this issue, so I'm posting here)

The current dataframe client allows tags to be specified in its write_points method through the tags keyword argument. However, it seems that the current 'tags' keyword argument applies the tags globally to each datapoint. It would be useful to allow certain columns to be treated as tags in the write_points method through a tag_columns keyword argument. In other words, columns in the tag_columns list would be treated as tags, while all other columns are treated as fields.

One big advantage of this approach is that it makes the dataframe client's read and write methods symmetrical. The dataframe client's query method currently returns dataframes with tags included as columns. Allowing tag columns to be specified in the write_points method would make it easy to copy one database to another by calling the query and write methods in succession.
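To illustrate the symmetry argument: a frame coming back from `query` carries tags as ordinary columns, and the proposed `tag_columns` split is just a set difference over the column names. A toy sketch (the frame and column names are made up):

```python
import pandas as pd

# Toy frame in the shape query returns: tags sit alongside fields as columns
df = pd.DataFrame({'color': ['red', 'blue'],   # tag column
                   'value': [9.1, 8.3]},       # field column
                  index=pd.to_datetime(['2016-08-12', '2016-08-13'], utc=True))

tag_columns = ['color']
# Every column not named as a tag is treated as a field
field_columns = list(set(df.columns).difference(set(tag_columns)))
```

With a `tag_columns` argument on `write_points`, calling `query` and then `write_points` with the same column list would round-trip a measurement between databases.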

Fortunately, the fix for this is super easy. All that would need to be done is to make the following changes to _convert_dataframe_to_json:

def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
                              tag_columns=None, time_precision=None):

    if not isinstance(dataframe, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(dataframe)))
    if not isinstance(dataframe.index, (pd.DatetimeIndex, pd.PeriodIndex)):
        raise TypeError('Must be DataFrame with DatetimeIndex or '
                        'PeriodIndex.')

    # Make sure tags and tag columns are correctly typed
    tag_columns = tag_columns if tag_columns else []
    tags = tags if tags else {}

    # Assume field columns are all columns not included in tag columns
    rec_columns = list(set(dataframe.columns).difference(set(tag_columns)))

    # Normalize the index to timezone-aware datetimes
    if isinstance(dataframe.index, pd.PeriodIndex):
        dataframe.index = dataframe.index.to_timestamp()
    if dataframe.index.tzinfo is None:
        dataframe.index = dataframe.index.tz_localize('UTC')

    # Convert column to strings
    dataframe.columns = dataframe.columns.astype('str')

    # Convert dtype for json serialization
    dataframe = dataframe.astype('object')

    precision_factor = {
        "n": 1,
        "u": 1e3,
        "ms": 1e6,
        "s": 1e9,
        "m": 1e9 * 60,
        "h": 1e9 * 3600,
    }.get(time_precision, 1)

    points = [
        {'measurement': measurement,
            'tags': dict(list(tag.items()) + list(tags.items())),
            'fields': rec,
            'time': int(ts.value / precision_factor)
            }
        for ts, tag, rec in zip(dataframe.index,
                                dataframe[tag_columns].to_dict('records'),
                                dataframe[rec_columns].to_dict('records'))]
    return points

This implementation allows columns specified in the tag_columns keyword argument to be treated as tags instead of fields. It also maintains backwards compatibility with the previous tags behavior (applying tags globally to each row of the dataframe).*
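For reference, the tag merge on each point is plain dict concatenation, so when a per-row tag and a global tag share a key, the global tag wins (later items override earlier ones). A small sketch with made-up values:

```python
# Per-row tags come from the tag columns; global tags from the tags kwarg
row_tags = {'color': 'red'}
global_tags = {'global': 'tag'}

# Same construction as in _convert_dataframe_to_json above:
# global_tags entries are appended last, so they override on collision
merged = dict(list(row_tags.items()) + list(global_tags.items()))
```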

Here's an example usage:

Example:

>>> print(df)
                                     color  value  status
2016-08-12 02:17:28.191727141+00:00    red    9.1       0
2016-08-12 02:17:51.082619738+00:00   blue    8.3       1
2016-08-09 21:48:05.394423800+00:00   blue    1.0       1
2016-08-09 21:48:16.696269144+00:00   blue    1.3       0
2016-08-09 21:48:28.603980937+00:00   blue    0.9       1
2016-08-09 22:01:56.629486728+00:00   blue    0.8       1
2016-08-09 22:02:24.966032304+00:00   blue    0.7       0
2016-08-09 22:16:46.645324098+00:00   blue    1.9       1
2016-08-18 07:05:10.979251109+00:00  green    2.0       1
2016-08-18 07:11:11.434227816+00:00  green    1.0       0

>>> _convert_dataframe_to_json(None, dataframe=df, measurement='var1', tag_columns=['color', 'status'], tags={'global':'tag'})

[{'fields': {'value': 9.1},
  'measurement': 'var1',
  'tags': {'color': 'red', 'global': 'tag', 'status': 0},
  'time': 1470968248191727104},
 {'fields': {'value': 8.3},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 1},
  'time': 1470968271082619648},
 {'fields': {'value': 1.0},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 1},
  'time': 1470779285394423808},
 {'fields': {'value': 1.3},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 0},
  'time': 1470779296696269056},
 {'fields': {'value': 0.9},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 1},
  'time': 1470779308603981056},
 {'fields': {'value': 0.8},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 1},
  'time': 1470780116629486848},
 {'fields': {'value': 0.7},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 0},
  'time': 1470780144966032384},
 {'fields': {'value': 1.9},
  'measurement': 'var1',
  'tags': {'color': 'blue', 'global': 'tag', 'status': 1},
  'time': 1470781006645324032},
 {'fields': {'value': 2.0},
  'measurement': 'var1',
  'tags': {'color': 'green', 'global': 'tag', 'status': 1},
  'time': 1471503910979251200},
 {'fields': {'value': 1.0},
  'measurement': 'var1',
  'tags': {'color': 'green', 'global': 'tag', 'status': 0},
  'time': 1471504271434227712}]
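One subtlety visible in the output above: the nanosecond timestamps come back slightly rounded (e.g. the index value ending in `...141` becomes `...104`) because `int(ts.value / precision_factor)` routes through float division, and a 64-bit float cannot represent every 19-digit integer exactly. Integer precision factors with floor division would stay exact; a small sketch:

```python
ns = 1470968248191727141  # nanosecond timestamp from the example index

# True division converts to float first, losing the low bits
lossy = int(ns / 1)

# Floor division over ints is exact
exact = ns // 1
```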

I can start a PR if anyone is interested.

* That said, the current dataframe write path is probably not as efficient as it could be: the dataframe is first converted to a JSON-like dict, and then serialized to line protocol. Converting the dataframe to line protocol directly would avoid a lot of the overhead of iterating through dicts. I'll open a separate issue on this.
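For what it's worth, a direct DataFrame-to-line-protocol conversion could look roughly like this. This is a simplified sketch, not the library's code: real line protocol also needs escaping, quoted string field values, and an `i` suffix for integer fields.

```python
import pandas as pd

def dataframe_to_line_protocol(df, measurement, tag_columns=(), precision_factor=1):
    """Sketch: build line-protocol strings straight from a DataFrame."""
    tag_columns = list(tag_columns)
    field_columns = [c for c in df.columns if c not in tag_columns]
    lines = []
    # itertuples avoids materializing per-row dicts up front
    for ts, row in zip(df.index, df.itertuples(index=False)):
        row = row._asdict()
        tags = ''.join(',{}={}'.format(k, row[k]) for k in tag_columns)
        fields = ','.join('{}={}'.format(k, row[k]) for k in field_columns)
        # Floor division keeps the full integer timestamp precision
        lines.append('{}{} {} {}'.format(measurement, tags, fields,
                                         ts.value // precision_factor))
    return lines

# Example (columns mirror the frame in this issue):
df = pd.DataFrame({'color': ['red', 'blue'], 'value': [9.1, 8.3]},
                  index=pd.to_datetime(['2016-08-12 02:17:28.191727141',
                                        '2016-08-12 02:17:51.082619738'],
                                       utc=True))
lines = dataframe_to_line_protocol(df, 'var1', tag_columns=['color'])
```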
@PaulKuiper

+1. The current code is slow. I like this approach.

@ikding

ikding commented Aug 25, 2016

+1 on specifying the tag columns directly!

@kaiseu

kaiseu commented Sep 14, 2017

It seems that if tag_columns is not specified, it fails:
    File "C:\Python27\lib\site-packages\influxdb\_dataframe_client.py", line 125, in write_points
      field_columns=field_columns)
    File "C:\Python27\lib\site-packages\influxdb\_dataframe_client.py", line 220, in _convert_dataframe_to_json
      dataframe[field_columns].to_dict('record'))
    File "C:\Python27\lib\site-packages\pandas\core\frame.py", line 1958, in __getitem__
      return self._getitem_array(key)
    File "C:\Python27\lib\site-packages\pandas\core\frame.py", line 2002, in _getitem_array
      indexer = self.loc._convert_to_indexer(key, axis=1)
    File "C:\Python27\lib\site-packages\pandas\core\indexing.py", line 1231, in _convert_to_indexer
      raise KeyError('%s not in index' % objarr[mask])
    KeyError: '[0] not in index'
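For context, that KeyError is what pandas raises when a column selection uses labels that no longer exist. One plausible trigger (a minimal sketch, not the library's actual code path): `_convert_dataframe_to_json` casts the column labels to strings, so a `field_columns` list still holding the original integer label `0` no longer matches anything.

```python
import pandas as pd

# A frame whose single column is labeled with the integer 0
df = pd.DataFrame([[1.0]])

# The client casts column labels to strings before building points,
# so the integer label 0 becomes the string '0'
df.columns = df.columns.astype('str')

# Selecting with the original integer label now raises KeyError,
# matching the "[0] not in index" message in the traceback
try:
    df[[0]]
    raised = False
except KeyError:
    raised = True
```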
