Skip to content

Commit 0aa8fc5

Browse files
author
Nathan Marz
committed
encoding stuff
1 parent 55847da commit 0aa8fc5

File tree

5 files changed

+536
-532
lines changed

5 files changed

+536
-532
lines changed

src/dev/resources/tester_bolt.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# This Python file uses the following encoding: utf-8
2+
13
import storm
24
from random import random
35

@@ -7,6 +9,7 @@ def initialize(self, conf, context):
79

810
def process(self, tup):
911
word = tup.values[0];
12+
storm.log("BBB: " + word + " " + str(ord(word[-1])))
1013
if (random() < 0.75):
1114
storm.emit([word + 'lalala'], anchors=[tup])
1215
storm.ack(tup)

src/dev/resources/tester_spout.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
# This Python file uses the following encoding: utf-8
2+
13
from storm import Spout, emit, log
24
from random import choice
35
from time import sleep
46
from uuid import uuid4
57

6-
words = ["nathan", "mike", "jackson", "golda", "bertels"]
8+
words = ["nathan", "mike", "jackson", "golda", "bertels人"]
79

810
class TesterSpout(Spout):
911
def initialize(self, conf, context):
@@ -15,6 +17,7 @@ def nextTuple(self):
1517
word = choice(words)
1618
id = str(uuid4())
1719
self.pending[id] = word
20+
log("SSS: " + word + " " + str(ord(word[-1])))
1821
emit([word], id=id)
1922

2023
def ack(self, id):

src/jvm/backtype/storm/utils/ShellProcess.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,11 @@ public void writeObject(Object obj) throws IOException {
4444
writeString(JSONValue.toJSONString(obj));
4545
}
4646

47-
public void writeString(String string) throws IOException {
48-
// don't need synchronization for now
49-
//synchronized (processIn) {
50-
processIn.writeBytes(string + "\nend\n");
51-
processIn.flush();
52-
//}
47+
public void writeString(String str) throws IOException {
48+
byte[] strBytes = str.getBytes("UTF-8");
49+
processIn.write(strBytes, 0, strBytes.length);
50+
processIn.writeBytes("\nend\n");
51+
processIn.flush();
5352
}
5453

5554
// returns null for sync. odd?

src/multilang/py/storm.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@
44
from collections import deque
55

66
try:
7-
import cjson
8-
json_encode = cjson.encode
9-
json_decode = lambda x: cjson.decode(x, all_unicode=True)
7+
import simplejson as json
108
except ImportError:
119
import json
12-
json_encode = lambda x: json.dumps(x, ensure_ascii=False)
13-
json_decode = lambda x: json.loads(unicode(x))
10+
11+
json_encode = lambda x: json.dumps(x)
12+
json_decode = lambda x: json.loads(x)
1413

1514
def readStringMsg():
1615
msg = ""
@@ -62,7 +61,7 @@ def sendToParent(s):
6261
print s
6362
print "end"
6463
sys.stdout.flush()
65-
64+
6665
def sync():
6766
print "sync"
6867
sys.stdout.flush()
@@ -134,7 +133,7 @@ def initComponent():
134133
sendpid(heartbeatdir)
135134
return readenv()
136135

137-
class Tuple:
136+
class Tuple:
138137
def __init__(self, id, component, stream, task, values):
139138
self.id = id
140139
self.component = component
@@ -150,10 +149,10 @@ def __repr__(self):
150149
class Bolt:
151150
def initialize(self, stormconf, context):
152151
pass
153-
152+
154153
def process(self, tuple):
155154
pass
156-
155+
157156
def run(self):
158157
global MODE
159158
MODE = Bolt
@@ -164,15 +163,15 @@ def run(self):
164163
tup = readTuple()
165164
self.process(tup)
166165
except Exception, e:
167-
log(traceback.format_exc(e))
166+
log(traceback.format_exc(e))
168167

169168
class BasicBolt:
170169
def initialize(self, stormconf, context):
171170
pass
172-
171+
173172
def process(self, tuple):
174173
pass
175-
174+
176175
def run(self):
177176
global MODE
178177
MODE = Bolt

0 commit comments

Comments
 (0)