6
6
import six
7
7
8
8
class SeriesHelper (object ):
9
+ '''
10
+ Subclassing this helper eases writing data points in bulk.
11
+ All data points are immutable, insuring they do not get overwritten.
12
+ Each subclass can write to its own database.
13
+ The time series names can also be based on one or more defined fields.
14
+
15
+ Annotated example:
16
+ ```
17
+ class MySeriesHelper(SeriesHelper):
18
+ class Meta:
19
+ # Meta class stores time series helper configuration.
20
+ client = TestSeriesHelper.client
21
+ # The client should be an instance of InfluxDBClient.
22
+ series_name = 'events.stats.{server_name}'
23
+ # The series name must be a string. Add dependent field names in curly brackets.
24
+ fields = ['time', 'server_name']
25
+ # Defines all the fields in this time series.
26
+ bulk_size = 5
27
+ # Defines the number of data points to store prior to writing on the wire.
28
+
29
+ # The following will create *five* (immutable) data points.
30
+ # Since bulk_size is set to 5, upon the fifth construction call, *all* data
31
+ # points will be written on the wire via MySeriesHelper.Meta.client.
32
+ MySeriesHelper(server_name='us.east-1', time=159)
33
+ MySeriesHelper(server_name='us.east-1', time=158)
34
+ MySeriesHelper(server_name='us.east-1', time=157)
35
+ MySeriesHelper(server_name='us.east-1', time=156)
36
+ MySeriesHelper(server_name='us.east-1', time=155)
37
+
38
+ # To manually submit data points which are not yet written, call commit:
39
+ MySeriesHelper.commit()
40
+
41
+ # To inspect the JSON which will be written, call _json_body_():
42
+ MySeriesHelper._json_body_()
43
+ ```
44
+ '''
9
45
__initialized__ = False
10
46
11
47
def __new__ (cls , * args , ** kwargs ):
48
+ '''
49
+ Initializes class attributes for subsequent constructor calls.
50
+ '''
12
51
if not SeriesHelper .__initialized__ :
13
52
SeriesHelper .__initialized__ = True
14
- # Introspect series representation.
15
53
try :
16
54
_meta = getattr (cls , 'Meta' )
17
55
except AttributeError :
18
- raise AttributeError ('SeriesHelper {} does not contain a Meta class.' .format (cls .__name__ ))
56
+ raise AttributeError ('Missing Meta class in {} .' .format (cls .__name__ ))
19
57
20
- for attribute in ['series_name' , 'fields' , 'client' ]:
58
+ for attr in ['series_name' , 'fields' , 'client' ]:
21
59
try :
22
- setattr (cls , '_' + attribute , getattr (_meta , attribute ))
60
+ setattr (cls , '_' + attr , getattr (_meta , attr ))
23
61
except AttributeError :
24
- raise AttributeError ('SeriesHelper \' {0} Meta class does not define {1} .' .format (cls .__name__ , attribute ))
62
+ raise AttributeError ('Missing {} in {} Meta class .' .format (attr , cls .__name__ ))
25
63
26
64
cls ._bulk_size = getattr (_meta , 'bulk_size' , 1 )
27
-
28
- # Class attribute definitions
29
- cls ._datapoints = defaultdict (list ) # keys are the series name for ease of commit.
65
+
66
+ cls ._datapoints = defaultdict (list )
30
67
cls ._type = namedtuple (cls .__name__ , cls ._fields )
31
68
32
69
return super (SeriesHelper , cls ).__new__ (cls , * args , ** kwargs )
33
70
34
- def __init__ (self , ** kwargs ): # Does not support positional arguments.
71
+ def __init__ (self , ** kw ):
72
+ '''
73
+ Constructor call creates a new data point. All fields must be present.
74
+ :note: Data points written when `bulk_size` is reached per Helper.
75
+ :warning: Data points are *immutable* (`namedtuples`).
76
+ '''
35
77
cls = self .__class__
36
78
37
- if sorted (cls ._fields ) != sorted (kwargs .keys ()):
38
- raise NameError ('Expected fields {0} and got {1}.' .format (', ' . join ( sorted ( cls ._fields )), ', ' . join ( sorted ( kwargs . keys ()) )))
79
+ if sorted (cls ._fields ) != sorted (kw .keys ()):
80
+ raise NameError ('Expected {0}, got {1}.' .format (cls ._fields , kw . keys ()))
39
81
40
- cls ._datapoints [cls ._series_name .format (** kwargs )].append (cls ._type (** kwargs ))
82
+ cls ._datapoints [cls ._series_name .format (** kw )].append (cls ._type (** kw ))
41
83
42
- if len (cls ._datapoints ) > cls ._bulk_size :
84
+ if len (cls ._datapoints ) >= cls ._bulk_size :
43
85
cls .commit ()
44
86
45
87
@classmethod
46
88
def commit (cls ):
47
89
'''
48
90
Commit everything from datapoints via the client.
91
+ :return result of client.write_points.
49
92
'''
50
93
rtn = cls ._client .write_points (cls ._json_body_ ())
51
94
cls ._reset_ ()
@@ -56,14 +99,17 @@ def _json_body_(cls):
56
99
'''
57
100
:return: JSON body of these datapoints.
58
101
'''
59
- json_datapoints = []
102
+ json = []
60
103
for series_name , data in six .iteritems (cls ._datapoints ):
61
- json_datapoints .append ({'name' : series_name ,
62
- 'columns' : cls ._fields ,
63
- 'points' : [[point .__dict__ [k ] for k in cls ._fields ] for point in data ]
64
- })
65
- return json_datapoints
104
+ json .append ({'name' : series_name ,
105
+ 'columns' : cls ._fields ,
106
+ 'points' : [[point .__dict__ [k ] for k in cls ._fields ] for point in data ]
107
+ })
108
+ return json
66
109
67
110
@classmethod
68
111
def _reset_ (cls ):
112
+ '''
113
+ Reset data storage.
114
+ '''
69
115
cls ._datapoints = defaultdict (list )
0 commit comments