Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Convert dataframes directly to line protocol during 'write_points()' operation #363

Closed
@mdbartos

Description

@mdbartos

The current dataframe client's write_points method is probably not as efficient as it could be: the dataframe is first converted to a JSON-like dict with _convert_dataframe_to_json, and that dict is then serialized to line protocol with make_lines. Converting the dataframe to line protocol directly would avoid much of the overhead of building and iterating through those dicts.

This could be done by implementing a simple _convert_dataframe_to_lines method in the dataframe client that takes advantage of pandas' vectorized string methods. The resulting lines could then be passed to the write method directly.

def _convert_dataframe_to_lines(self, dataframe, measurement,
                                field_columns=None, tag_columns=None,
                                global_tags=None, time_precision=None,
                                numeric_precision=None):
    """Convert a time-indexed DataFrame into line protocol strings.

    Every row becomes one string of the form
    ``<measurement>[,<tag>=<value>...] <field>=<value>[,...] <timestamp>``.
    Column tags come first, followed by the global tags.

    NOTE: keys and values are emitted verbatim. Nothing here escapes
    spaces, commas, equals signs or double quotes.

    :param dataframe: frame to convert; its index must be a
        ``DatetimeIndex`` or a ``PeriodIndex``.
    :param measurement: string prepended to every line.
    :param field_columns: columns written as fields. When empty or
        ``None``, every column not listed in ``tag_columns`` is used, in
        the order the columns appear in ``dataframe``.
    :param tag_columns: columns written as tags (empty or ``None`` for
        no column tags).
    :param global_tags: mapping of tag key to tag value added to every
        line.
    :param time_precision: ``'n'``, ``'u'``, ``'ms'``, ``'s'``, ``'m'`` or
        ``'h'``. Any other value, including ``None``, keeps nanoseconds.
    :param numeric_precision: passed through unchanged to
        ``self._stringify_dataframe``.
    :returns: list of strings, one per row of ``dataframe``.
    :raises TypeError: if ``dataframe`` is not a DataFrame or its index is
        neither a ``DatetimeIndex`` nor a ``PeriodIndex``.
    :raises ValueError: if there is no column left to write as a field.
    """
    if not isinstance(dataframe, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(dataframe)))
    # Use the public top-level index classes; the private module paths
    # under ``pd.tseries`` are not a stable API.
    if not isinstance(dataframe.index, (pd.PeriodIndex, pd.DatetimeIndex)):
        raise TypeError('Must be DataFrame with DatetimeIndex or '
                        'PeriodIndex.')

    tag_columns = [] if tag_columns is None else list(tag_columns)
    field_columns = [] if field_columns is None else list(field_columns)

    # Assume that all columns not listed as tag columns are field columns.
    # Walking dataframe.columns keeps the field order deterministic.
    if not field_columns:
        field_columns = [column for column in dataframe.columns
                         if column not in tag_columns]
    if not field_columns:
        raise ValueError('At least one field column is required.')

    # Integer factors keep the division below exact; dividing by a float
    # would round 19-digit nanosecond timestamps.
    precision_factor = {
        "n": 1,
        "u": 10 ** 3,
        "ms": 10 ** 6,
        "s": 10 ** 9,
        "m": 60 * 10 ** 9,
        "h": 3600 * 10 ** 9,
    }.get(time_precision, 1)

    # Make a Series of timestamp strings. A PeriodIndex is represented by
    # the start timestamp of each period.
    index = dataframe.index
    if isinstance(index, pd.PeriodIndex):
        index = index.to_timestamp()
    epoch_ns = index.values.astype('datetime64[ns]').astype('int64')
    timestamps = pd.Series((epoch_ns // precision_factor).astype(str),
                           index=dataframe.index)

    # If tag columns exist, build ",key=value" for each one and
    # concatenate them row-wise.
    tags = ''
    if tag_columns:
        tag_df = self._stringify_dataframe(dataframe[tag_columns],
                                           numeric_precision)
        for position, key in enumerate(tag_df.columns):
            tags = tags + (',' + str(key) + '=' + tag_df.iloc[:, position])
        del tag_df

    # Decide which fields need quoting from the dtypes of the selected
    # field columns only, and before they are stringified.
    source_fields = dataframe[field_columns]
    quote_field = [pd.api.types.is_string_dtype(dtype)
                   for dtype in source_fields.dtypes]
    field_df = self._stringify_dataframe(source_fields, numeric_precision)
    fields = None
    for position, key in enumerate(field_df.columns):
        values = field_df.iloc[:, position]
        if quote_field[position]:
            values = '"' + values + '"'
        pair = str(key) + '=' + values
        fields = pair if fields is None else fields + ',' + pair
    del field_df

    # Add any global tags after the column tags. format() accepts
    # non-string tag values as well.
    if global_tags:
        global_tag_string = ','.join(
            '{0}={1}'.format(key, value)
            for key, value in global_tags.items())
        tags = tags + ',' + global_tag_string

    # Generate line protocol strings
    points = measurement + tags + ' ' + fields + ' ' + timestamps
    return points.tolist()

def _stringify_dataframe(self, dataframe, numeric_precision):
    """Return a new frame holding ``dataframe``'s values and labels as text.

    The frame passed in is never modified; all work happens on a new
    frame.

    :param dataframe: DataFrame whose values should be rendered as
        strings.
    :param numeric_precision: ``None`` to convert every column with
        ``astype(str)``; ``'full'`` to render floating-point columns with
        ``repr`` and everything else with ``astype(str)``; or an int
        giving the number of decimals to round to before converting.
    :returns: DataFrame of strings whose column labels are strings too.
    :raises ValueError: if ``numeric_precision`` is none of the above.
    """
    if numeric_precision is None:
        result = dataframe.astype(str)
    elif numeric_precision == 'full':
        result = dataframe.astype(str)
        float_columns = dataframe.select_dtypes(include=['floating']).columns
        for column in float_columns:
            # Going through float() keeps the text a bare number no
            # matter how the scalar type itself chooses to print.
            result[column] = dataframe[column].map(
                lambda value: repr(float(value)))
    elif isinstance(numeric_precision, int):
        # DataFrame.round returns a new frame and leaves non-numeric
        # columns as they are.
        result = dataframe.round(numeric_precision).astype(str)
    else:
        raise ValueError('Invalid numeric precision')
    # Relabel the new frame only, so the caller's column labels survive.
    result.columns = result.columns.astype(str)
    return result

Here's an example usage:

>>> print(df)
                                     color  value  status
2016-08-12 02:17:28.191727141+00:00    red    9.1       1
2016-08-12 02:17:51.082619738+00:00   blue    8.3       0
2016-08-18 07:05:10.979251109+00:00  green    2.0       0
2016-08-18 07:11:11.434227816+00:00  green    1.0       0
2016-08-09 21:48:05.394423800+00:00   blue    1.0       0
2016-08-09 21:48:16.696269144+00:00   blue    1.3       1
2016-08-09 21:48:28.603980937+00:00   blue    0.9       0
2016-08-09 22:01:56.629486728+00:00   blue    0.8       1
2016-08-09 22:02:24.966032304+00:00   blue    0.7       0
2016-08-09 22:16:46.645324098+00:00   blue    1.9       0

>>> _convert_dataframe_to_lines(None, dataframe=df, measurement='var1', tag_columns=['color', 'status'], field_columns=['value'], global_tags={'global':'tag'})

b'var1,color=red,status=1,global=tag value=9.1 1470968248191727104\nvar1,color=blue,status=0,global=tag value=8.3 1470968271082619648\nvar1,color=green,status=0,global=tag value=2.0 1471503910979251200\nvar1,color=green,status=0,global=tag value=1.0 1471504271434227712\nvar1,color=blue,status=0,global=tag value=1.0 1470779285394423808\nvar1,color=blue,status=1,global=tag value=1.3 1470779296696269056\nvar1,color=blue,status=0,global=tag value=0.9 1470779308603981056\nvar1,color=blue,status=1,global=tag value=0.8 1470780116629486848\nvar1,color=blue,status=0,global=tag value=0.7 1470780144966032384\nvar1,color=blue,status=0,global=tag value=1.9 1470781006645324032\n'

This approach also uses the tag_columns functionality I proposed in #362.

This implementation could use some more thorough testing for edge cases, but I can start a PR if anyone is interested.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions