|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +import sys |
| 3 | +if sys.version_info < (2, 7): |
| 4 | + import unittest2 as unittest |
| 5 | +else: |
| 6 | + import unittest |
| 7 | + |
| 8 | +import mock |
| 9 | +import tempfile |
| 10 | +import logging |
| 11 | + |
| 12 | +import beaver |
| 13 | +from beaver.config import BeaverConfig |
| 14 | +from beaver.transports import create_transport |
| 15 | +from beaver.unicode_dammit import unicode_dammit |
| 16 | + |
| 17 | +from fixtures import Fixture |
| 18 | + |
| 19 | +from moto import mock_kinesis |
| 20 | +import boto.kinesis |
| 21 | + |
| 22 | +class KinesisTests(unittest.TestCase): |
| 23 | + |
| 24 | + @mock_kinesis |
| 25 | + def _create_streams(self): |
| 26 | + conn = boto.kinesis.connect_to_region("us-east-1") |
| 27 | + conn.create_stream("stream1", 1) |
| 28 | + conn.create_stream("stream2", 1) |
| 29 | + |
| 30 | + @classmethod |
| 31 | + def setUpClass(cls): |
| 32 | + cls.logger = logging.getLogger(__name__) |
| 33 | + |
| 34 | + empty_conf = tempfile.NamedTemporaryFile(delete=True) |
| 35 | + cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name)) |
| 36 | + cls.beaver_config.set('transport', 'kinesis') |
| 37 | + cls.beaver_config.set('logstash_version', 1) |
| 38 | + |
| 39 | + output_file = Fixture.download_official_distribution() |
| 40 | + Fixture.extract_distribution(output_file) |
| 41 | + |
| 42 | + @mock_kinesis |
| 43 | + def test_kinesis_default_auth_profile(self): |
| 44 | + self._create_streams() |
| 45 | + self.beaver_config.set('kinesis_aws_profile_name', None) |
| 46 | + self.beaver_config.set('kinesis_aws_access_key', None) |
| 47 | + self.beaver_config.set('kinesis_aws_secret_key', None) |
| 48 | + self.beaver_config.set('kinesis_aws_stream', 'stream1') |
| 49 | + |
| 50 | + transport = create_transport(self.beaver_config, logger=self.logger) |
| 51 | + |
| 52 | + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) |
| 53 | + transport.interrupt() |
| 54 | + |
| 55 | + @mock_kinesis |
| 56 | + def test_kinesis_auth_profile(self): |
| 57 | + self._create_streams() |
| 58 | + self.beaver_config.set('kinesis_aws_profile_name', 'beaver_stream') |
| 59 | + self.beaver_config.set('kinesis_aws_access_key', None) |
| 60 | + self.beaver_config.set('kinesis_aws_secret_key', None) |
| 61 | + self.beaver_config.set('kinesis_aws_stream', 'stream1') |
| 62 | + |
| 63 | + transport = create_transport(self.beaver_config, logger=self.logger) |
| 64 | + |
| 65 | + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) |
| 66 | + |
| 67 | + @mock_kinesis |
| 68 | + def test_kinesis_auth_key(self): |
| 69 | + self._create_streams() |
| 70 | + self.beaver_config.set('kinesis_aws_profile_name', None) |
| 71 | + self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key') |
| 72 | + self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret') |
| 73 | + self.beaver_config.set('kinesis_aws_stream', 'stream1') |
| 74 | + |
| 75 | + transport = create_transport(self.beaver_config, logger=self.logger) |
| 76 | + |
| 77 | + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) |
| 78 | + transport.interrupt() |
| 79 | + |
| 80 | + @mock_kinesis |
| 81 | + def test_kinesis_auth_account_id(self): |
| 82 | + self._create_streams() |
| 83 | + self.beaver_config.set('kinesis_aws_stream_owner_acct_id', 'abc123') |
| 84 | + self.beaver_config.set('kinesis_aws_profile_name', None) |
| 85 | + self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key') |
| 86 | + self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret') |
| 87 | + self.beaver_config.set('kinesis_aws_stream', 'stream1') |
| 88 | + |
| 89 | + transport = create_transport(self.beaver_config, logger=self.logger) |
| 90 | + |
| 91 | + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) |
| 92 | + transport.interrupt() |
| 93 | + |
| 94 | + @mock_kinesis |
| 95 | + def test_kinesis_send_stream(self): |
| 96 | + self._create_streams() |
| 97 | + self.beaver_config.set('kinesis_aws_stream', 'stream1') |
| 98 | + self.beaver_config.set('kinesis_aws_profile_name', None) |
| 99 | + self.beaver_config.set('kinesis_aws_access_key', None) |
| 100 | + self.beaver_config.set('kinesis_aws_secret_key', None) |
| 101 | + self.beaver_config.set('kinesis_bulk_lines', False) |
| 102 | + |
| 103 | + transport = create_transport(self.beaver_config, logger=self.logger) |
| 104 | + mock_send_batch = mock.Mock() |
| 105 | + transport._send_message_batch = mock_send_batch |
| 106 | + |
| 107 | + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) |
| 108 | + |
| 109 | + data = {} |
| 110 | + lines = [] |
| 111 | + n=500 |
| 112 | + for i in range(n): |
| 113 | + lines.append('log' + str(i) + '\n') |
| 114 | + new_lines = [] |
| 115 | + for line in lines: |
| 116 | + message = unicode_dammit(line) |
| 117 | + if len(message) == 0: |
| 118 | + continue |
| 119 | + new_lines.append(message) |
| 120 | + data['lines'] = new_lines |
| 121 | + data['fields'] = [] |
| 122 | + self.assertTrue(transport.callback("test.log", **data)) |
| 123 | + self.assertEqual(1, mock_send_batch.call_count) |
| 124 | + |
| 125 | + |
| 126 | + @mock_kinesis |
| 127 | + def test_kinesis_send_stream_with_record_count_cutoff(self): |
| 128 | + self._create_streams() |
| 129 | + self.beaver_config.set('kinesis_aws_stream', 'stream1') |
| 130 | + self.beaver_config.set('kinesis_aws_profile_name', None) |
| 131 | + self.beaver_config.set('kinesis_aws_access_key', None) |
| 132 | + self.beaver_config.set('kinesis_aws_secret_key', None) |
| 133 | + self.beaver_config.set('kinesis_bulk_lines', False) |
| 134 | + |
| 135 | + transport = create_transport(self.beaver_config, logger=self.logger) |
| 136 | + mock_send_batch = mock.Mock() |
| 137 | + transport._send_message_batch = mock_send_batch |
| 138 | + |
| 139 | + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) |
| 140 | + |
| 141 | + data = {} |
| 142 | + lines = [] |
| 143 | + n = 501 |
| 144 | + for i in range(n): |
| 145 | + lines.append('log' + str(i) + '\n') |
| 146 | + new_lines = [] |
| 147 | + for line in lines: |
| 148 | + message = unicode_dammit(line) |
| 149 | + if len(message) == 0: |
| 150 | + continue |
| 151 | + new_lines.append(message) |
| 152 | + data['lines'] = new_lines |
| 153 | + data['fields'] = [] |
| 154 | + self.assertTrue(transport.callback("test.log", **data)) |
| 155 | + self.assertEqual(2, mock_send_batch.call_count) |
0 commit comments