3
3
DataFrame client for InfluxDB
4
4
"""
5
5
import math
6
- import warnings
6
+
7
+ import pandas as pd
7
8
8
9
from .client import InfluxDBClient
9
10
10
- import pandas as pd
11
+
12
+ def _pandas_time_unit (time_precision ):
13
+ unit = time_precision
14
+ if time_precision == 'm' :
15
+ unit = 'ms'
16
+ elif time_precision == 'u' :
17
+ unit = 'us'
18
+ elif time_precision == 'n' :
19
+ unit = 'ns'
20
+ assert unit in ('s' , 'ms' , 'us' , 'ns' )
21
+ return unit
11
22
12
23
13
24
class DataFrameClient (InfluxDBClient ):
@@ -19,112 +30,128 @@ class DataFrameClient(InfluxDBClient):
19
30
20
31
EPOCH = pd .Timestamp ('1970-01-01 00:00:00.000+00:00' )
21
32
22
- def write_points (self , data , * args , ** kwargs ):
33
def write_points(self, dataframe, measurement, tags=None,
                 time_precision=None, database=None, retention_policy=None,
                 batch_size=None):
    """
    Write the rows of a DataFrame as points of one measurement.

    :param dataframe: data points in a DataFrame
    :param measurement: name of measurement
    :param tags: dictionary of tags, with string key-values
    :param time_precision: [Optional, default None] handed unchanged to
        ``InfluxDBClient.write_points``
    :param database: [Optional] handed unchanged to
        ``InfluxDBClient.write_points``
    :param retention_policy: [Optional] handed unchanged to
        ``InfluxDBClient.write_points``
    :param batch_size: [Optional] Value to write the points in batches
        instead of all at one time. Useful for when doing data dumps from
        one database to another or when doing a massive write operation
    :type batch_size: int
    :returns: True

    """
    if batch_size:
        # Slice by position: the index holds timestamps, not row numbers.
        # Each slice is copied before it is handed to the converter.
        frames = (
            dataframe.iloc[start:start + batch_size].copy()
            for start in range(0, len(dataframe), batch_size)
        )
    else:
        frames = (dataframe,)

    for frame in frames:
        points = self._convert_dataframe_to_json(frame, measurement, tags)
        super(DataFrameClient, self).write_points(
            points, time_precision, database, retention_policy)
    return True
62
71
63
- """
64
- warnings .warn (
65
- "write_points_with_precision is deprecated, and will be removed "
66
- "in future versions. Please use "
67
- "``DataFrameClient.write_points(time_precision='..')`` instead." ,
68
- FutureWarning )
69
- return self .write_points (data , time_precision = 's' )
70
-
71
- def query (self , query , time_precision = 's' , chunked = False , database = None ):
72
def query(self, query, chunked=False, database=None):
    """
    Query data into DataFrames.

    :param query: the query string to run
    :param chunked: [Optional, default=False] True if the data shall be
        retrieved in chunks, False otherwise. NOTE: this value is
        currently not forwarded to ``InfluxDBClient.query``.
    :param database: [Optional] forwarded to ``InfluxDBClient.query``
    :returns: for a SELECT query, the dictionary built by
        ``_to_dataframe`` (or an empty dictionary when there are no
        results); for any other query, the unmodified result of
        ``InfluxDBClient.query``

    """
    results = super(DataFrameClient, self).query(query, database=database)
    # Strip leading whitespace first so that an indented or triple-quoted
    # query is still recognised as a SELECT.
    if not query.lstrip().upper().startswith("SELECT"):
        return results
    if len(results) > 0:
        return self._to_dataframe(results)
    return {}
86
88
87
- def _to_dataframe (self , json_result , time_precision ):
88
- dataframe = pd .DataFrame (data = json_result ['points' ],
89
- columns = json_result ['columns' ])
90
- if 'sequence_number' in dataframe :
91
- dataframe .sort (['time' , 'sequence_number' ], inplace = True )
89
def get_list_series(self, database=None):
    """
    Run ``SHOW SERIES`` and return the result as DataFrames.

    :param database: [Optional] forwarded to ``InfluxDBClient.query``
    :returns: dictionary mapping the first element of each result key to
        a DataFrame of that key's rows; empty dictionary when the query
        returns nothing

    """
    series = super(DataFrameClient, self).query(
        "SHOW SERIES", database=database)
    if not len(series):
        return {}

    frames = {}
    for key, rows in series.items():
        frames[key[0]] = pd.DataFrame(rows)
    return frames
102
+
103
+ def _to_dataframe (self , rs ):
104
+ result = {}
105
+ for key , data in rs .items ():
106
+ name , tags = key
107
+ if tags is None :
108
+ key = name
109
+ else :
110
+ key = (name , tuple (sorted (tags .items ())))
111
+ df = pd .DataFrame (data )
112
+ df .time = pd .to_datetime (df .time )
113
+ df .set_index ('time' , inplace = True )
114
+ df .index = df .index .tz_localize ('UTC' )
115
+ df .index .name = None
116
+ result [key ] = df
117
+ return result
118
+
119
+ def _convert_dataframe_to_json (self , dataframe , measurement , tags = None ):
120
+
106
121
if not isinstance (dataframe , pd .DataFrame ):
107
122
raise TypeError ('Must be DataFrame, but type was: {}.'
108
123
.format (type (dataframe )))
109
124
if not (isinstance (dataframe .index , pd .tseries .period .PeriodIndex ) or
110
125
isinstance (dataframe .index , pd .tseries .index .DatetimeIndex )):
111
126
raise TypeError ('Must be DataFrame with DatetimeIndex or \
112
127
PeriodIndex.' )
128
+
113
129
dataframe .index = dataframe .index .to_datetime ()
114
130
if dataframe .index .tzinfo is None :
115
131
dataframe .index = dataframe .index .tz_localize ('UTC' )
116
- dataframe ['time' ] = [self ._datetime_to_epoch (dt , time_precision )
117
- for dt in dataframe .index ]
118
- data = {'name' : name ,
119
- 'columns' : [str (column ) for column in dataframe .columns ],
120
- 'points' : list ([list (x ) for x in dataframe .values ])}
121
- return data
132
+
133
+ # Convert column to strings
134
+ dataframe .columns = dataframe .columns .astype ('str' )
135
+
136
+ # Convert dtype for json serialization
137
+ dataframe = dataframe .astype ('object' )
138
+
139
+ points = [
140
+ {'name' : measurement ,
141
+ 'tags' : tags if tags else {},
142
+ 'fields' : rec ,
143
+ 'timestamp' : ts .isoformat ()
144
+ }
145
+ for ts , rec in zip (dataframe .index , dataframe .to_dict ('record' ))]
146
+ return points
122
147
123
148
def _datetime_to_epoch (self , datetime , time_precision = 's' ):
124
149
seconds = (datetime - self .EPOCH ).total_seconds ()
125
150
if time_precision == 's' :
126
151
return seconds
127
- elif time_precision == 'm' or time_precision == ' ms' :
128
- return seconds * 1000
152
+ elif time_precision == 'ms' :
153
+ return seconds * 10 ** 3
129
154
elif time_precision == 'u' :
130
- return seconds * 1000000
155
+ return seconds * 10 ** 6
156
+ elif time_precision == 'n' :
157
+ return seconds * 10 ** 9
0 commit comments