Skip to content

Commit 7eb466b

Browse files
committed
Fix vclock/sibling code, extract mapreduce
1 parent bfb74e0 commit 7eb466b

File tree

4 files changed

+187
-186
lines changed

4 files changed

+187
-186
lines changed

client.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# How do we live beyond node failure? N Replication
88
# How do we know which value is correct? Add Vector Clocks
99

10-
10+
# CRDT
1111
# Use PUSH/PULL for AAE
1212
# Map/Reduce should be the last thing
1313
# TODO: Make add-ons modules
@@ -16,7 +16,8 @@
1616
# Replace pub/sub ring-leader with req/res gossip protocol
1717
# Allow sloppy quorums and data transfer of downed nodes with hinted handoff
1818

19-
port = 4000
19+
# port = 4000
20+
port = 2201
2021
ctx = ZMQ::Context.new
2122
req = ctx.socket(ZMQ::REQ)
2223
req.connect("tcp://127.0.0.1:#{port}")
@@ -37,8 +38,10 @@
3738
# resp = req.send("put 2 key {\"A:2201\":2,\"B:2201\":2} hello") && req.recv
3839

3940
# Counters
40-
req.send("put 1 key {\"A:2201\":1} +1") && req.recv
41-
req.send("put 1 key {\"B:2202\":1} +1") && req.recv
41+
# req.send("put 1 key {\"A:2201\":1} +1") && req.recv
42+
# req.send("put 1 key {\"B:2202\":1} +1") && req.recv
43+
req.send("put 1 key {} +1") && req.recv
44+
4245
vals = req.send("get 2 key") && req.recv
4346
p JSON.parse(vals)
4447
p JSON.parse(vals).reduce(0){|sum,v| sum + v['value'].to_i}

mapreduce.rb

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
2+
class Map
3+
def initialize(func_str, data)
4+
@data = data
5+
@func = func_str
6+
end
7+
8+
def call
9+
eval(@func, binding)
10+
end
11+
12+
# calls given map block for every value
13+
def map
14+
@results = []
15+
@data.each {|k,v| @results += yield k,v }
16+
@results
17+
end
18+
end
19+
20+
class Reduce
21+
def initialize(func_str, results)
22+
@results = results
23+
@func = func_str
24+
end
25+
26+
def call
27+
eval(@func, binding)
28+
end
29+
30+
# calls given reduce block for every value
31+
def reduce
32+
yield @results
33+
end
34+
end
35+
36+
37+
38+
module Mapreduce
39+
def map(socket, payload)
40+
socket.send( Map.new(payload, @data).call.to_s )
41+
end
42+
43+
def mr(socket, payload)
44+
map_func, reduce_func = payload.split(/\;\s+reduce/, 2)
45+
reduce_func = "reduce#{reduce_func}"
46+
socket.send( Reduce.new(reduce_func, remote_maps(map_func)).call.to_s )
47+
end
48+
49+
# run in parallel, then join results
50+
def remote_maps(map_func)
51+
workers, results = [], []
52+
@nodes.each do |node|
53+
workers << Thread.new do
54+
res = remote_call(node, "map #{map_func}")
55+
results += eval(res)
56+
end
57+
end
58+
workers.each{|w| w.join}
59+
results
60+
end
61+
end

merkle.rb

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
require 'digest/sha1'
22

33
class MerkleTree
4-
attr :nodes
5-
attr :blocks
6-
attr :block_size
7-
attr :root
4+
attr :nodes, :blocks, :block_size, :root
85

96
def initialize(block_size=1024)
107
@block_size = block_size
@@ -113,7 +110,12 @@ def add_level(this_level, next_level)
113110
end
114111
end
115112

116-
mt = MerkleTree.new()
117-
ARGF.each { |line| mt.add_line(line) }
118-
mt.finish
119-
mt.output
113+
### run tests
114+
if __FILE__ == $0
115+
116+
mt = MerkleTree.new()
117+
ARGF.each { |line| mt.add_line(line) }
118+
mt.finish
119+
mt.output
120+
121+
end

0 commit comments

Comments
 (0)