6
6
import shutil
7
7
import socket
8
8
import subprocess
9
- import sys
10
9
import tempfile
11
10
from threading import Thread , Event
12
11
import time
@@ -44,6 +43,7 @@ def run(self):
44
43
# Create the log directory
45
44
logDir = os .path .join (self .tmpDir , 'logs' )
46
45
os .mkdir (logDir )
46
+ stdout = open (os .path .join (logDir , 'stdout' ), 'w' )
47
47
48
48
# Create the config file
49
49
logConfig = "test/resources/log4j.properties"
@@ -63,15 +63,15 @@ def run(self):
63
63
(rlist , wlist , xlist ) = select .select ([proc .stdout ], [], [], 1 )
64
64
if proc .stdout in rlist :
65
65
read = proc .stdout .readline ()
66
- sys . stdout .write (read )
66
+ stdout .write (read )
67
67
self .capture += read
68
68
69
69
if self .shouldDie .is_set ():
70
70
proc .terminate ()
71
71
killed = True
72
72
73
73
if proc .poll () is not None :
74
- shutil .rmtree (self .tmpDir )
74
+ # shutil.rmtree(self.tmpDir)
75
75
if killed :
76
76
break
77
77
else :
@@ -206,13 +206,14 @@ def test_offset_request(self):
206
206
t1 = int (time .time ()* 1000 ) # now
207
207
t2 = t1 + 60000 # one minute from now
208
208
req = OffsetRequest ("test-offset-request" , 0 , t1 , 1024 )
209
- print self .kafka .get_offsets (req )
209
+ self .kafka .get_offsets (req )
210
210
211
211
req = OffsetRequest ("test-offset-request" , 0 , t2 , 1024 )
212
- print self .kafka .get_offsets (req )
212
+ self .kafka .get_offsets (req )
213
213
214
214
def test_10k_messages (self ):
215
215
msg_tmpl = "this is a test message with a few bytes in it. this is message number %d"
216
+ # TODO 10k actually fails, why?
216
217
msg = KafkaClient .create_gzip_message (* [msg_tmpl % i for i in range (1000 )])
217
218
req = ProduceRequest ("test-10k" , 0 , [msg ])
218
219
self .kafka .send_message_set (req )
0 commit comments