
Convert dataframes directly to line protocol during 'write_points()' operation #363

Closed
mdbartos opened this issue Aug 22, 2016 · 4 comments


mdbartos (Contributor) commented Aug 22, 2016

The current dataframe client's write_points method is probably not as efficient as it could be: the dataframe is first converted to a JSON-like dict with _convert_dataframe_to_json, and then written to line protocol with make_lines. Converting the dataframe to line protocol directly would avoid a lot of overhead associated with iterating through dicts.

This could be done by implementing a simple _convert_dataframe_to_lines method in the dataframe client that takes advantage of pandas' vectorized string methods. The resulting lines could then be passed to the write method directly.
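For illustration, the core of the approach is plain elementwise string concatenation on pandas objects, so no Python-level loop over rows is needed. A toy sketch of that trick (written against 2016-era pandas; the column names are made up and newer pandas versions may behave slightly differently):

import pandas as pd

# Prefix each stringified column with '<name>=' and sum across axis=1 to
# concatenate the pieces row-wise, all via vectorized operations.
toy = pd.DataFrame({'color': ['red', 'blue'], 'status': ['1', '0']})
tags = (',' + ((toy.columns + '=').tolist() + toy)).sum(axis=1)
print(tags.tolist())  # [',color=red,status=1', ',color=blue,status=0']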

def _convert_dataframe_to_lines(self, dataframe, measurement, field_columns=None,
                                tag_columns=None, global_tags=None,
                                time_precision=None, numeric_precision=None):

    if not isinstance(dataframe, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(dataframe)))
    if not isinstance(dataframe.index, (pd.DatetimeIndex, pd.PeriodIndex)):
        raise TypeError('Must be DataFrame with DatetimeIndex or '
                        'PeriodIndex.')

    # String-valued columns need quoting when they are used as fields
    string_columns = dataframe.select_dtypes(include=['object']).columns

    # Normalize arguments (None defaults avoid mutable default arguments)
    field_columns = list(field_columns) if field_columns else []
    tag_columns = list(tag_columns) if tag_columns else []
    global_tags = global_tags if global_tags else {}

    # Assume that all columns not listed as tag columns are field columns
    if not field_columns:
        field_columns = list(set(dataframe.columns).difference(set(tag_columns)))

    precision_factor = {
        "n": 1,
        "u": 1e3,
        "ms": 1e6,
        "s": 1e9,
        "m": 1e9 * 60,
        "h": 1e9 * 3600,
    }.get(time_precision, 1)


    # Make array of timestamp ints
    time = ((dataframe.index.astype(int) / precision_factor)
            .astype(int).astype(str))

    # If tag columns exist, make an array of formatted tag keys and values
    if tag_columns:
        tag_df = dataframe[tag_columns]
        tag_df = self._stringify_dataframe(tag_df, numeric_precision)
        tags = (',' + ((tag_df.columns + '=').tolist() + tag_df)).sum(axis=1)
        del tag_df

    else:
        tags = ''

    # Make an array of formatted field keys and values
    field_df = dataframe[field_columns]
    field_df = self._stringify_dataframe(field_df, numeric_precision)
    # Only quote the string-valued columns that are actually fields
    string_field_columns = string_columns.intersection(field_columns)
    field_df[string_field_columns] = '"' + field_df[string_field_columns] + '"'
    field_df = (field_df.columns + '=').tolist() + field_df
    field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]]
    fields = field_df.sum(axis=1)
    del field_df

    # Add any global tags to formatted tag strings
    if global_tags:
        global_tags = ','.join(['='.join([tag, global_tags[tag]])
                            for tag in global_tags])
        if tag_columns:
            tags = tags + ',' + global_tags
        else:
            tags = ',' + global_tags

    # Generate line protocol string
    points = (measurement + tags + ' ' +
              fields + ' ' + time).tolist()
    return points

def _stringify_dataframe(self, dataframe, numeric_precision):
    float_columns = dataframe.select_dtypes(include=['floating']).columns
    nonfloat_columns = dataframe.columns[
        ~dataframe.columns.isin(float_columns)
    ]
    numeric_columns = dataframe.select_dtypes(include=['number']).columns

    # Convert dataframe to string
    if numeric_precision is None:
        dataframe = dataframe.astype(str)
    elif numeric_precision == 'full':
        dataframe[float_columns] = dataframe[float_columns].applymap(repr)
        dataframe[nonfloat_columns] = dataframe[nonfloat_columns].astype(str)
    elif isinstance(numeric_precision, int):
        dataframe[numeric_columns] = (dataframe[numeric_columns]
                                    .round(numeric_precision))
        dataframe = dataframe.astype(str)
    else:
        raise ValueError('Invalid numeric precision')
    dataframe.columns = dataframe.columns.astype(str)
    return dataframe

Here's an example usage:

>>> print(df)
                                     color  value  status
2016-08-12 02:17:28.191727141+00:00    red    9.1       1
2016-08-12 02:17:51.082619738+00:00   blue    8.3       0
2016-08-18 07:05:10.979251109+00:00  green    2.0       0
2016-08-18 07:11:11.434227816+00:00  green    1.0       0
2016-08-09 21:48:05.394423800+00:00   blue    1.0       0
2016-08-09 21:48:16.696269144+00:00   blue    1.3       1
2016-08-09 21:48:28.603980937+00:00   blue    0.9       0
2016-08-09 22:01:56.629486728+00:00   blue    0.8       1
2016-08-09 22:02:24.966032304+00:00   blue    0.7       0
2016-08-09 22:16:46.645324098+00:00   blue    1.9       0

>>> _convert_dataframe_to_lines(None, dataframe=df, measurement='var1', tag_columns=['color', 'status'], field_columns=['value'], global_tags={'global':'tag'})

b'var1,color=red,status=1,global=tag value=9.1 1470968248191727104\nvar1,color=blue,status=0,global=tag value=8.3 1470968271082619648\nvar1,color=green,status=0,global=tag value=2.0 1471503910979251200\nvar1,color=green,status=0,global=tag value=1.0 1471504271434227712\nvar1,color=blue,status=0,global=tag value=1.0 1470779285394423808\nvar1,color=blue,status=1,global=tag value=1.3 1470779296696269056\nvar1,color=blue,status=0,global=tag value=0.9 1470779308603981056\nvar1,color=blue,status=1,global=tag value=0.8 1470780116629486848\nvar1,color=blue,status=0,global=tag value=0.7 1470780144966032384\nvar1,color=blue,status=0,global=tag value=1.9 1470781006645324032\n'
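As noted above, these lines could then be handed straight to the write path instead of going through make_lines. A rough sketch of how the wiring inside write_points might look (the write() call and its protocol/params keywords below are assumptions about the base client's API for illustration, not something confirmed in this issue):

def write_points(self, dataframe, measurement, tags=None, tag_columns=None,
                 field_columns=None, time_precision=None, database=None,
                 numeric_precision=None):
    # Build the line protocol strings directly from the dataframe
    lines = self._convert_dataframe_to_lines(
        dataframe, measurement,
        field_columns=field_columns,
        tag_columns=tag_columns,
        global_tags=tags or {},
        time_precision=time_precision,
        numeric_precision=numeric_precision)
    payload = '\n'.join(lines) + '\n'
    # Hand the raw payload to the underlying client; the exact signature of
    # write() used here is an assumption.
    return self.write(payload, params={'db': database}, protocol='line')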

This approach also uses the tag_columns functionality I proposed in #362.

This implementation could use some more thorough testing for edge cases, but I can start a PR if anyone is interested.

mdbartos (Contributor, Author) commented Aug 26, 2016

Tested this approach. It's about 5x faster than the current method when float precision is ignored, and roughly 3x faster when float precision is controlled (i.e. such that the output is exactly the same as the output produced by the current method).

The following test uses a dataframe of 1 million values split across 5 columns (200,000 rows).

import influxdb
import pandas as pd
import numpy as np

client = influxdb.DataFrameClient()
df = pd.DataFrame(np.random.random(1000000).reshape(-1,5)) 
df.index = pd.date_range(start='1970-01-01', periods=len(df), freq='s') 
df.rename(columns=(lambda x: 'var_' + str(x)), inplace=True)

def line_interface(measurement, field_columns, tag_columns,
                   numeric_precision='full'):
    L = client._convert_dataframe_to_lines(
        df, measurement, field_columns=field_columns,
        tag_columns=tag_columns, numeric_precision=numeric_precision)
    L = ('\n'.join(L)) + '\n'
    return L

def json_interface(measurement, field_columns, tag_columns):
    J = client._convert_dataframe_to_json(
        df, measurement, field_columns=field_columns,
        tag_columns=tag_columns)
    J = {'points': J}
    L = influxdb.line_protocol.make_lines(J)
    return L


# Direct line protocol approach
In [2]: %timeit line_method = line_interface('test_measurement', ['var_1'], list(df.columns[1:]), numeric_precision=None)
1 loop, best of 3: 1.93 s per loop

# Direct line protocol approach with precision control (output is exactly equal to current approach)
In [3]: %timeit line_method = line_interface('test_measurement', ['var_1'], list(df.columns[1:]), numeric_precision='full')
1 loop, best of 3: 3.18 s per loop

# Current json protocol approach
In [4]: %timeit json_method = json_interface('test_measurement', ['var_1'], list(df.columns[1:]))
1 loop, best of 3: 10.9 s per loop

Note that the implementation in the previous post has been updated.

aviau (Collaborator) commented Aug 29, 2016

Can you please open a pull request?

mdbartos (Contributor, Author) commented
PR opened as #364

mdbartos closed this as completed on Sep 6, 2016
mdbartos (Contributor, Author) commented Sep 6, 2016

Addressed in #364
