Skip to content

Commit 367e052

Browse files
author
quantmind
committed
first 5 session tests passing
1 parent b01e84b commit 367e052

File tree

8 files changed

+276
-118
lines changed

8 files changed

+276
-118
lines changed

stdnet/backends/redisb.py

Lines changed: 74 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ def redis_before_send(sender, request, command, **kwargs):
3333

3434
redis.redis_before_send.connect(redis_before_send)
3535

36+
#class add_recursive(redis.RedisScript):
37+
# script = (redis.read_lua_file('commands.utils'),
38+
# redis.read_lua_file('odm.add_recursive'))
3639

3740
class odmrun(redis.RedisScript):
3841
script = (redis.read_lua_file('commands.utils'),
@@ -41,53 +44,29 @@ class odmrun(redis.RedisScript):
4144
redis.read_lua_file('odmrun'))
4245

4346
def callback(self, request, response, args, meta=None,
44-
iids=None, script=None, **options):
47+
backend=None, script=None, **options):
4548
if script == 'delete':
4649
res = (instance_session_result(r,False,r,True,0) for r in response)
4750
return session_result(meta, res)
4851
elif script == 'commit':
49-
res = self._wrap_commit(request, response, iids)
52+
res = self._wrap_commit(request, response, **options)
5053
return session_result(meta, res)
54+
elif script == 'load':
55+
return self.load_query(request, response, backend, meta, **options)
56+
else:
57+
return response
5158

52-
def _wrap_commit(self, request, response, iids):
59+
def _wrap_commit(self, request, response, iids=None, **options):
5360
for id, iid in zip(response, iids):
5461
id, flag, info = id
5562
if int(flag):
5663
yield instance_session_result(iid, True, id, False, float(info))
5764
else:
5865
msg = info.decode(request.encoding)
59-
yield CommitException(msg)
60-
61-
#class add_recursive(redis.RedisScript):
62-
# script = (redis.read_lua_file('commands.utils'),
63-
# redis.read_lua_file('odm.add_recursive'))
64-
65-
66-
class load_query(redis.RedisScript):
67-
'''Rich script for loading a query result into stdnet. It handles
68-
loading of different fields, loading of related fields, sorting and
69-
limiting.'''
70-
script = (redis.read_lua_file('tabletools'),)
71-
# redis.read_lua_file('commands.timeseries'),
72-
# redis.read_lua_file('commands.utils'),
73-
# redis.read_lua_file('odm.load_query'))
74-
75-
def build(self, response, fields, fields_attributes, encoding):
76-
fields = tuple(fields) if fields else None
77-
if fields:
78-
if len(fields) == 1 and fields[0] == 'id':
79-
for id in response:
80-
yield id,(),{}
81-
else:
82-
for id,fdata in response:
83-
yield id,fields,dict(zip(fields_attributes,fdata))
84-
else:
85-
for id,fdata in response:
86-
yield id,None,dict(pairs_to_dict(fdata, encoding))
66+
yield CommitException(msg)
8767

88-
def callback(self, request, response, args, query=None, get=None,
89-
fields=None, fields_attributes=None, **kwargs):
90-
meta = query.meta
68+
def load_query(self, request, response, backend, meta, get=None,
69+
fields=None, fields_attributes=None, **options):
9170
if get:
9271
tpy = meta.dfields[get].to_python
9372
return [tpy(v) for v in response]
@@ -102,14 +81,27 @@ def callback(self, request, response, args, query=None, get=None,
10281
fields = tuple(native_str(f, encoding) for f in fields)
10382
related_fields[fname] =\
10483
self.load_related(meta, fname, rdata, fields, encoding)
105-
return query.backend.objects_from_db(meta, data, related_fields)
106-
84+
return backend.objects_from_db(meta, data, related_fields)
85+
86+
def build(self, response, fields, fields_attributes, encoding):
87+
fields = tuple(fields) if fields else None
88+
if fields:
89+
if len(fields) == 1 and fields[0] == 'id':
90+
for id in response:
91+
yield id,(),{}
92+
else:
93+
for id,fdata in response:
94+
yield id,fields,dict(zip(fields_attributes,fdata))
95+
else:
96+
for id,fdata in response:
97+
yield id,None,dict(pairs_to_dict(fdata, encoding))
98+
10799
def load_related(self, meta, fname, data, fields, encoding):
108100
'''Parse data for related objects.'''
109101
field = meta.dfields[fname]
110102
if field in meta.multifields:
111103
fmeta = field.structure_class()._meta
112-
if fmeta.name in ('hashtable','zset'):
104+
if fmeta.name in ('hashtable', 'zset'):
113105
return ((native_str(id, encoding),
114106
pairs_to_dict(fdata, encoding)) for \
115107
id,fdata in data)
@@ -156,7 +148,7 @@ def redis_execution(pipe, result_type):
156148
################################################################################
157149
class RedisQuery(stdnet.BackendQuery):
158150
card = None
159-
meta_info = None
151+
_meta_info = None
160152
script_dep = {'script_dependency': ('build_query','move2set')}
161153

162154
def zism(self, r):
@@ -165,6 +157,12 @@ def zism(self, r):
165157
def sism(self, r):
166158
return r
167159

160+
@property
161+
def meta_info(self):
162+
if self._meta_info == None:
163+
self._meta_info = json.dumps(self.backend.meta(self.meta))
164+
return self._meta_info
165+
168166
def _build(self, pipe=None, **kwargs):
169167
# Accumulate a query
170168
self.pipe = pipe if pipe is not None else self.backend.client.pipeline()
@@ -186,26 +184,21 @@ def _build(self, pipe=None, **kwargs):
186184
key = backend.basekey(meta, 'id')
187185
temp_key = False
188186
else:
189-
if self.meta_info == None:
190-
self.meta_info = backend.meta(meta)
191-
# Get a temporary key where to store the queryset
192187
key = backend.tempkey(meta)
193-
pipe.script_call('odmrun', (), 'query', self.meta_info,
194-
qs.name, key, *args)
188+
backend.odmrun(pipe, 'query', meta, (key,), self.meta_info,
189+
qs.name, *args)
195190
else:
196191
key = backend.tempkey(meta)
197-
p = 'z' if self.meta.ordering else 's'
198-
pipe.script_call('move2set', keys, p,
199-
**{'script_dependency':'build_query'})
192+
p = 'z' if meta.ordering else 's'
193+
pipe.script_call('move2set', keys, p, script_dependency='odmrun')
200194
if qs.keyword == 'intersect':
201-
getattr(pipe,p+'interstore')(key, keys, **self.script_dep)
195+
getattr(pipe, p+'interstore')(key, keys, **self.script_dep)
202196
elif qs.keyword == 'union':
203-
getattr(pipe,p+'unionstore')(key, keys, **self.script_dep)
197+
getattr(pipe, p+'unionstore')(key, keys, **self.script_dep)
204198
elif qs.keyword == 'diff':
205-
getattr(pipe,p+'diffstore')(key, keys, **self.script_dep)
199+
getattr(pipe, p+'diffstore')(key, keys, **self.script_dep)
206200
else:
207-
raise ValueError('Could not perform "{0}" operation'\
208-
.format(qs.keyword))
201+
raise ValueError('Could not perform %s operation' % qs.keyword)
209202
# If e requires a different field other than id, perform a sort
210203
# by nosort and get the object field.
211204
gf = qs._get_field
@@ -216,7 +209,7 @@ def _build(self, pipe=None, **kwargs):
216209
temp_key = True
217210
key = backend.tempkey(meta)
218211
okey = backend.basekey(meta, OBJ, '*->' + field_attribute)
219-
pipe.sort(bkey, by = 'nosort', get = okey, store = key)
212+
pipe.sort(bkey, by='nosort', get = okey, store = key)
220213
self.card = getattr(pipe,'llen')
221214
# if the key is temporary, add an expiry
222215
if temp_key:
@@ -316,12 +309,11 @@ def _items(self, slic):
316309
stop = -1
317310
get = self.queryelem._get_field or ''
318311
fields_attributes = None
319-
keys = (self.query_key, backend.basekey(meta))
320-
args = [get]
312+
pkname_tuple = (meta.pk.name,)
321313
# if the get_field is available, we simply load that field
322314
if get:
323-
if get == 'id':
324-
fields_attributes = fields = (get,)
315+
if get == meta.pk.name:
316+
fields_attributes = fields = pkname_tuple
325317
else:
326318
fields, fields_attributes = meta.backend_fields((get,))
327319
else:
@@ -330,22 +322,24 @@ def _items(self, slic):
330322
fields = set(fields)
331323
fields.update(self.queryelem.select_related or ())
332324
fields = tuple(fields)
333-
if fields == ('id',):
325+
if fields == pkname_tuple:
334326
fields_attributes = fields
335327
elif fields:
336328
fields, fields_attributes = meta.backend_fields(fields)
337329
else:
338-
fields_attributes = ()
339-
args.append(len(fields_attributes))
340-
args.extend(fields_attributes)
341-
args.extend(self.related_lua_args())
342-
args.extend((name,start,stop))
343-
args.extend(order)
344-
options = {'fields':fields,
345-
'fields_attributes':fields_attributes,
346-
'query':self,
347-
'get':get}
348-
return backend.client.script_call('load_query', keys, *args, **options)
330+
fields_attributes = ()
331+
options = {'get': get,
332+
'ordering': name,
333+
'order': order,
334+
'start': start,
335+
'stop': stop,
336+
'fields': fields_attributes,
337+
'get': get}
338+
joptions = json.dumps(options)
339+
options.update({'fields': fields,
340+
'fields_attributes': fields_attributes})
341+
return backend.odmrun(backend.client, 'load', meta, (self.query_key,),
342+
self.meta_info, joptions, **options)
349343

350344
def related_lua_args(self):
351345
'''Generator of load_related arguments'''
@@ -789,6 +783,13 @@ def meta(self, meta):
789783
data = meta.as_dict()
790784
data['namespace'] = self.basekey(meta)
791785
return data
786+
787+
def odmrun(self, client, script, meta, keys, meta_info, *args, **options):
788+
options.update({'backend': self,
789+
'meta': meta,
790+
'script': script})
791+
return client.script_call('odmrun', keys, script, meta_info, *args,
792+
**options)
792793

793794
def execute_session(self, session, callback):
794795
'''Execute a session in redis.'''
@@ -804,7 +805,7 @@ def execute_session(self, session, callback):
804805
dirty = tuple(sm.iterdirty())
805806
N = len(dirty)
806807
if N:
807-
lua_data = ['commit', json.dumps(self.meta(meta)), N]
808+
lua_data = [N]
808809
processed = []
809810
for instance in dirty:
810811
state = instance.state()
@@ -833,8 +834,9 @@ def execute_session(self, session, callback):
833834
lua_data.extend((action, id, score, len(data)))
834835
lua_data.extend(data)
835836
processed.append(state.iid)
836-
pipe.script_call('odmrun', (), *lua_data, script='commit',
837-
meta=meta, iids=processed)
837+
self.odmrun(pipe, 'commit', meta, (),
838+
json.dumps(self.meta(meta)), *lua_data,
839+
iids=processed)
838840
command, result = redis_execution(pipe, session_result)
839841
return on_result(result, callback, command)
840842

@@ -920,4 +922,4 @@ def publish(self, channel, message):
920922
return self.client.execute_command('PUBLISH', channel, message)
921923

922924
def subscriber(self, **kwargs):
923-
return redis.Subscriber(self.client, **kwargs)
925+
return redis.Subscriber(self.client, **kwargs)

0 commit comments

Comments
 (0)