Skip to content

Commit bfb74e0

Browse files
committed
Add replication, vclocks and sibling
1 parent bd7d8e0 commit bfb74e0

File tree

3 files changed

+205
-50
lines changed

3 files changed

+205
-50
lines changed

client.rb

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,54 @@
11
require 'rubygems'
22
require 'zmq'
3+
require 'json'
34

45
# http://zguide.zeromq.org/page:all#Plugging-Sockets-Into-the-Topology
56

6-
# WORKS!
7+
# How do we live beyond node failure? N Replication
8+
# How do we know which value is correct? Add Vector Clocks
9+
10+
11+
# Use PUSH/PULL for AAE
12+
# Map/Reduce should be the last thing
13+
# TODO: Make add-ons modules
14+
15+
# TODO:
16+
# Replace pub/sub ring-leader with req/res gossip protocol
17+
# Allow sloppy quorums and data transfer of downed nodes with hinted handoff
18+
719
port = 4000
820
ctx = ZMQ::Context.new
921
req = ctx.socket(ZMQ::REQ)
1022
req.connect("tcp://127.0.0.1:#{port}")
11-
1000.times do |i|
12-
req.send("put #{i}key #{i}") && req.recv
13-
end
14-
# req.send("put key hello") && req.recv
15-
# p req.send("get key") && req.recv
23+
puts "Inserting Values"
24+
# 1000.times do |i|
25+
# req.send("put #{i}key #{i}") && req.recv
26+
# end
27+
28+
# VC1
29+
# resp = req.send("put 2 key {} hello") && req.recv
30+
31+
# VC2 - conflict!
32+
# req.send("put 0 key {\"A:2201\":1} hello1") && req.recv
33+
# req.send("put 0 key {\"B:2201\":1} hello2") && req.recv
34+
# puts req.send("get 2 key") && req.recv
35+
36+
# VC3 - resolve!
37+
# resp = req.send("put 2 key {\"A:2201\":2,\"B:2201\":2} hello") && req.recv
38+
39+
# Counters
40+
req.send("put 1 key {\"A:2201\":1} +1") && req.recv
41+
req.send("put 1 key {\"B:2202\":1} +1") && req.recv
42+
vals = req.send("get 2 key") && req.recv
43+
p JSON.parse(vals)
44+
p JSON.parse(vals).reduce(0){|sum,v| sum + v['value'].to_i}
45+
46+
# puts req.send("get 2 key") && req.recv
47+
# p req.send("get 3 key") && req.recv
1648
# req.send("put foo goodbye") && req.recv
1749

18-
req.send("mr map{|k,v| [v]}; reduce{|vs| vs.length}")
19-
p req.recv
50+
# puts "Running MapReduce"
51+
# req.send("mr map{|k,v| [v]}; reduce{|vs| vs.length}")
52+
# p req.recv
2053

2154
req.close

node.rb

Lines changed: 128 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require 'rubygems'
33
require 'zmq'
44
require './hashes'
5+
require './vclock'
56

67
class Map
78
def initialize(func_str, data)
@@ -37,6 +38,31 @@ def reduce
3738
end
3839
end
3940

41+
# A stored value paired with the clock that versions it.
class NodeObject
  attr_reader :value, :vclock

  # value  - the payload stored for a key
  # vclock - the clock object describing this version of the value
  def initialize(value, vclock)
    @vclock = vclock
    @value = value
  end

  # Orders two objects by delegating to <=> on their clocks.
  def <=>(nobject2)
    vclock <=> nobject2.vclock
  end

  # Returns JSON text with a "value" and a "vclock" member.
  def serialize
    fields = { :value => value, :vclock => vclock }
    fields.to_json
  end

  # Rebuilds an object from JSON text: the "value" member is used as-is
  # and the "vclock" member is handed to VectorClock.deserialize.
  def self.deserialize(serialized)
    parsed = JSON.parse(serialized)
    clock = VectorClock.deserialize(parsed['vclock'])
    NodeObject.new(parsed['value'], clock)
  end

  # String form is the serialized JSON.
  def to_s
    serialize
  end
end
4066

4167
Thread.abort_on_exception = true
4268
class Node
@@ -120,9 +146,7 @@ def coordinate(port)
120146
msg, node = line.split(' ', 2)
121147
case msg
122148
when 'join'
123-
@nodes << node
124-
@nodes.uniq!
125-
@nodes.sort!
149+
@nodes = (@nodes << node).uniq.sort
126150
when 'down'
127151
@nodes -= [node]
128152
end
@@ -149,20 +173,20 @@ def service(port, sink=true)
149173
end
150174
# recv/send because it's a req/res
151175
while line = rep.recv
152-
msg, key, value = line.split(' ', 3)
153-
# p line
176+
msg, payload = line.split(' ', 2)
154177
case msg
155178
when 'put'
156-
rep.send( put(key, value).to_s )
179+
n, key, vc, value = payload.split(' ', 4)
180+
rep.send( put(key, vc, value, n.to_i).to_s )
157181
when 'get'
158-
rep.send( get(key).to_s )
182+
n, key, value = payload.split(' ', 3)
183+
rep.send( get(key, n.to_i).to_s )
159184
when 'map'
160-
_, map_func = line.split(' ', 2)
161-
rep.send( Map.new(map_func, @data).call.to_s )
185+
rep.send( Map.new(payload, @data).call.to_s )
162186
when 'mr'
163-
map_func, reduce_func = mr_message(line)
164-
results = remote_maps(map_func)
165-
rep.send( reduce(reduce_func, results).to_s )
187+
map_func, reduce_func = payload.split(/\;\s+reduce/, 2)
188+
reduce_func = "reduce#{reduce_func}"
189+
rep.send( reduce(reduce_func, remote_maps(map_func)).to_s )
166190
else
167191
rep.send( 'what?' )
168192
end
@@ -173,13 +197,6 @@ def service(port, sink=true)
173197
end
174198
end
175199

176-
def mr_message(message)
177-
_, mr_code = message.split ' ', 2
178-
map_func, reduce_func = mr_code.split(/\;\s+reduce/, 2)
179-
# TODO: split without removing 'reduce'
180-
[map_func, "reduce#{reduce_func}"]
181-
end
182-
183200
# run in parallel, then join results
184201
def remote_maps(map_func)
185202
# TODO: is ruby array threadsafe?
@@ -215,30 +232,110 @@ def cluster(nodes)
215232
end
216233
end
217234

218-
def get(key)
235+
# return a list of successive nodes
236+
# that can also hold this value
237+
# n - replication factor, counting this node itself (default 3)
#
# Returns the (at most n-1) entries of @nodes that follow this node's own
# entry (@name), wrapping around the end of the list. Returns [] when
# @name is not currently in @nodes (or @nodes is empty) instead of
# rotating the list forever while looking for it.
def pref_list(n=3)
  position = @nodes.index(@name)
  return [] if position.nil?

  # view of the node list that starts at this node
  ring_from_here = @nodes[position..-1] + @nodes[0...position]
  # skip ourselves (index 0); the rest are the replica candidates
  ring_from_here[1...n]
end
244+
245+
# with these messages we don't look for the
246+
# proper node, we just add the value
247+
# Reads the value(s) stored for +key+.
#
# key - the key to look up
# n   - how many nodes to consult (default 1):
#       0 reads this node's own store only, with no routing
#       1 answers from this node's own store when the key is present here
#       anything else (or a local miss) is routed to the node the ring
#       assigns to +key+; that node also asks its preference list via
#       "get 0" messages
#
# Returns one of:
#   * the locally stored entry (n == 0, or a local hit with n == 1);
#     "null" when n == 0 and nothing is stored
#   * the entry with the greatest clock among the replica replies and
#     the local siblings
#   * the Array of all collected entries when comparing them raises
#     (VectorClock#<=> raises "Conflict" for clocks it cannot order)
#   * whatever remote_call returns when another node owns the key
def get(key, n=1)
  local = @data[hash(key)]

  # 0 means get locally
  if n == 0
    puts "get 0 #{key}"
    return local || "null"
  end
  # if we ask for just one, and it's here, return it!
  if n == 1 && local
    puts "get 1 #{key} (local)"
    return local
  end

  node = @ring.node(key)
  return remote_call(node, "get #{n} #{key}") unless node == @name

  puts "get #{n} #{key}"
  # NOTE(review): NodeObject.deserialize parses a single JSON object;
  # confirm that a "get 0" reply (a stringified sibling Array) has that shape.
  results = replicate("get 0 #{key}", n).map do |r|
    r == 'null' ? nil : NodeObject.deserialize(r)
  end
  # put stores an Array of siblings per key, so add its members one by
  # one; appending the Array itself would nest it inside the results and
  # make every comparison below fail
  results.concat(local.is_a?(Array) ? local : [local])
  results.compact!
  begin
    # the greatest clock is the most recent descendant
    results.max
  rescue StandardError
    # a conflict forces siblings
    puts "Conflict!"
    results
  end
end
227278

228-
def put(key, value)
279+
# Stores +value+ under +key+.
#
# key   - the key to write
# vc    - serialized vector clock supplied by the caller (passed to
#         VectorClock.deserialize)
# value - the payload to store
# n     - replication factor (default 1); 0 means "store on this node
#         exactly as given", which is what replicas receive
#
# On the node the ring assigns to +key+ the write is versioned:
#   * nothing stored yet        -> store a single entry
#   * empty clock given         -> continue from the first stored entry's
#                                  clock and replace what is stored
#   * clock descends from one   -> treated as resolving a conflict; the
#     of the stored entries        new entry replaces what is stored
#   * otherwise                 -> conflict; the new entry is added as a
#                                  sibling next to the stored ones
# In every case this node's counter in the clock is incremented, and the
# new entry is sent to the preference list as a "put 0" message.
#
# Returns the Array of entries now stored for the key, or whatever
# remote_call returns when another node owns the key.
def put(key, vc, value, n=1)
  # 0 means insert locally, using the vclock given
  if n == 0
    vclock = VectorClock.deserialize(vc)
    puts "put 0 #{key} #{vclock} #{value}"
    return @data[hash(key)] = [NodeObject.new(value, vclock)]
  end

  # check for correct node
  node = @ring.node(key)
  return remote_call(node, "put #{n} #{key} #{vc} #{value}") unless node == @name

  vclock = VectorClock.deserialize(vc)
  stored = @data[hash(key)]
  # entries are stored as an Array of siblings; tolerate a bare entry too
  siblings = if stored.nil?
               []
             elsif stored.is_a?(Array)
               stored
             else
               [stored]
             end

  kept = []
  unless siblings.empty?
    if vclock.empty?
      # if no clock was given, just use the old one (the clock lives on
      # the entries, not on the Array that holds them)
      vclock = siblings.first.vclock
    elsif siblings.none? { |o| vclock.descends_from?(o.vclock) }
      # not a descendant of anything, i.e. a conflict: keep the existing
      # entries and add the new one as a sibling
      kept = siblings
    end
    # otherwise the given clock descends from a stored one; assume this
    # write resolves a conflict and let it replace what is stored
  end

  vclock.increment(@name)
  node_object = kept + [NodeObject.new(value, vclock)]
  puts "put #{n} #{key} #{vclock} #{value}"
  @data[hash(key)] = node_object
  replicate("put 0 #{key} #{vclock} #{value}", n)
  # hand back what was stored, as the n == 0 branch does
  node_object
end
329+
330+
# Sends +message+ to the nodes returned by pref_list(n), in order, and
# collects what remote_call returns for each of them.
#
# Returns the Array of replies, in the same order as the nodes.
def replicate(message, n)
  replies = []
  pref_list(n).each do |replica|
    # a nil/false entry ends the walk
    break unless replica
    replies << remote_call(replica, message)
  end
  replies
end
239338

240-
# TODO: test three nodes
241-
# TODO: keep ports open
242339
def remote_call(node, message)
243340
puts "#{node} <= #{message}"
244341
name, port = node.split ':'
@@ -269,14 +366,13 @@ def close
269366
# we're using ports here, because it's easier to run locally
270367
# however, this could easily be an ip address:port
271368
begin
369+
$n = 3
272370
node, coordinator = ARGV
273371
name, port = node.split ':'
274372
$node = Node.new(node)
275373

276374
trap("SIGINT") { $node.close }
277375

278-
# TODO: gossip?
279-
280376
leader = false
281377
if coordinator.nil?
282378
coordinator = port.to_i - 100

vclock.rb

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'json'
2+
13
class VectorClock
24
attr_reader :vector
35
def initialize(vector={})
@@ -9,8 +11,9 @@ def increment(clientId)
911
@vector[clientId] = count + 1
1012
end
1113

14+
# an equal clock is treated as a descendant
1215
# True when this clock compares greater than or equal to +vclock2+
# under <=>. Any StandardError raised while comparing (such as the
# "Conflict" raised by <=>) yields false.
def descends_from?(vclock2)
  ordering = self <=> vclock2
  ordering >= 0
rescue StandardError
  false
end
1518

1619
def conflicts_with?(vclock2)
@@ -36,17 +39,40 @@ def <=>(vclock2)
3639
end
3740
raise "Conflict"
3841
end
42+
43+
def serialize
44+
@vector.to_json
45+
end
46+
47+
def self.deserialize(serialized)
48+
VectorClock.new(JSON.parse(serialized))
49+
end
50+
51+
def empty?
52+
vector.empty?
53+
end
54+
55+
def to_s
56+
serialize
57+
end
3958
end
4059

41-
vc = VectorClock.new
42-
vc.increment("adam")
43-
vc.increment("barb")
4460

45-
vc2 = VectorClock.new(vc.vector.clone)
46-
puts vc.descends_from?(vc2)
61+
### run tests
62+
if __FILE__ == $0
63+
64+
vc = VectorClock.new
65+
vc.increment("adam")
66+
vc.increment("barb")
67+
68+
vc2 = VectorClock.deserialize(vc.serialize)
69+
puts vc.descends_from?(vc2)
70+
71+
vc.increment("adam")
72+
puts vc.descends_from?(vc2)
4773

48-
vc.increment("adam")
49-
puts vc.descends_from?(vc2)
74+
vc2.increment("barb")
75+
puts vc2.conflicts_with?(vc)
5076

51-
vc2.increment("barb")
52-
puts vc2.conflicts_with?(vc)
77+
puts vc2.serialize
78+
end

0 commit comments

Comments
 (0)