@@ -132,6 +132,7 @@ def do_get(key, n=1, crdt=nil)
132
132
results . flatten!
133
133
results . uniq! { |o | o && o . value }
134
134
results . compact!
135
+ repair ( key , n )
135
136
begin
136
137
# we get the most recent value if we can resolve this
137
138
[ results . sort . first ]
@@ -146,15 +147,34 @@ def do_get(key, n=1, crdt=nil)
146
147
end
147
148
148
149
def replicate ( message , key , n )
149
- list = @ring . pref_list ( key , n )
150
- list -= [ @name ]
150
+ list = @ring . pref_list ( key , n ) - [ @name ]
151
151
results = [ ]
152
152
while replicate_node = list . shift
153
153
results << remote_call ( replicate_node , message )
154
154
end
155
155
results
156
156
end
157
157
158
+ def repair ( key , n )
159
+ list = @ring . pref_list ( key , n ) - [ @name ]
160
+ puts "Repairing #{ key } "
161
+ list . map do |replicate_node |
162
+ Thread . new do
163
+ result = remote_call ( replicate_node , "get 0 #{ key } " )
164
+ remote_objs = NodeObject . deserialize ( result )
165
+ if remote_objs != 'null'
166
+ # if local is nil or descends, update local
167
+ local = @data [ @ring . hash ( key ) ]
168
+ vclock = local && local . first . vclock
169
+ descends = remote_objs . find { |o | o . vclock . descends_from? ( vclock ) }
170
+ if vclock == nil || descends
171
+ @data [ @ring . hash ( key ) ] = nos
172
+ end
173
+ end
174
+ end
175
+ end
176
+ end
177
+
158
178
def remote_call ( remote_name , message )
159
179
puts "#{ remote_name } <= #{ message } "
160
180
remote_port = config ( remote_name ) [ "port" ]
0 commit comments