Skip to content

Commit 0faa481

Browse files
author
quantmind
committed
adding multivariate analysis to columnts
1 parent 63ff741 commit 0faa481

File tree

4 files changed

+280
-13
lines changed

4 files changed

+280
-13
lines changed

stdnet/apps/columnts/lua/stats.lua

Lines changed: 176 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,66 @@
1-
local command = ARGV[1]
2-
local start = ARGV[2]
3-
local stop = ARGV[3]
4-
local num_fields = ARGV[4]
5-
local fields = table_slice(ARGV, 5, -1)
6-
local ts = columnts:new(KEYS[1])
1+
-- Initialize a vector with a value
2+
-- Build a new table whose slots 1..size all hold `value`.
-- A size below 1 yields an empty table.
function init_vector(size, value)
    local filled = {}
    for slot = 1, size do
        filled[slot] = value
    end
    return filled
end
9+
10+
-- Return true when v1 and v2 have the same length and every element of v1
-- equals (==) the element of v2 at the same position, false otherwise.
function equal_vectors(v1, v2)
    if #v1 ~= #v2 then
        return false
    end
    for position, item in ipairs(v1) do
        if item ~= v2[position] then
            return false
        end
    end
    return true
end
22+
23+
-- vector1 += vector2
24+
-- In-place element-wise addition: vector1[i] = vector1[i] + vector2[i]
-- for every element of vector1. Returns vector1 (the mutated table).
function vector_sadd(vector1, vector2)
    for slot, current in ipairs(vector1) do
        vector1[slot] = current + vector2[slot]
    end
    return vector1
end
30+
31+
-- vector1 - vector2
32+
-- Element-wise difference vector1 - vector2, returned as a new table.
-- Neither input is modified; the result has one entry per element of vector1.
function vector_diff(vector1, vector2)
    local delta = {}
    for slot, minuend in ipairs(vector1) do
        delta[slot] = minuend - vector2[slot]
    end
    return delta
end
39+
40+
-- Square of a vector: the lower triangle of its outer product, row by row
41+
-- Return the products vector[i] * vector[j] for every pair with j <= i,
-- flattened row by row: (1,1), (2,1), (2,2), (3,1), (3,2), (3,3), ...
-- A vector of n elements therefore yields n*(n+1)/2 products.
function vector_square(vector)
    local products = {}
    for row, left in ipairs(vector) do
        for col = 1, row do
            products[#products + 1] = left * vector[col]
        end
    end
    return products
end
752

853
--
954
-- Calculate aggregate statistics for a timeseries slice
10-
function stats(self, command, start, stop, fields)
11-
local time_values = self:range(command, start, stop, fields, true)
12-
local times, field_values = unpack(time_values)
55+
function uni_stats(serie)
56+
local times = serie.times
1357
local sts = {}
1458
local N = # times
1559
if N == 0 then
1660
return sts
1761
end
1862
local result = {start = times[1], stop = times[N], len = N, stats = sts}
19-
for field, values in pairs(field_values) do
63+
for field, values in pairs(serie.field_values) do
2064
local N = 0
2165
local min_val = 1.e10
2266
local max_val =-1.e10
@@ -57,6 +101,125 @@ function stats(self, command, start, stop, fields)
57101
return result
58102
end
59103

60-
local stats = flat_table(stats(ts, command, start, stop, fields))
61-
stats[4] = flat_table(stats[4])
62-
return stats
104+
105+
-- Collect the field names of one series.
--
-- key:          identifier of the series, used as prefix of the display name.
-- field_values: table mapping field name -> values.
-- serie_names:  sequence that receives one "<key> @ <field>" entry per field
--               (mutated in place).
--
-- Returns a new sequence with the field names, in the same order in which the
-- display names were appended to serie_names. The order comes from pairs()
-- and is therefore unspecified.
function add_field_names(key, field_values, serie_names)
    local fields = {}
    for field in pairs(field_values) do
        -- `name` must be local: the original leaked it as a global variable.
        local name = key .. ' @ ' .. field
        table.insert(fields, field)
        table.insert(serie_names, name)
    end
    return fields
end
114+
115+
-- Append to `section` the value found at position `index` of every field
-- listed in `fields`.
--
-- section:      sequence being built for one time point (mutated in place).
-- index:        position of the time point inside each field's values.
-- field_values: table mapping field name -> sequence of values.
-- fields:       sequence of field names to read, in order.
--
-- Returns `section`, or nil as soon as one value is missing (nil) or NaN, so
-- the caller can discard the whole time point.
function add_cross_section(section, index, field_values, fields)
    -- The loop variable must not be called `index`: in the original it
    -- shadowed the parameter, so the field position was used as time index.
    for _, field in ipairs(fields) do
        local v = field_values[field][index]
        -- v ~= v only holds for NaN.
        if v == nil or v ~= v then
            return nil
        end
        table.insert(section, v)
    end
    return section
end
127+
128+
-- Align several series on the time points of the first one.
--
-- series: sequence of tables with `key`, `times` and `field_values` entries.
--
-- Returns a table with:
--   times     - the `times` sequence of the first series (not copied);
--   names     - "<key> @ <field>" display names, one per field of each series;
--   time_dict - time -> cross section (one value per name). A time point is
--               only kept when every series has a usable value for it.
function fields_and_times(series)
    local times
    local serie_names = {}
    local time_dict = {}
    for i, serie in ipairs(series) do
        local fields = add_field_names(serie.key, serie.field_values, serie_names)
        if i == 1 then
            times = serie.times
            for t, time in ipairs(times) do
                time_dict[time] = add_cross_section({}, t, serie.field_values, fields)
            end
        else
            -- Rebuild the dictionary from the times this series provides, so
            -- time points it does not have are dropped instead of being left
            -- behind with a section that is too short.
            local merged = {}
            for t, time in ipairs(serie.times) do
                local section = time_dict[time]
                if section then
                    merged[time] = add_cross_section(section, t, serie.field_values, fields)
                end
            end
            time_dict = merged
        end
    end
    return {times = times, names = serie_names, time_dict = time_dict}
end
152+
153+
--
154+
-- Calculate aggregate cross-series (multivariate) statistics for a set of timeseries slices
155+
-- Accumulate the sums needed for multivariate statistics over the time points
-- shared by all series.
--
-- series: sequence of tables with `key`, `times` and `field_values` entries
--         (passed to fields_and_times).
--
-- Returns nothing when fewer than two complete time points are available,
-- otherwise a table with:
--   fields  - display names, one per field of each series;
--   npoints - number of complete time points used;
--   sum     - per-field sum of the values;
--   sum2    - sum of vector_square(section) over the time points;
--   dsum    - per-field sum of the differences between consecutive points;
--   dsum2   - sum of vector_square(difference) over consecutive points.
function multi_stats(series)
    local prev_section
    local a = fields_and_times(series)
    local time_dict = a.time_dict
    local S = # a.names
    local T = S*(S+1)/2
    local N = 0
    local sum = init_vector(S, 0)
    local sum2 = init_vector(T, 0)
    local dsum = init_vector(S, 0)
    local dsum2 = init_vector(T, 0)
    for _, time in ipairs(a.times) do
        local section = time_dict[time]
        -- Skip time points that do not carry one value per field: adding a
        -- short section would perform arithmetic on nil.
        if section and # section == S then
            N = N + 1
            vector_sadd(sum, section)
            vector_sadd(sum2, vector_square(section))
            if prev_section then
                local dsection = vector_diff(section, prev_section)
                vector_sadd(dsum, dsection)
                vector_sadd(dsum2, vector_square(dsection))
            end
            -- Must be updated for every point, including the first one. The
            -- original only assigned it inside the `if` above, so it stayed
            -- nil forever and the differences were never accumulated.
            prev_section = section
        end
    end
    if N > 1 then
        -- The names live in a.names; `serie_names` is not defined here.
        return {fields = a.names,
                npoints = N,
                sum = sum,
                sum2 = sum2,
                dsum = dsum,
                dsum2 = dsum2}
    end
end
189+
190+
-- Load one slice per key listed in KEYS.
--
-- Reads the script arguments as:
--   ARGV[1] range command, ARGV[2] start, ARGV[3] stop,
--   then, for each key in KEYS order: a field count followed by that many
--   field names.
--
-- Returns a sequence of {key=, times=, field_values=} tables, one per key.
-- Raises an error when a field count is missing or not numeric, or when no
-- series could be loaded.
function get_series()
    local command = ARGV[1]
    local start = ARGV[2]
    local stop = ARGV[3]
    local pos = 4
    local series = {}
    for _, id in ipairs(KEYS) do
        local serie = columnts:new(id)
        -- Script arguments may arrive as strings; convert explicitly before
        -- doing arithmetic with the count.
        local num_fields = tonumber(ARGV[pos])
        if not num_fields then
            error('Missing or invalid number of fields for timeseries ' .. tostring(id))
        end
        local fields = table_slice(ARGV, pos + 1, pos + num_fields)
        pos = pos + num_fields + 1
        -- range() is expected to return {times, field_values}.
        local time_values = serie:range(command, start, stop, fields, true)
        -- The original called table.insert without the target table, so
        -- `series` was never filled.
        table.insert(series, {key = id,
                              times = time_values[1],
                              field_values = time_values[2]})
    end
    if # series == 0 then
        error('No timeseries available')
    end
    return series
end
211+
212+
-- Script entry point. It only runs when KEYS is defined, so the file can
-- also be loaded with require() just to get the helper functions.
-- With more than one series the multivariate result is returned JSON-encoded
-- (or nothing when multi_stats has no result); with a single series the
-- flattened univariate statistics are returned.
if KEYS then
    local series = get_series()
    -- Keep the result local: the original assigned a global `stats`.
    local stats
    if # series > 1 then
        stats = multi_stats(series)
        if stats then
            stats = cjson.encode(stats)
        end
    else
        stats = flat_table(uni_stats(series[1]))
        stats[4] = flat_table(stats[4])
    end
    return stats
end

stdnet/apps/columnts/models.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ def clear(self):
3030

3131

3232
class ColumnTS(orm.TS):
33+
default_multi_stats = ['covariance']
34+
3335
cache_class = TimeseriesCache
3436
pickler = encoders.DateTimeConverter()
3537
value_pickler = DoubleEncoder()
@@ -103,6 +105,31 @@ def stats(self, start, end, fields=None):
103105
res = self.backend_structure().stats(start, end, fields)
104106
return self.async_handle(res, self._stats)
105107

108+
def imulti_stats(self, start=0, end=-1, field=None, series=None, stats=None):
    '''Multivariate statistics of this :class:`ColumnTS` and other *series*,
    delegated to the backend structure's ``imulti_stats``.

    Unlike :meth:`multi_stats`, *start* and *end* are passed to the backend
    unchanged (presumably positional indices -- TODO confirm).

    :parameter start: start of the range. Default ``0``.
    :parameter end: end of the range. Default ``-1``.
    :parameter field: name of field to perform multivariate statistics.
        Required.
    :parameter series: a list of two elements tuple containing the id of
        a :class:`ColumnTS` and a field name. Required.
    :parameter stats: list of statistics to evaluate.
        Default: ``default_multi_stats``.
    '''
    # ``field`` and ``series`` come after parameters with defaults, so they
    # need a default too for the signature to be valid Python (the original
    # was a SyntaxError). They are still mandatory.
    if field is None or series is None:
        raise TypeError('imulti_stats() requires both field and series')
    stats = stats or self.default_multi_stats
    res = self.backend_structure().imulti_stats(start, end, field, series,
                                                stats)
    return self.async_handle(res, self._stats)
113+
114+
def multi_stats(self, start, end, field, series, stats=None):
    '''Perform cross multivariate statistics calculation of
    this :class:`ColumnTS` and other *series*.

    :parameter start: the start date.
    :parameter end: the end date.
    :parameter field: name of field to perform multivariate statistics.
    :parameter series: a list of two elements tuple containing the id of
        a :class:`ColumnTS` and a field name.
    :parameter stats: list of statistics to evaluate.
        Default: ['covariance']
    '''
    if not stats:
        stats = self.default_multi_stats
    # Both bounds are serialised with the structure's pickler before being
    # handed to the backend.
    first = self.pickler.dumps(start)
    last = self.pickler.dumps(end)
    backend = self.backend_structure()
    outcome = backend.multi_stats(first, last, field, series, stats)
    return self.async_handle(outcome, self._stats)
132+
106133
def merge(self, *series, **kwargs):
107134
'''Merge this :class:`ColumnTS` with several other.
108135

stdnet/apps/columnts/redis.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,24 @@ def stats(self, start, end, fields = None):
104104
return self.client.script_call('timeseries_stats', self.id,
105105
'tsrangebytime', start, end, len(fields), *fields)
106106

107+
def imulti_stats(self, start, end, field, series, stats):
    '''Run the multivariate statistics script using the ``tsrange``
    range command.'''
    command = 'tsrange'
    return self._multi_stats(command, start, end, field, series, stats)
109+
110+
def multi_stats(self, start, end, field, series, stats):
    '''Run the multivariate statistics script using the ``tsrangebytime``
    range command.'''
    command = 'tsrangebytime'
    return self._multi_stats(command, start, end, field, series, stats)
113+
114+
def _multi_stats(self, command, start, end, field, series, stats):
115+
n = len(series)
116+
argv = []
117+
for s in series:
118+
if not len(s) == 2:
119+
raise ValueError('Series must be a list of two elements tuple')
120+
argv.extend(s)
121+
argv.extend(stats)
122+
return self.client.script_call('timeseries_stats', self.id,
123+
command, start, end, 'multi', field, n, *argv)
124+
107125

108126
# Add the redis structure to the struct map in the backend class
109127
redisb.BackendDataServer.struct_map['columnts'] = RedisColumnTS

tests/lua/columnts.lua

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
-- tests stdnet.apps.columnts.lua internal functions
package.path = package.path .. ";../../stdnet/apps/columnts/lua/?.lua"
require("stats")

-- INIT VECTOR
assert(# init_vector(5,-3) == 5)
assert(# init_vector(0,7) == 0)
assert(# init_vector(-1,0) == 0)

local vector = init_vector(7,98)
for i,v in ipairs(vector) do
    assert(v == 98)
end

-- 7 elements give 7*8/2 = 28 products
local vector2 = vector_square(vector)
assert(# vector2 == 28)
for i,v in ipairs(vector2) do
    assert(v == 98*98)
end

-- _VECTOR_SADD
-- `sv` is local so that the test does not leak a global variable
local sv = {1,2,3}
vector_sadd(sv, {3,-1,6})
assert(equal_vectors(sv,{4,1,9}))

-- SQUARE VECTOR
vector = {2,4,9,-2}
vector2 = vector_square(vector)
assert(# vector2 == 10)
assert(vector2[1] == 4)
assert(vector2[2] == 8)
assert(vector2[3] == 16)
assert(vector2[4] == 18)
assert(vector2[5] == 36)
assert(vector2[6] == 81)
assert(vector2[7] == -4)
assert(vector2[8] == -8)
assert(vector2[9] == -18)
assert(vector2[10] == 4)

-- TEST MULTI SERIES AGGREGATION
local serie1 = {key='first',
                times={1001,1002,1003,1004},
                field_values={field1 = {1,3,2,4}}}
local serie2 = {key='second',
                times={1001,1002,1004,1005,1006},
                field_values={field1 = {1,3,2,4,1.5}}}
local serie3 = {key='third',
                times={1001,1002,1003,1004,1009},
                field_values={field1 = {-2,3,2,4,6.4}}}

local series = {serie1, serie2, serie3}
local a = fields_and_times(series)
-- the time axis is the one of the first series
assert(# a.times == 4)
assert(equal_vectors(a.times, serie1.times))
assert(# a.names == 3)

-- smoke test: must run without raising
local stats = multi_stats({serie1,serie2,serie3})

print 'OK'

0 commit comments

Comments
 (0)