@@ -33,6 +33,9 @@ def redis_before_send(sender, request, command, **kwargs):
33
33
34
34
redis .redis_before_send .connect (redis_before_send )
35
35
36
+ #class add_recursive(redis.RedisScript):
37
+ # script = (redis.read_lua_file('commands.utils'),
38
+ # redis.read_lua_file('odm.add_recursive'))
36
39
37
40
class odmrun (redis .RedisScript ):
38
41
script = (redis .read_lua_file ('commands.utils' ),
@@ -41,53 +44,29 @@ class odmrun(redis.RedisScript):
41
44
redis .read_lua_file ('odmrun' ))
42
45
43
46
def callback (self , request , response , args , meta = None ,
44
- iids = None , script = None , ** options ):
47
+ backend = None , script = None , ** options ):
45
48
if script == 'delete' :
46
49
res = (instance_session_result (r ,False ,r ,True ,0 ) for r in response )
47
50
return session_result (meta , res )
48
51
elif script == 'commit' :
49
- res = self ._wrap_commit (request , response , iids )
52
+ res = self ._wrap_commit (request , response , ** options )
50
53
return session_result (meta , res )
54
+ elif script == 'load' :
55
+ return self .load_query (request , response , backend , meta , ** options )
56
+ else :
57
+ return response
51
58
52
- def _wrap_commit (self , request , response , iids ):
59
+ def _wrap_commit (self , request , response , iids = None , ** options ):
53
60
for id , iid in zip (response , iids ):
54
61
id , flag , info = id
55
62
if int (flag ):
56
63
yield instance_session_result (iid , True , id , False , float (info ))
57
64
else :
58
65
msg = info .decode (request .encoding )
59
- yield CommitException (msg )
60
-
61
- #class add_recursive(redis.RedisScript):
62
- # script = (redis.read_lua_file('commands.utils'),
63
- # redis.read_lua_file('odm.add_recursive'))
64
-
65
-
66
- class load_query (redis .RedisScript ):
67
- '''Rich script for loading a query result into stdnet. It handles
68
- loading of different fields, loading of related fields, sorting and
69
- limiting.'''
70
- script = (redis .read_lua_file ('tabletools' ),)
71
- # redis.read_lua_file('commands.timeseries'),
72
- # redis.read_lua_file('commands.utils'),
73
- # redis.read_lua_file('odm.load_query'))
74
-
75
- def build (self , response , fields , fields_attributes , encoding ):
76
- fields = tuple (fields ) if fields else None
77
- if fields :
78
- if len (fields ) == 1 and fields [0 ] == 'id' :
79
- for id in response :
80
- yield id ,(),{}
81
- else :
82
- for id ,fdata in response :
83
- yield id ,fields ,dict (zip (fields_attributes ,fdata ))
84
- else :
85
- for id ,fdata in response :
86
- yield id ,None ,dict (pairs_to_dict (fdata , encoding ))
66
+ yield CommitException (msg )
87
67
88
- def callback (self , request , response , args , query = None , get = None ,
89
- fields = None , fields_attributes = None , ** kwargs ):
90
- meta = query .meta
68
+ def load_query (self , request , response , backend , meta , get = None ,
69
+ fields = None , fields_attributes = None , ** options ):
91
70
if get :
92
71
tpy = meta .dfields [get ].to_python
93
72
return [tpy (v ) for v in response ]
@@ -102,14 +81,27 @@ def callback(self, request, response, args, query=None, get=None,
102
81
fields = tuple (native_str (f , encoding ) for f in fields )
103
82
related_fields [fname ] = \
104
83
self .load_related (meta , fname , rdata , fields , encoding )
105
- return query .backend .objects_from_db (meta , data , related_fields )
106
-
84
+ return backend .objects_from_db (meta , data , related_fields )
85
+
86
+ def build (self , response , fields , fields_attributes , encoding ):
87
+ fields = tuple (fields ) if fields else None
88
+ if fields :
89
+ if len (fields ) == 1 and fields [0 ] == 'id' :
90
+ for id in response :
91
+ yield id ,(),{}
92
+ else :
93
+ for id ,fdata in response :
94
+ yield id ,fields ,dict (zip (fields_attributes ,fdata ))
95
+ else :
96
+ for id ,fdata in response :
97
+ yield id ,None ,dict (pairs_to_dict (fdata , encoding ))
98
+
107
99
def load_related (self , meta , fname , data , fields , encoding ):
108
100
'''Parse data for related objects.'''
109
101
field = meta .dfields [fname ]
110
102
if field in meta .multifields :
111
103
fmeta = field .structure_class ()._meta
112
- if fmeta .name in ('hashtable' ,'zset' ):
104
+ if fmeta .name in ('hashtable' , 'zset' ):
113
105
return ((native_str (id , encoding ),
114
106
pairs_to_dict (fdata , encoding )) for \
115
107
id ,fdata in data )
@@ -156,7 +148,7 @@ def redis_execution(pipe, result_type):
156
148
################################################################################
157
149
class RedisQuery (stdnet .BackendQuery ):
158
150
card = None
159
- meta_info = None
151
+ _meta_info = None
160
152
script_dep = {'script_dependency' : ('build_query' ,'move2set' )}
161
153
162
154
def zism (self , r ):
@@ -165,6 +157,12 @@ def zism(self, r):
165
157
def sism (self , r ):
166
158
return r
167
159
160
+ @property
161
+ def meta_info (self ):
162
+ if self ._meta_info == None :
163
+ self ._meta_info = json .dumps (self .backend .meta (self .meta ))
164
+ return self ._meta_info
165
+
168
166
def _build (self , pipe = None , ** kwargs ):
169
167
# Accumulate a query
170
168
self .pipe = pipe if pipe is not None else self .backend .client .pipeline ()
@@ -186,26 +184,21 @@ def _build(self, pipe=None, **kwargs):
186
184
key = backend .basekey (meta , 'id' )
187
185
temp_key = False
188
186
else :
189
- if self .meta_info == None :
190
- self .meta_info = backend .meta (meta )
191
- # Get a temporary key where to store the queryset
192
187
key = backend .tempkey (meta )
193
- pipe . script_call ( ' odmrun' , () , 'query' , self .meta_info ,
194
- qs .name , key , * args )
188
+ backend . odmrun ( pipe , 'query' , meta , ( key ,) , self .meta_info ,
189
+ qs .name , * args )
195
190
else :
196
191
key = backend .tempkey (meta )
197
- p = 'z' if self .meta .ordering else 's'
198
- pipe .script_call ('move2set' , keys , p ,
199
- ** {'script_dependency' :'build_query' })
192
+ p = 'z' if meta .ordering else 's'
193
+ pipe .script_call ('move2set' , keys , p , script_dependency = 'odmrun' )
200
194
if qs .keyword == 'intersect' :
201
- getattr (pipe ,p + 'interstore' )(key , keys , ** self .script_dep )
195
+ getattr (pipe , p + 'interstore' )(key , keys , ** self .script_dep )
202
196
elif qs .keyword == 'union' :
203
- getattr (pipe ,p + 'unionstore' )(key , keys , ** self .script_dep )
197
+ getattr (pipe , p + 'unionstore' )(key , keys , ** self .script_dep )
204
198
elif qs .keyword == 'diff' :
205
- getattr (pipe ,p + 'diffstore' )(key , keys , ** self .script_dep )
199
+ getattr (pipe , p + 'diffstore' )(key , keys , ** self .script_dep )
206
200
else :
207
- raise ValueError ('Could not perform "{0}" operation' \
208
- .format (qs .keyword ))
201
+ raise ValueError ('Could not perform %s operation' % qs .keyword )
209
202
# If e requires a different field other than id, perform a sort
210
203
# by nosort and get the object field.
211
204
gf = qs ._get_field
@@ -216,7 +209,7 @@ def _build(self, pipe=None, **kwargs):
216
209
temp_key = True
217
210
key = backend .tempkey (meta )
218
211
okey = backend .basekey (meta , OBJ , '*->' + field_attribute )
219
- pipe .sort (bkey , by = 'nosort' , get = okey , store = key )
212
+ pipe .sort (bkey , by = 'nosort' , get = okey , store = key )
220
213
self .card = getattr (pipe ,'llen' )
221
214
# if the key is temporary, add an expiry
222
215
if temp_key :
@@ -316,12 +309,11 @@ def _items(self, slic):
316
309
stop = - 1
317
310
get = self .queryelem ._get_field or ''
318
311
fields_attributes = None
319
- keys = (self .query_key , backend .basekey (meta ))
320
- args = [get ]
312
+ pkname_tuple = (meta .pk .name ,)
321
313
# if the get_field is available, we simply load that field
322
314
if get :
323
- if get == 'id' :
324
- fields_attributes = fields = ( get ,)
315
+ if get == meta . pk . name :
316
+ fields_attributes = fields = pkname_tuple
325
317
else :
326
318
fields , fields_attributes = meta .backend_fields ((get ,))
327
319
else :
@@ -330,22 +322,24 @@ def _items(self, slic):
330
322
fields = set (fields )
331
323
fields .update (self .queryelem .select_related or ())
332
324
fields = tuple (fields )
333
- if fields == ( 'id' ,) :
325
+ if fields == pkname_tuple :
334
326
fields_attributes = fields
335
327
elif fields :
336
328
fields , fields_attributes = meta .backend_fields (fields )
337
329
else :
338
- fields_attributes = ()
339
- args .append (len (fields_attributes ))
340
- args .extend (fields_attributes )
341
- args .extend (self .related_lua_args ())
342
- args .extend ((name ,start ,stop ))
343
- args .extend (order )
344
- options = {'fields' :fields ,
345
- 'fields_attributes' :fields_attributes ,
346
- 'query' :self ,
347
- 'get' :get }
348
- return backend .client .script_call ('load_query' , keys , * args , ** options )
330
+ fields_attributes = ()
331
+ options = {'get' : get ,
332
+ 'ordering' : name ,
333
+ 'order' : order ,
334
+ 'start' : start ,
335
+ 'stop' : stop ,
336
+ 'fields' : fields_attributes ,
337
+ 'get' : get }
338
+ joptions = json .dumps (options )
339
+ options .update ({'fields' : fields ,
340
+ 'fields_attributes' : fields_attributes })
341
+ return backend .odmrun (backend .client , 'load' , meta , (self .query_key ,),
342
+ self .meta_info , joptions , ** options )
349
343
350
344
def related_lua_args (self ):
351
345
'''Generator of load_related arguments'''
@@ -789,6 +783,13 @@ def meta(self, meta):
789
783
data = meta .as_dict ()
790
784
data ['namespace' ] = self .basekey (meta )
791
785
return data
786
+
787
+ def odmrun (self , client , script , meta , keys , meta_info , * args , ** options ):
788
+ options .update ({'backend' : self ,
789
+ 'meta' : meta ,
790
+ 'script' : script })
791
+ return client .script_call ('odmrun' , keys , script , meta_info , * args ,
792
+ ** options )
792
793
793
794
def execute_session (self , session , callback ):
794
795
'''Execute a session in redis.'''
@@ -804,7 +805,7 @@ def execute_session(self, session, callback):
804
805
dirty = tuple (sm .iterdirty ())
805
806
N = len (dirty )
806
807
if N :
807
- lua_data = ['commit' , json . dumps ( self . meta ( meta )), N ]
808
+ lua_data = [N ]
808
809
processed = []
809
810
for instance in dirty :
810
811
state = instance .state ()
@@ -833,8 +834,9 @@ def execute_session(self, session, callback):
833
834
lua_data .extend ((action , id , score , len (data )))
834
835
lua_data .extend (data )
835
836
processed .append (state .iid )
836
- pipe .script_call ('odmrun' , (), * lua_data , script = 'commit' ,
837
- meta = meta , iids = processed )
837
+ self .odmrun (pipe , 'commit' , meta , (),
838
+ json .dumps (self .meta (meta )), * lua_data ,
839
+ iids = processed )
838
840
command , result = redis_execution (pipe , session_result )
839
841
return on_result (result , callback , command )
840
842
@@ -920,4 +922,4 @@ def publish(self, channel, message):
920
922
return self .client .execute_command ('PUBLISH' , channel , message )
921
923
922
924
def subscriber (self , ** kwargs ):
923
- return redis .Subscriber (self .client , ** kwargs )
925
+ return redis .Subscriber (self .client , ** kwargs )
0 commit comments