Skip to content

Commit 0e49d98

Browse files
authored
Merge pull request #416 from gregsterin/fix-kinesis-transport-max-records
Limiting number of records in batch to 500 as this is the kinesis limit.
2 parents f888dc9 + 273d392 commit 0e49d98

File tree

2 files changed

+159
-1
lines changed

2 files changed

+159
-1
lines changed
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# -*- coding: utf-8 -*-
2+
import sys
3+
if sys.version_info < (2, 7):
4+
import unittest2 as unittest
5+
else:
6+
import unittest
7+
8+
import mock
9+
import tempfile
10+
import logging
11+
12+
import beaver
13+
from beaver.config import BeaverConfig
14+
from beaver.transports import create_transport
15+
from beaver.unicode_dammit import unicode_dammit
16+
17+
from fixtures import Fixture
18+
19+
from moto import mock_kinesis
20+
import boto.kinesis
21+
22+
class KinesisTests(unittest.TestCase):
    """Exercise creation and batching of the Kinesis transport against moto's mocked Kinesis."""

    @mock_kinesis
    def _create_streams(self):
        """Create the 'stream1' and 'stream2' streams in us-east-1."""
        conn = boto.kinesis.connect_to_region("us-east-1")
        conn.create_stream("stream1", 1)
        conn.create_stream("stream2", 1)

    @classmethod
    def setUpClass(cls):
        """Build the BeaverConfig shared by every test and prepare the fixture distribution."""
        cls.logger = logging.getLogger(__name__)

        # An empty temporary file stands in for the config file path.
        empty_conf = tempfile.NamedTemporaryFile(delete=True)
        cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name))
        cls.beaver_config.set('transport', 'kinesis')
        cls.beaver_config.set('logstash_version', 1)

        output_file = Fixture.download_official_distribution()
        Fixture.extract_distribution(output_file)

    def _make_transport(self, profile_name=None, access_key=None, secret_key=None):
        """Create the mocked streams, apply the credentials and return a checked transport.

        Points the shared config at 'stream1', sets the three credential
        options to the given values, builds the transport and asserts that it
        is a KinesisTransport before handing it back to the caller.
        """
        self._create_streams()
        self.beaver_config.set('kinesis_aws_profile_name', profile_name)
        self.beaver_config.set('kinesis_aws_access_key', access_key)
        self.beaver_config.set('kinesis_aws_secret_key', secret_key)
        self.beaver_config.set('kinesis_aws_stream', 'stream1')

        transport = create_transport(self.beaver_config, logger=self.logger)

        self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport)
        return transport

    def _count_batches_for(self, line_count):
        """Send `line_count` log lines through callback() and return the number of batch sends.

        `_send_message_batch` is replaced with a mock, so nothing is sent to
        Kinesis; only the number of flushes is observed.
        """
        self.beaver_config.set('kinesis_bulk_lines', False)
        transport = self._make_transport()

        mock_send_batch = mock.Mock()
        transport._send_message_batch = mock_send_batch

        messages = [unicode_dammit('log' + str(i) + '\n') for i in range(line_count)]
        data = {
            # Empty messages are dropped before being handed to the transport.
            'lines': [message for message in messages if message],
            'fields': [],
        }

        self.assertTrue(transport.callback("test.log", **data))
        return mock_send_batch.call_count

    @mock_kinesis
    def test_kinesis_default_auth_profile(self):
        """A transport is created when neither a profile nor keys are configured."""
        transport = self._make_transport()
        transport.interrupt()

    @mock_kinesis
    def test_kinesis_auth_profile(self):
        """A transport is created when only an AWS profile name is configured."""
        # NOTE(review): unlike the sibling auth tests, this one never calls
        # transport.interrupt() - confirm whether that is intentional.
        self._make_transport(profile_name='beaver_stream')

    @mock_kinesis
    def test_kinesis_auth_key(self):
        """A transport is created when an access key and secret key are configured."""
        transport = self._make_transport(
            access_key='beaver_test_key',
            secret_key='beaver_test_secret',
        )
        transport.interrupt()

    @mock_kinesis
    def test_kinesis_auth_account_id(self):
        """A transport is created when a stream owner account id is set alongside keys."""
        # NOTE(review): beaver_config is shared at class level (see setUpClass),
        # so this value stays set for tests that run afterwards - confirm that
        # is harmless.
        self.beaver_config.set('kinesis_aws_stream_owner_acct_id', 'abc123')
        transport = self._make_transport(
            access_key='beaver_test_key',
            secret_key='beaver_test_secret',
        )
        transport.interrupt()

    @mock_kinesis
    def test_kinesis_send_stream(self):
        """500 records are expected to go out as a single batch."""
        self.assertEqual(1, self._count_batches_for(500))

    @mock_kinesis
    def test_kinesis_send_stream_with_record_count_cutoff(self):
        """501 records are expected to be split into two batches."""
        self.assertEqual(2, self._count_batches_for(501))

beaver/transports/kinesis_transport.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ def __init__(self, beaver_config, logger=None):
2020
# self-imposed max batch size to minimize the number of records in a given call to Kinesis
2121
self._batch_size_max = beaver_config.get('kinesis_aws_batch_size_max', '512000')
2222

23+
# Kinesis Limit http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html#API_PutRecords_RequestSyntax
24+
self._max_records_per_batch = 500
25+
2326
try:
2427
if self._access_key is None and self._secret_key is None:
2528
self._connection = boto.kinesis.connect_to_region(self._region)
@@ -55,7 +58,7 @@ def callback(self, filename, lines, **kwargs):
5558
continue
5659

5760
# Check the self-enforced/declared batch size and flush before moving forward if we've eclipsed the max
58-
if (len(message_batch) > 0) and ((message_batch_size + message_size) >= self._batch_size_max):
61+
if len(message_batch) > 0 and ((message_batch_size + message_size) >= self._batch_size_max or len(message_batch) == self._max_records_per_batch):
5962
self._logger.debug('Flushing {0} messages to Kinesis stream {1} bytes'.format(
6063
len(message_batch), message_batch_size))
6164
self._send_message_batch(message_batch)

0 commit comments

Comments
 (0)