File tree Expand file tree Collapse file tree 4 files changed +187
-186
lines changed Expand file tree Collapse file tree 4 files changed +187
-186
lines changed Original file line number Diff line number Diff line change 7
7
# How do we live beyond node failure? N Replication
8
8
# How do we know which value is correct? Add Vector Clocks
9
9
10
-
10
+ # CRDT
11
11
# Use PUSH/PULL for AAE
12
12
# Map/Reduce should be the last thing
13
13
# TODO: Make add-ons modules
16
16
# Replace pub/sub ring-leader with req/res gossip protocol
17
17
# Allow sloppy quorums and data transfer of downed nodes with hinted handoff
18
18
19
- port = 4000
19
+ # port = 4000
20
+ port = 2201
20
21
ctx = ZMQ ::Context . new
21
22
req = ctx . socket ( ZMQ ::REQ )
22
23
req . connect ( "tcp://127.0.0.1:#{ port } " )
37
38
# resp = req.send("put 2 key {\"A:2201\":2,\"B:2201\":2} hello") && req.recv
38
39
39
40
# Counters
40
- req . send ( "put 1 key {\" A:2201\" :1} +1" ) && req . recv
41
- req . send ( "put 1 key {\" B:2202\" :1} +1" ) && req . recv
41
+ # req.send("put 1 key {\"A:2201\":1} +1") && req.recv
42
+ # req.send("put 1 key {\"B:2202\":1} +1") && req.recv
43
+ req . send ( "put 1 key {} +1" ) && req . recv
44
+
42
45
vals = req . send ( "get 2 key" ) && req . recv
43
46
p JSON . parse ( vals )
44
47
p JSON . parse ( vals ) . reduce ( 0 ) { |sum , v | sum + v [ 'value' ] . to_i }
Original file line number Diff line number Diff line change
1
+
2
+ class Map
3
+ def initialize ( func_str , data )
4
+ @data = data
5
+ @func = func_str
6
+ end
7
+
8
+ def call
9
+ eval ( @func , binding )
10
+ end
11
+
12
+ # calls given map block for every value
13
+ def map
14
+ @results = [ ]
15
+ @data . each { |k , v | @results += yield k , v }
16
+ @results
17
+ end
18
+ end
19
+
20
+ class Reduce
21
+ def initialize ( func_str , results )
22
+ @results = results
23
+ @func = func_str
24
+ end
25
+
26
+ def call
27
+ eval ( @func , binding )
28
+ end
29
+
30
+ # calls given reduce block for every value
31
+ def reduce
32
+ yield @results
33
+ end
34
+ end
35
+
36
+
37
+
38
+ module Mapreduce
39
+ def map ( socket , payload )
40
+ socket . send ( Map . new ( payload , @data ) . call . to_s )
41
+ end
42
+
43
+ def mr ( socket , payload )
44
+ map_func , reduce_func = payload . split ( /\; \s +reduce/ , 2 )
45
+ reduce_func = "reduce#{ reduce_func } "
46
+ socket . send ( Reduce . new ( reduce_func , remote_maps ( map_func ) ) . call . to_s )
47
+ end
48
+
49
+ # run in parallel, then join results
50
+ def remote_maps ( map_func )
51
+ workers , results = [ ] , [ ]
52
+ @nodes . each do |node |
53
+ workers << Thread . new do
54
+ res = remote_call ( node , "map #{ map_func } " )
55
+ results += eval ( res )
56
+ end
57
+ end
58
+ workers . each { |w | w . join }
59
+ results
60
+ end
61
+ end
Original file line number Diff line number Diff line change 1
1
require 'digest/sha1'
2
2
3
3
class MerkleTree
4
- attr :nodes
5
- attr :blocks
6
- attr :block_size
7
- attr :root
4
+ attr :nodes , :blocks , :block_size , :root
8
5
9
6
def initialize ( block_size = 1024 )
10
7
@block_size = block_size
@@ -113,7 +110,12 @@ def add_level(this_level, next_level)
113
110
end
114
111
end
115
112
116
- mt = MerkleTree . new ( )
117
- ARGF . each { |line | mt . add_line ( line ) }
118
- mt . finish
119
- mt . output
113
+ ### run tests
114
+ if __FILE__ == $0
115
+
116
+ mt = MerkleTree . new ( )
117
+ ARGF . each { |line | mt . add_line ( line ) }
118
+ mt . finish
119
+ mt . output
120
+
121
+ end
You can’t perform that action at this time.
0 commit comments