2
2
require 'rubygems'
3
3
require 'zmq'
4
4
require './hashes'
5
+ require './vclock'
5
6
6
7
class Map
7
8
def initialize ( func_str , data )
@@ -37,6 +38,31 @@ def reduce
37
38
end
38
39
end
39
40
41
+ class NodeObject
42
+ attr :value , :vclock
43
+ def initialize ( value , vclock )
44
+ @value = value
45
+ @vclock = vclock
46
+ end
47
+
48
+ def <=>( nobject2 )
49
+ vclock <=> nobject2 . vclock
50
+ end
51
+
52
+ def serialize
53
+ { :value => value , :vclock => vclock } . to_json
54
+ end
55
+
56
+ def self . deserialize ( serialized )
57
+ data = JSON . parse ( serialized )
58
+ vclock = VectorClock . deserialize ( data [ 'vclock' ] )
59
+ NodeObject . new ( data [ 'value' ] , vclock )
60
+ end
61
+
62
+ def to_s
63
+ serialize
64
+ end
65
+ end
40
66
41
67
Thread . abort_on_exception = true
42
68
class Node
@@ -120,9 +146,7 @@ def coordinate(port)
120
146
msg , node = line . split ( ' ' , 2 )
121
147
case msg
122
148
when 'join'
123
- @nodes << node
124
- @nodes . uniq!
125
- @nodes . sort!
149
+ @nodes = ( @nodes << node ) . uniq . sort
126
150
when 'down'
127
151
@nodes -= [ node ]
128
152
end
@@ -149,20 +173,20 @@ def service(port, sink=true)
149
173
end
150
174
# recv/send because it's a req/res
151
175
while line = rep . recv
152
- msg , key , value = line . split ( ' ' , 3 )
153
- # p line
176
+ msg , payload = line . split ( ' ' , 2 )
154
177
case msg
155
178
when 'put'
156
- rep . send ( put ( key , value ) . to_s )
179
+ n , key , vc , value = payload . split ( ' ' , 4 )
180
+ rep . send ( put ( key , vc , value , n . to_i ) . to_s )
157
181
when 'get'
158
- rep . send ( get ( key ) . to_s )
182
+ n , key , value = payload . split ( ' ' , 3 )
183
+ rep . send ( get ( key , n . to_i ) . to_s )
159
184
when 'map'
160
- _ , map_func = line . split ( ' ' , 2 )
161
- rep . send ( Map . new ( map_func , @data ) . call . to_s )
185
+ rep . send ( Map . new ( payload , @data ) . call . to_s )
162
186
when 'mr'
163
- map_func , reduce_func = mr_message ( line )
164
- results = remote_maps ( map_func )
165
- rep . send ( reduce ( reduce_func , results ) . to_s )
187
+ map_func , reduce_func = payload . split ( / \; \s +reduce/ , 2 )
188
+ reduce_func = "reduce #{ reduce_func } "
189
+ rep . send ( reduce ( reduce_func , remote_maps ( map_func ) ) . to_s )
166
190
else
167
191
rep . send ( 'what?' )
168
192
end
@@ -173,13 +197,6 @@ def service(port, sink=true)
173
197
end
174
198
end
175
199
176
- def mr_message ( message )
177
- _ , mr_code = message . split ' ' , 2
178
- map_func , reduce_func = mr_code . split ( /\; \s +reduce/ , 2 )
179
- # TODO: split without removing 'reduce'
180
- [ map_func , "reduce#{ reduce_func } " ]
181
- end
182
-
183
200
# run in parallel, then join results
184
201
def remote_maps ( map_func )
185
202
# TODO: is ruby array threadsafe?
@@ -215,30 +232,110 @@ def cluster(nodes)
215
232
end
216
233
end
217
234
218
- def get ( key )
235
+ # return a list of successive nodes
236
+ # that can also hold this value
237
+ def pref_list ( n = 3 )
238
+ list = @nodes . clone
239
+ while list . first != @name
240
+ list << list . shift
241
+ end
242
+ list [ 1 ...n ]
243
+ end
244
+
245
+ # with these messages we don't look for the
246
+ # proper node, we just add the value
247
+ def get ( key , n = 1 )
248
+ # 0 means get locally
249
+ if n == 0
250
+ puts "get 0 #{ key } "
251
+ return @data [ hash ( key ) ] || "null"
252
+ end
253
+ # if we ask for just one, and it's here, return it!
254
+ if n == 1 && value = @data [ hash ( key ) ]
255
+ puts "get 1 #{ key } (local)"
256
+ return value
257
+ end
219
258
node = @ring . node ( key )
220
259
if node == @name
221
- puts "get #{ key } "
222
- @data [ hash ( key ) ]
260
+ puts "get #{ n } #{ key } "
261
+ results = replicate ( "get 0 #{ key } " , n )
262
+ results . map! do |r |
263
+ r == 'null' ? nil : NodeObject . deserialize ( r )
264
+ end
265
+ results << @data [ hash ( key ) ]
266
+ results . compact!
267
+ begin
268
+ results . sort . first
269
+ rescue
270
+ # a conflic forces sublings
271
+ puts "Conflict!"
272
+ return results
273
+ end
223
274
else
224
- remote_call ( node , "get #{ key } " )
275
+ remote_call ( node , "get #{ n } #{ key } " )
225
276
end
226
277
end
227
278
228
- def put ( key , value )
279
+ def put ( key , vc , value , n = 1 )
280
+ # 0 means insert locally
281
+ # use the vclock given
282
+ if n == 0
283
+ vclock = VectorClock . deserialize ( vc )
284
+ puts "put 0 #{ key } #{ vclock } #{ value } "
285
+ new_obj = [ NodeObject . new ( value , vclock ) ]
286
+ return @data [ hash ( key ) ] = new_obj
287
+ end
229
288
# check for correct node
230
289
node = @ring . node ( key )
231
290
if node == @name
232
- puts "put #{ key } #{ value } "
233
- @data [ hash ( key ) ] = value
234
- 'true'
291
+ # we don't care what a given vclock is,
292
+ # just grab this node's clock as definitive
293
+ vclock = VectorClock . deserialize ( vc )
294
+ node_object = [ ]
295
+ if current_obj = @data [ hash ( key ) ]
296
+ # if no clock was given, just use the old one
297
+ if vclock . empty?
298
+ vclock = current_obj . vclock
299
+ vclock . increment ( @name )
300
+ node_object = [ NodeObject . new ( value , vclock ) ]
301
+ # otherwise, ensure the given one is a decendant of
302
+ # an existing clock. If not, create a sibling
303
+ else
304
+ current_obj = [ current_obj ] unless Array === current_obj
305
+ if current_obj . find { |o | vclock . descends_from? ( o . vclock ) }
306
+ # is a decendant, assume this is resolving a conflict
307
+ vclock . increment ( @name )
308
+ node_object = [ NodeObject . new ( value , vclock ) ]
309
+ else
310
+ # not a decendant of anything, ie conflict. add as a sibling
311
+ vclock . increment ( @name )
312
+ node_object = current_obj + [ NodeObject . new ( value , vclock ) ]
313
+ end
314
+ end
315
+ else
316
+ vclock . increment ( @name )
317
+ node_object = [ NodeObject . new ( value , vclock ) ]
318
+ end
319
+ # vclock = old_val = @data[hash(key)] ? old_val.vclock : VectorClock.new
320
+ # vclock.increment(@name)
321
+ puts "put #{ n } #{ key } #{ vclock } #{ value } "
322
+ @data [ hash ( key ) ] = node_object
323
+ replicate ( "put 0 #{ key } #{ vclock } #{ value } " , n )
324
+ return new_obj
235
325
else
236
- remote_call ( node , "put #{ key } #{ value } " )
326
+ remote_call ( node , "put #{ n } #{ key } #{ vc } #{ value } " )
327
+ end
328
+ end
329
+
330
+ def replicate ( message , n )
331
+ list = pref_list ( n )
332
+ results = [ ]
333
+ while replicate_node = list . shift
334
+ results << remote_call ( replicate_node , message )
237
335
end
336
+ results
238
337
end
239
338
240
- # TODO: test three nodes
241
- # TODO: keep ports open
242
339
def remote_call ( node , message )
243
340
puts "#{ node } <= #{ message } "
244
341
name , port = node . split ':'
@@ -269,14 +366,13 @@ def close
269
366
# we're using ports here, because it's easier to run locally
270
367
# however, this could easily be an ip address:port
271
368
begin
369
+ $n = 3
272
370
node , coordinator = ARGV
273
371
name , port = node . split ':'
274
372
$node = Node . new ( node )
275
373
276
374
trap ( "SIGINT" ) { $node. close }
277
375
278
- # TODO: gossip?
279
-
280
376
leader = false
281
377
if coordinator . nil?
282
378
coordinator = port . to_i - 100
0 commit comments