Skip to content

Commit ac454a1

Browse files
committed
Add read repair
1 parent 746c75b commit ac454a1

File tree

2 files changed

+35
-6
lines changed

2 files changed

+35
-6
lines changed

09-repair/client.rb

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,19 @@
44
ctx = ZMQ::Context.new
55
req = ctx.socket(ZMQ::REQ)
66
req.connect("tcp://127.0.0.1:2200")
7+
req.send("put 0 foo {\"B\":1} baz") && req.recv
8+
req.close
9+
10+
req = ctx.socket(ZMQ::REQ)
11+
req.connect("tcp://127.0.0.1:2201")
12+
req.send("put 0 foo {} qux") && req.recv
13+
14+
# trigger read repair
15+
puts req.send("get 2 foo") && req.recv
16+
17+
sleep 1
718

8-
req.send("put_counter 1 foo +1") && req.recv
9-
req.send("put_counter 1 foo +2") && req.recv
10-
req.send("put_counter 2 foo +1") && req.recv
11-
puts req.send("get_counter 2 foo") && req.recv
19+
# read repair should be complete
20+
puts req.send("get 2 foo") && req.recv
1221

1322
req.close

09-repair/node.rb

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def do_get(key, n=1, crdt=nil)
132132
results.flatten!
133133
results.uniq! {|o| o && o.value }
134134
results.compact!
135+
repair(key, n)
135136
begin
136137
# we get the most recent value if we can resolve this
137138
[results.sort.first]
@@ -146,15 +147,34 @@ def do_get(key, n=1, crdt=nil)
146147
end
147148

148149
def replicate(message, key, n)
149-
list = @ring.pref_list(key, n)
150-
list -= [@name]
150+
list = @ring.pref_list(key, n) - [@name]
151151
results = []
152152
while replicate_node = list.shift
153153
results << remote_call(replicate_node, message)
154154
end
155155
results
156156
end
157157

158+
def repair(key, n)
159+
list = @ring.pref_list(key, n) - [@name]
160+
puts "Repairing #{key}"
161+
list.map do |replicate_node|
162+
Thread.new do
163+
result = remote_call(replicate_node, "get 0 #{key}")
164+
remote_objs = NodeObject.deserialize(result)
165+
if remote_objs != 'null'
166+
# if local is nil or descends, update local
167+
local = @data[@ring.hash(key)]
168+
vclock = local && local.first.vclock
169+
descends = remote_objs.find{|o| o.vclock.descends_from?(vclock)}
170+
if vclock == nil || descends
171+
@data[@ring.hash(key)] = nos
172+
end
173+
end
174+
end
175+
end
176+
end
177+
158178
def remote_call(remote_name, message)
159179
puts "#{remote_name} <= #{message}"
160180
remote_port = config(remote_name)["port"]

0 commit comments

Comments
 (0)