Skip to content

Commit efcf58b

Browse files
committed
Attempt to fix Travis build. Decrease complexity of service.py in favor of in-memory logging. Address code review concerns
1 parent 99320fb commit efcf58b

File tree

7 files changed

+38
-66
lines changed

7 files changed

+38
-66
lines changed

kafka/client.py

-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
from functools import partial
88
from itertools import count
9-
from kafka.common import *
10-
119
from kafka.common import (TopicAndPartition,
1210
ConnectionError, FailedPayloadsError,
1311
PartitionUnavailableError,

kafka/common.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,16 @@ class StaleControllerEpochError(BrokerResponseError):
121121
message = 'STALE_CONTROLLER_EPOCH'
122122

123123

124-
class OffsetMetadataTooLarge(BrokerResponseError):
124+
class OffsetMetadataTooLargeError(BrokerResponseError):
125125
errno = 12
126126
message = 'OFFSET_METADATA_TOO_LARGE'
127127

128128

129+
class StaleLeaderEpochCodeError(BrokerResponseError):
130+
errno = 13
131+
message = 'STALE_LEADER_EPOCH_CODE'
132+
133+
129134
class KafkaUnavailableError(KafkaError):
130135
pass
131136

@@ -178,7 +183,8 @@ class ProtocolError(KafkaError):
178183
9 : ReplicaNotAvailableError,
179184
10 : MessageSizeTooLargeError,
180185
11 : StaleControllerEpochError,
181-
12 : OffsetMetadataTooLarge,
186+
12 : OffsetMetadataTooLargeError,
187+
13 : StaleLeaderEpochCodeError,
182188
}
183189

184190
def check_error(response):

kafka/consumer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -430,12 +430,12 @@ def _fetch(self):
430430
# Put the message in our queue
431431
self.queue.put((partition, message))
432432
self.fetch_offsets[partition] = message.offset + 1
433-
except ConsumerFetchSizeTooSmall as e:
433+
except ConsumerFetchSizeTooSmall:
434434
if (self.max_buffer_size is not None and
435435
self.buffer_size == self.max_buffer_size):
436436
log.error("Max fetch size %d too small",
437437
self.max_buffer_size)
438-
raise e
438+
raise
439439
if self.max_buffer_size is None:
440440
self.buffer_size *= 2
441441
else:

kafka/util.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import sys
2-
import struct
31
import collections
2+
import struct
3+
import sys
44
from threading import Thread, Event
55

66
from kafka.common import BufferUnderflowError

test/fixtures.py

-4
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ def open(self):
9898
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
9999
properties
100100
))
101-
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
102-
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
103101

104102
# Party!
105103
self.out("Starting...")
@@ -180,8 +178,6 @@ def open(self):
180178
self.child = SpawnedService(self.kafka_run_class_args(
181179
"kafka.Kafka", properties
182180
))
183-
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
184-
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
185181

186182
# Party!
187183
self.out("Creating Zookeeper chroot node...")

test/service.py

+25-53
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import re
23
import select
34
import subprocess
@@ -29,43 +30,15 @@ def __init__(self, args=[]):
2930
threading.Thread.__init__(self)
3031

3132
self.args = args
32-
self.captured_stdout = ""
33-
self.captured_stderr = ""
34-
self.stdout_file = None
35-
self.stderr_file = None
36-
self.capture_stdout = True
37-
self.capture_stderr = True
38-
self.show_stdout = True
39-
self.show_stderr = True
33+
self.captured_stdout = []
34+
self.captured_stderr = []
4035

4136
self.should_die = threading.Event()
4237

43-
def configure_stdout(self, file=None, capture=True, show=False):
44-
self.stdout_file = file
45-
self.capture_stdout = capture
46-
self.show_stdout = show
47-
48-
def configure_stderr(self, file=None, capture=False, show=False):
49-
self.stderr_file = file
50-
self.capture_stderr = capture
51-
self.show_stderr = show
52-
5338
def run(self):
54-
stdout_handle = None
55-
stderr_handle = None
56-
try:
57-
if self.stdout_file:
58-
stdout_handle = open(self.stdout_file, "w")
59-
if self.stderr_file:
60-
stderr_handle = open(self.stderr_file, "w")
61-
self.run_with_handles(stdout_handle, stderr_handle)
62-
finally:
63-
if stdout_handle:
64-
stdout_handle.close()
65-
if stderr_handle:
66-
stderr_handle.close()
67-
68-
def run_with_handles(self, stdout_handle, stderr_handle):
39+
self.run_with_handles()
40+
41+
def run_with_handles(self):
6942
self.child = subprocess.Popen(
7043
self.args,
7144
bufsize=1,
@@ -78,35 +51,32 @@ def run_with_handles(self, stdout_handle, stderr_handle):
7851

7952
if self.child.stdout in rds:
8053
line = self.child.stdout.readline()
81-
if stdout_handle:
82-
stdout_handle.write(line)
83-
stdout_handle.flush()
84-
if self.capture_stdout:
85-
self.captured_stdout += line
86-
if self.show_stdout:
87-
sys.stdout.write(line)
88-
sys.stdout.flush()
54+
self.captured_stdout.append(line)
8955

9056
if self.child.stderr in rds:
9157
line = self.child.stderr.readline()
92-
if stderr_handle:
93-
stderr_handle.write(line)
94-
stderr_handle.flush()
95-
if self.capture_stderr:
96-
self.captured_stderr += line
97-
if self.show_stderr:
98-
sys.stderr.write(line)
99-
sys.stderr.flush()
58+
self.captured_stderr.append(line)
10059

10160
if self.should_die.is_set():
10261
self.child.terminate()
10362
alive = False
10463

105-
if self.child.poll() is not None:
64+
poll_results = self.child.poll()
65+
if poll_results is not None:
10666
if not alive:
10767
break
10868
else:
109-
raise RuntimeError("Subprocess has died. Aborting.")
69+
self.dump_logs()
70+
raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args))
71+
72+
def dump_logs(self):
73+
logging.critical('stderr')
74+
for line in self.captured_stderr:
75+
logging.critical(line.rstrip())
76+
77+
logging.critical('stdout')
78+
for line in self.captured_stdout:
79+
logging.critical(line.rstrip())
11080

11181
def wait_for(self, pattern, timeout=10):
11282
t1 = time.time()
@@ -117,11 +87,13 @@ def wait_for(self, pattern, timeout=10):
11787
self.child.kill()
11888
except:
11989
logging.exception("Received exception when killing child process")
90+
self.dump_logs()
91+
12092
raise RuntimeError("Waiting for %r timed out" % pattern)
12193

122-
if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None:
94+
if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
12395
return
124-
if re.search(pattern, self.captured_stderr, re.IGNORECASE) is not None:
96+
if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
12597
return
12698
time.sleep(0.1)
12799

test/test_consumer_integration.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def setUpClass(cls):
2020
cls.server = cls.server1 # Bootstrapping server
2121

2222
@classmethod
23-
def tearDownClass(cls): # noqa
23+
def tearDownClass(cls):
2424
if not os.environ.get('KAFKA_VERSION'):
2525
return
2626

0 commit comments

Comments (0)