diff --git a/.travis.yml b/.travis.yml index 1aeb0949..868fe5cc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,8 +33,8 @@ notifications: sudo: false # Travis caching -cache: - directories: - - $HOME/.cache/pip -before_cache: - - rm -f $HOME/.cache/pip/log/debug.log +cache: false +# directories: +# - $HOME/.cache/pip +#before_cache: +# - rm -f $HOME/.cache/pip/log/debug.log diff --git a/influxdb/_dataframe_client.py b/influxdb/_dataframe_client.py index 2980be90..ddae0862 100644 --- a/influxdb/_dataframe_client.py +++ b/influxdb/_dataframe_client.py @@ -35,9 +35,18 @@ class DataFrameClient(InfluxDBClient): EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00') - def write_points(self, dataframe, measurement, tags=None, - time_precision=None, database=None, retention_policy=None, - batch_size=None): + def write_points(self, + dataframe, + measurement, + tags=None, + tag_columns=[], + field_columns=[], + time_precision=None, + database=None, + retention_policy=None, + batch_size=None, + protocol='line', + numeric_precision=None): """ Write to multiple time series names. @@ -50,27 +59,67 @@ def write_points(self, dataframe, measurement, tags=None, instead of all at one time. Useful for when doing data dumps from one database to another or when doing a massive write operation :type batch_size: int + :param protocol: Protocol for writing data. Either 'line' or 'json'. + :param numeric_precision: Precision for floating point values. + Either None, 'full' or some int, where int is the desired decimal + precision. 'full' preserves full precision for int and float + datatypes. Defaults to None, which preserves 14-15 significant + figures for float and all significant figures for int datatypes. """ if batch_size: - number_batches = int(math.ceil( - len(dataframe) / float(batch_size))) + number_batches = int(math.ceil(len(dataframe) / float(batch_size))) for batch in range(number_batches): start_index = batch * batch_size end_index = (batch + 1) * batch_size - points = self._convert_dataframe_to_json( - dataframe.ix[start_index:end_index].copy(), - measurement, tags, time_precision - ) + if protocol == 'line': + points = self._convert_dataframe_to_lines( + dataframe.ix[start_index:end_index].copy(), + measurement=measurement, + global_tags=tags, + time_precision=time_precision, + tag_columns=tag_columns, + field_columns=field_columns, + numeric_precision=numeric_precision) + else: + points = self._convert_dataframe_to_json( + dataframe.ix[start_index:end_index].copy(), + measurement=measurement, + tags=tags, + time_precision=time_precision, + tag_columns=tag_columns, + field_columns=field_columns) super(DataFrameClient, self).write_points( - points, time_precision, database, retention_policy) + points, + time_precision, + database, + retention_policy, + protocol=protocol) return True else: - points = self._convert_dataframe_to_json( - dataframe, measurement, tags, time_precision - ) + if protocol == 'line': + points = self._convert_dataframe_to_lines( + dataframe, + measurement=measurement, + global_tags=tags, + tag_columns=tag_columns, + field_columns=field_columns, + time_precision=time_precision, + numeric_precision=numeric_precision) + else: + points = self._convert_dataframe_to_json( + dataframe, + measurement=measurement, + tags=tags, + time_precision=time_precision, + tag_columns=tag_columns, + field_columns=field_columns) super(DataFrameClient, self).write_points( - points, time_precision, database, retention_policy) + points, + time_precision, + database, + retention_policy, + protocol=protocol) return True def query(self, query, chunked=False, database=None): @@ -108,7 +157,12 @@ def _to_dataframe(self, rs): result[key] = df return result - def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, + def _convert_dataframe_to_json(self, + dataframe, + measurement, + tags=None, + tag_columns=[], + field_columns=[], time_precision=None): if not isinstance(dataframe, pd.DataFrame): @@ -119,6 +173,15 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, raise TypeError('Must be DataFrame with DatetimeIndex or \ PeriodIndex.') + # Make sure tags and tag columns are correctly typed + tag_columns = tag_columns if tag_columns else [] + field_columns = field_columns if field_columns else [] + tags = tags if tags else {} + # Assume field columns are all columns not included in tag columns + if not field_columns: + field_columns = list( + set(dataframe.columns).difference(set(tag_columns))) + dataframe.index = dataframe.index.to_datetime() if dataframe.index.tzinfo is None: dataframe.index = dataframe.index.tz_localize('UTC') @@ -140,13 +203,151 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None, points = [ {'measurement': measurement, - 'tags': tags if tags else {}, + 'tags': dict(list(tag.items()) + list(tags.items())), 'fields': rec, - 'time': int(ts.value / precision_factor) - } - for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))] + 'time': int(ts.value / precision_factor)} + for ts, tag, rec in zip(dataframe.index, + dataframe[tag_columns].to_dict('record'), + dataframe[field_columns].to_dict('record')) + ] + + return points + + def _convert_dataframe_to_lines(self, + dataframe, + measurement, + field_columns=[], + tag_columns=[], + global_tags={}, + time_precision=None, + numeric_precision=None): + + if not isinstance(dataframe, pd.DataFrame): + raise TypeError('Must be DataFrame, but type was: {0}.' + .format(type(dataframe))) + if not (isinstance(dataframe.index, pd.tseries.period.PeriodIndex) or + isinstance(dataframe.index, pd.tseries.index.DatetimeIndex)): + raise TypeError('Must be DataFrame with DatetimeIndex or \ + PeriodIndex.') + + # Create a Series of columns for easier indexing + column_series = pd.Series(dataframe.columns) + + if field_columns is None: + field_columns = [] + if tag_columns is None: + tag_columns = [] + + # Make sure field_columns and tag_columns are lists + field_columns = list(field_columns) if list(field_columns) else [] + tag_columns = list(tag_columns) if list(tag_columns) else [] + + # If field columns but no tag columns, assume rest of columns are tags + if field_columns and (not tag_columns): + tag_columns = list(column_series[~column_series.isin( + field_columns)]) + + # If no field columns, assume non-tag columns are fields + if not field_columns: + field_columns = list(column_series[~column_series.isin( + tag_columns)]) + + precision_factor = { + "n": 1, + "u": 1e3, + "ms": 1e6, + "s": 1e9, + "m": 1e9 * 60, + "h": 1e9 * 3600, + }.get(time_precision, 1) + + # Make array of timestamp ints + time = ((dataframe.index.to_datetime().values.astype(int) / + precision_factor).astype(int).astype(str)) + + # If tag columns exist, make an array of formatted tag keys and values + if tag_columns: + tag_df = dataframe[tag_columns] + tag_df = self._stringify_dataframe( + tag_df, numeric_precision, datatype='tag') + tags = (',' + ( + (tag_df.columns.values + '=').tolist() + tag_df)).sum(axis=1) + del tag_df + + else: + tags = '' + + # Make an array of formatted field keys and values + field_df = dataframe[field_columns] + field_df = self._stringify_dataframe( + field_df, numeric_precision, datatype='field') + field_df = (field_df.columns.values + '=').tolist() + field_df + field_df[field_df.columns[1:]] = ',' + field_df[field_df.columns[1:]] + fields = field_df.sum(axis=1) + del field_df + + # Add any global tags to formatted tag strings + if global_tags: + global_tags = ','.join(['='.join([tag, global_tags[tag]]) + for tag in global_tags]) + if tag_columns: + tags = tags + ',' + global_tags + else: + tags = ',' + global_tags + + # Generate line protocol string + points = (measurement + tags + ' ' + fields + ' ' + time).tolist() return points + def _stringify_dataframe(self, + dataframe, + numeric_precision, + datatype='field'): + + # Find int and string columns for field-type data + int_columns = dataframe.select_dtypes(include=['integer']).columns + string_columns = dataframe.select_dtypes(include=['object']).columns + + # Convert dataframe to string + if numeric_precision is None: + # If no precision specified, convert directly to string (fast) + dataframe = dataframe.astype(str) + elif numeric_precision == 'full': + # If full precision, use repr to get full float precision + float_columns = (dataframe.select_dtypes(include=['floating']) + .columns) + nonfloat_columns = dataframe.columns[~dataframe.columns.isin( + float_columns)] + dataframe[float_columns] = dataframe[float_columns].applymap(repr) + dataframe[nonfloat_columns] = (dataframe[nonfloat_columns] + .astype(str)) + elif isinstance(numeric_precision, int): + # If precision is specified, round to appropriate precision + float_columns = (dataframe.select_dtypes(include=['floating']) + .columns) + nonfloat_columns = dataframe.columns[~dataframe.columns.isin( + float_columns)] + dataframe[float_columns] = (dataframe[float_columns] + .round(numeric_precision)) + # If desired precision is > 10 decimal places, need to use repr + if numeric_precision > 10: + dataframe[float_columns] = (dataframe[float_columns] + .applymap(repr)) + dataframe[nonfloat_columns] = (dataframe[nonfloat_columns] + .astype(str)) + else: + dataframe = dataframe.astype(str) + else: + raise ValueError('Invalid numeric precision.') + + if datatype == 'field': + # If dealing with fields, format ints and strings correctly + dataframe[int_columns] = dataframe[int_columns] + 'i' + dataframe[string_columns] = '"' + dataframe[string_columns] + '"' + + dataframe.columns = dataframe.columns.astype(str) + return dataframe + def _datetime_to_epoch(self, datetime, time_precision='s'): seconds = (datetime - self.EPOCH).total_seconds() if time_precision == 'h': diff --git a/influxdb/client.py b/influxdb/client.py index b6bbc72f..40978dd3 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -252,16 +252,20 @@ def request(self, url, method='GET', params=None, data=None, else: raise InfluxDBClientError(response.content, response.status_code) - def write(self, data, params=None, expected_response_code=204): + def write(self, data, params=None, expected_response_code=204, + protocol='json'): """Write data to InfluxDB. :param data: the data to be written - :type data: dict + :type data: (if protocol is 'json') dict + (if protocol is 'line') sequence of line protocol strings :param params: additional parameters for the request, defaults to None :type params: dict :param expected_response_code: the expected response code of the write operation, defaults to 204 :type expected_response_code: int + :param protocol: protocol of input data, either 'json' or 'line' + :type protocol: str :returns: True, if the write operation is successful :rtype: bool """ @@ -274,11 +278,16 @@ def write(self, data, params=None, expected_response_code=204): else: precision = None + if protocol == 'json': + data = make_lines(data, precision).encode('utf-8') + elif protocol == 'line': + data = ('\n'.join(data) + '\n').encode('utf-8') + self.request( url="write", method='POST', params=params, - data=make_lines(data, precision).encode('utf-8'), + data=data, expected_response_code=expected_response_code, headers=headers ) @@ -351,11 +360,15 @@ def write_points(self, retention_policy=None, tags=None, batch_size=None, + protocol='json' ): """Write to multiple time series names. :param points: the list of points to be written in the database :type points: list of dictionaries, each dictionary represents a point + :type data: (if protocol is 'json') list of dicts, where each dict + represents a point. + (if protocol is 'line') sequence of line protocol strings. :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None :type time_precision: str :param database: the database to write the points to. Defaults to @@ -373,6 +386,8 @@ def write_points(self, one database to another or when doing a massive write operation, defaults to None :type batch_size: int + :param protocol: Protocol for writing data. Either 'line' or 'json'. + :type protocol: str :returns: True, if the operation is successful :rtype: bool @@ -386,14 +401,14 @@ def write_points(self, time_precision=time_precision, database=database, retention_policy=retention_policy, - tags=tags) + tags=tags, protocol=protocol) return True else: return self._write_points(points=points, time_precision=time_precision, database=database, retention_policy=retention_policy, - tags=tags) + tags=tags, protocol=protocol) def _batches(self, iterable, size): for i in xrange(0, len(iterable), size): @@ -404,7 +419,8 @@ def _write_points(self, time_precision, database, retention_policy, - tags): + tags, + protocol='json'): if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]: raise ValueError( "Invalid time precision is given. " @@ -415,12 +431,15 @@ def _write_points(self, "InfluxDB only supports seconds precision for udp writes" ) - data = { - 'points': points - } + if protocol == 'json': + data = { + 'points': points + } - if tags is not None: - data['tags'] = tags + if tags is not None: + data['tags'] = tags + else: + data = points params = { 'db': database or self._database @@ -433,12 +452,13 @@ def _write_points(self, params['rp'] = retention_policy if self.use_udp: - self.send_packet(data) + self.send_packet(data, protocol=protocol) else: self.write( data=data, params=params, - expected_response_code=204 + expected_response_code=204, + protocol=protocol ) return True @@ -737,13 +757,19 @@ def get_list_privileges(self, username): text = "SHOW GRANTS FOR {0}".format(username) return list(self.query(text).get_points()) - def send_packet(self, packet): + def send_packet(self, packet, protocol='json'): """Send an UDP packet. :param packet: the packet to be sent - :type packet: dict + :type packet: (if protocol is 'json') dict + (if protocol is 'line') sequence of line protocol strings + :param protocol: protocol of input data, either 'json' or 'line' + :type protocol: str """ - data = make_lines(packet).encode('utf-8') + if protocol == 'json': + data = make_lines(packet).encode('utf-8') + elif protocol == 'line': + data = ('\n'.join(data) + '\n').encode('utf-8') self.udp_socket.sendto(data, (self._host, self.udp_port)) diff --git a/influxdb/tests/dataframe_client_test.py b/influxdb/tests/dataframe_client_test.py index 5b868d14..0b3b9b90 100644 --- a/influxdb/tests/dataframe_client_test.py +++ b/influxdb/tests/dataframe_client_test.py @@ -37,8 +37,8 @@ def test_write_points_from_dataframe(self): columns=["column_one", "column_two", "column_three"]) expected = ( - b"foo column_one=\"1\",column_three=1.0,column_two=1i 0\n" - b"foo column_one=\"2\",column_three=2.0,column_two=2i " + b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n" + b"foo column_one=\"2\",column_two=2i,column_three=2.0 " b"3600000000000\n" ) @@ -69,6 +69,135 @@ def test_write_points_from_dataframe_in_batches(self): cli = DataFrameClient(database='db') self.assertTrue(cli.write_points(dataframe, "foo", batch_size=1)) + def test_write_points_from_dataframe_with_tag_columns(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0], + ['red', 0, "2", 2, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three"]) + expected = ( + b"foo,tag_one=blue,tag_two=1 " + b"column_one=\"1\",column_two=1i,column_three=1.0 " + b"0\n" + b"foo,tag_one=red,tag_two=0 " + b"column_one=\"2\",column_two=2i,column_three=2.0 " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected) + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two'], tags=None) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_tag_cols_and_global_tags(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0], + ['red', 0, "2", 2, 2.0]], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three"]) + expected = ( + b"foo,tag_one=blue,tag_two=1,global_tag=value " + b"column_one=\"1\",column_two=1i,column_three=1.0 " + b"0\n" + b"foo,tag_one=red,tag_two=0,global_tag=value " + b"column_one=\"2\",column_two=2i,column_three=2.0 " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two'], + tags={'global_tag': 'value'}) + self.assertEqual(m.last_request.body, expected) + + def test_write_points_from_dataframe_with_tag_cols_and_defaults(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + dataframe = pd.DataFrame(data=[['blue', 1, "1", 1, 1.0, 'hot'], + ['red', 0, "2", 2, 2.0, 'cold']], + index=[now, now + timedelta(hours=1)], + columns=["tag_one", "tag_two", "column_one", + "column_two", "column_three", + "tag_three"]) + expected_tags_and_fields = ( + b"foo,tag_one=blue " + b"column_one=\"1\",column_two=1i " + b"0\n" + b"foo,tag_one=red " + b"column_one=\"2\",column_two=2i " + b"3600000000000\n" + ) + + expected_tags_no_fields = ( + b"foo,tag_one=blue,tag_two=1 " + b"column_one=\"1\",column_two=1i,column_three=1.0," + b"tag_three=\"hot\" 0\n" + b"foo,tag_one=red,tag_two=0 " + b"column_one=\"2\",column_two=2i,column_three=2.0," + b"tag_three=\"cold\" 3600000000000\n" + ) + + expected_fields_no_tags = ( + b"foo,tag_one=blue,tag_two=1,tag_three=hot " + b"column_one=\"1\",column_two=1i,column_three=1.0 " + b"0\n" + b"foo,tag_one=red,tag_two=0,tag_three=cold " + b"column_one=\"2\",column_two=2i,column_three=2.0 " + b"3600000000000\n" + ) + + expected_no_tags_no_fields = ( + b"foo " + b"tag_one=\"blue\",tag_two=1i,column_one=\"1\"," + b"column_two=1i,column_three=1.0,tag_three=\"hot\" " + b"0\n" + b"foo " + b"tag_one=\"red\",tag_two=0i,column_one=\"2\"," + b"column_two=2i,column_three=2.0,tag_three=\"cold\" " + b"3600000000000\n" + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + + cli.write_points(dataframe, 'foo', + field_columns=['column_one', 'column_two'], + tag_columns=['tag_one']) + self.assertEqual(m.last_request.body, expected_tags_and_fields) + + cli.write_points(dataframe, 'foo', + tag_columns=['tag_one', 'tag_two']) + self.assertEqual(m.last_request.body, expected_tags_no_fields) + + cli.write_points(dataframe, 'foo', + field_columns=['column_one', 'column_two', + 'column_three']) + self.assertEqual(m.last_request.body, expected_fields_no_tags) + + cli.write_points(dataframe, 'foo') + self.assertEqual(m.last_request.body, expected_no_tags_no_fields) + def test_write_points_from_dataframe_with_numeric_column_names(self): now = pd.Timestamp('1970-01-01 00:00+00:00') # df with numeric column names @@ -90,15 +219,60 @@ def test_write_points_from_dataframe_with_numeric_column_names(self): self.assertEqual(m.last_request.body, expected) + def test_write_points_from_dataframe_with_numeric_precision(self): + now = pd.Timestamp('1970-01-01 00:00+00:00') + # df with numeric column names + dataframe = pd.DataFrame(data=[["1", 1, 1.1111111111111], + ["2", 2, 2.2222222222222]], + index=[now, now + timedelta(hours=1)]) + + expected_default_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.11111111111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.22222222222 3600000000000\n' + ) + + expected_specified_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.1111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.2222 3600000000000\n' + ) + + expected_full_precision = ( + b'foo,hello=there 0=\"1\",1=1i,2=1.1111111111111 0\n' + b'foo,hello=there 0=\"2\",1=2i,2=2.2222222222222 3600000000000\n' + ) + + with requests_mock.Mocker() as m: + m.register_uri(requests_mock.POST, + "http://localhost:8086/write", + status_code=204) + + cli = DataFrameClient(database='db') + cli.write_points(dataframe, "foo", {"hello": "there"}) + + self.assertEqual(m.last_request.body, expected_default_precision) + + cli = DataFrameClient(database='db') + cli.write_points(dataframe, "foo", {"hello": "there"}, + numeric_precision=4) + + self.assertEqual(m.last_request.body, expected_specified_precision) + + cli = DataFrameClient(database='db') + cli.write_points(dataframe, "foo", {"hello": "there"}, + numeric_precision='full') + + self.assertEqual(m.last_request.body, expected_full_precision) + def test_write_points_from_dataframe_with_period_index(self): dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]], index=[pd.Period('1970-01-01'), pd.Period('1970-01-02')], columns=["column_one", "column_two", "column_three"]) + expected = ( - b"foo column_one=\"1\",column_three=1.0,column_two=1i 0\n" - b"foo column_one=\"2\",column_three=2.0,column_two=2i " + b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n" + b"foo column_one=\"2\",column_two=2i,column_three=2.0 " b"86400000000000\n" ) @@ -130,48 +304,48 @@ def test_write_points_from_dataframe_with_time_precision(self): cli.write_points(dataframe, measurement, time_precision='h') self.assertEqual(m.last_request.qs['precision'], ['h']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 1\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 1\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='m') self.assertEqual(m.last_request.qs['precision'], ['m']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 60\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 60\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='s') self.assertEqual(m.last_request.qs['precision'], ['s']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 3600\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 3600\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='ms') self.assertEqual(m.last_request.qs['precision'], ['ms']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 3600000\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 3600000\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='u') self.assertEqual(m.last_request.qs['precision'], ['u']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\nfoo ' - b'column_one="2",column_three=2.0,column_two=2i 3600000000\n', + b'foo column_one="1",column_two=1i,column_three=1.0 0\nfoo ' + b'column_one="2",column_two=2i,column_three=2.0 3600000000\n', m.last_request.body, ) cli.write_points(dataframe, measurement, time_precision='n') self.assertEqual(m.last_request.qs['precision'], ['n']) self.assertEqual( - b'foo column_one="1",column_three=1.0,column_two=1i 0\n' - b'foo column_one="2",column_three=2.0,column_two=2i ' + b'foo column_one="1",column_two=1i,column_three=1.0 0\n' + b'foo column_one="2",column_two=2i,column_three=2.0 ' b'3600000000000\n', m.last_request.body, )