From 258b4456773cc4760ff0d0fabcc6fcad6f1ab530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Guz?= Date: Wed, 29 Jan 2020 13:07:51 +0100 Subject: [PATCH 1/5] Add queue overflow handler in asyncsender. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Guz --- fluent/asyncsender.py | 11 +++++++- tests/test_asynchandler.py | 52 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 7f8dc02..3b460ab 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -55,6 +55,7 @@ def __init__(self, msgpack_kwargs=None, queue_maxsize=DEFAULT_QUEUE_MAXSIZE, queue_circular=DEFAULT_QUEUE_CIRCULAR, + queue_overflow_handler=None, **kwargs): """ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version. @@ -66,6 +67,10 @@ def __init__(self, **kwargs) self._queue_maxsize = queue_maxsize self._queue_circular = queue_circular + if queue_circular and queue_overflow_handler: + self._queue_overflow_handler = queue_overflow_handler + else: + self._queue_overflow_handler = self._queue_overflow_handler_default self._thread_guard = threading.Event() # This ensures visibility across all variables self._closed = False @@ -109,7 +114,8 @@ def _send(self, bytes_): if self._queue_circular and self._queue.full(): # discard oldest try: - self._queue.get(block=False) + discarded_bytes = self._queue.get(block=False) + self._queue_overflow_handler(discarded_bytes) except Empty: # pragma: no cover pass try: @@ -132,5 +138,8 @@ def _send_loop(self): finally: self._close() + def _queue_overflow_handler_default(self, discarded_bytes): + pass + def __exit__(self, exc_type, exc_val, exc_tb): self.close() diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 52d9182..477b066 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -4,6 +4,9 @@ import sys import unittest +from mock import patch +from unittest import mock + import fluent.asynchandler import fluent.handler from tests import mockserver @@ -309,3 +312,52 @@ def test_simple(self): eq('userB', el[2]['to']) self.assertTrue(el[1]) self.assertTrue(isinstance(el[1], int)) + + +class QueueOverflowException(Exception): + pass + + +def queue_overflow_handler(discarded_bytes): + raise QueueOverflowException(discarded_bytes) + + + + +class TestHandlerWithCircularQueueHandler(unittest.TestCase): + Q_SIZE = 1 + + def setUp(self): + super(TestHandlerWithCircularQueueHandler, self).setUp() + self._server = mockserver.MockRecvServer('localhost') + self._port = self._server.port + + def tearDown(self): + self._server.close() + + def get_handler_class(self): + # return fluent.handler.FluentHandler + return fluent.asynchandler.FluentHandler + + @patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(return_value=True)) + def test_simple(self): + handler = self.get_handler_class()('app.follow', port=self._port, + queue_maxsize=self.Q_SIZE, + queue_circular=True, + queue_overflow_handler=queue_overflow_handler) + with handler: + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) + From 8e70e6a3addb47a896816b6771ec4c4be251e26d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Guz?= Date: Thu, 30 Jan 2020 18:35:01 +0100 Subject: [PATCH 2/5] Fix tests. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Guz --- tests/test_asynchandler.py | 45 ++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index 477b066..ccd4069 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -4,8 +4,16 @@ import sys import unittest -from mock import patch -from unittest import mock +try: + from unittest import mock +except ImportError: + import mock +try: + from unittest.mock import patch +except ImportError: + from mock import patch + + import fluent.asynchandler import fluent.handler @@ -322,8 +330,6 @@ def queue_overflow_handler(discarded_bytes): raise QueueOverflowException(discarded_bytes) - - class TestHandlerWithCircularQueueHandler(unittest.TestCase): Q_SIZE = 1 @@ -339,25 +345,30 @@ def get_handler_class(self): # return fluent.handler.FluentHandler return fluent.asynchandler.FluentHandler - @patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(return_value=True)) def test_simple(self): handler = self.get_handler_class()('app.follow', port=self._port, queue_maxsize=self.Q_SIZE, queue_circular=True, queue_overflow_handler=queue_overflow_handler) with handler: - self.assertEqual(handler.sender.queue_circular, True) - self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) + def custom_full_queue(): + handler.sender._queue.put(b'Mock', block=True) + return True - logging.basicConfig(level=logging.INFO) - log = logging.getLogger('fluent.test') - handler.setFormatter(fluent.handler.FluentRecordFormatter()) - log.addHandler(handler) + with patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(side_effect=custom_full_queue)): + self.assertEqual(handler.sender.queue_circular, True) + self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE) - log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) - with self.assertRaises(QueueOverflowException): - log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) - log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) - with self.assertRaises(QueueOverflowException): - log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'}) + logging.basicConfig(level=logging.INFO) + log = logging.getLogger('fluent.test') + handler.setFormatter(fluent.handler.FluentRecordFormatter()) + log.addHandler(handler) + + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + with self.assertRaises(QueueOverflowException): + log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) From 84808c305bca80f78bfd4e39527ed85ea93774c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Guz?= Date: Mon, 24 Feb 2020 11:08:12 +0100 Subject: [PATCH 3/5] Execute queue overflow handler in case of no errors. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Paweł Guz --- fluent/asyncsender.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index 3b460ab..e140774 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -115,9 +115,10 @@ def _send(self, bytes_): # discard oldest try: discarded_bytes = self._queue.get(block=False) - self._queue_overflow_handler(discarded_bytes) except Empty: # pragma: no cover pass + else: + self._queue_overflow_handler(discarded_bytes) try: self._queue.put(bytes_, block=(not self._queue_circular)) except Full: # pragma: no cover From 478bd02ff69c3c4afca25694025f8548898d96ff Mon Sep 17 00:00:00 2001 From: pguz Date: Mon, 25 May 2020 13:21:24 +0200 Subject: [PATCH 4/5] Respect multithreading in asynchandler test. Signed-off-by: pguz --- tests/test_asynchandler.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py index ccd4069..e88a041 100644 --- a/tests/test_asynchandler.py +++ b/tests/test_asynchandler.py @@ -322,7 +322,7 @@ def test_simple(self): self.assertTrue(isinstance(el[1], int)) -class QueueOverflowException(Exception): +class QueueOverflowException(BaseException): pass @@ -364,11 +364,24 @@ def custom_full_queue(): handler.setFormatter(fluent.handler.FluentRecordFormatter()) log.addHandler(handler) - with self.assertRaises(QueueOverflowException): + exc_counter = 0 + + try: log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 - with self.assertRaises(QueueOverflowException): + try: log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 - with self.assertRaises(QueueOverflowException): + try: log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'}) + except QueueOverflowException: + exc_counter += 1 + + # we can't be sure to have exception in every case due to multithreading, + # so we can test only for a cautelative condition here + print('Exception raised: {} (expected 3)'.format(exc_counter)) + assert exc_counter >= 0 From 0568a2624959d35d5c3ecce072ee4e12234b8753 Mon Sep 17 00:00:00 2001 From: Arcadiy Ivanov Date: Wed, 10 Mar 2021 18:41:28 -0500 Subject: [PATCH 5/5] Unpin msgpack version Remove Python 2.7 to remove the msgpack encoding ambiguity issue. Remove Python 3.4 support just because it's dead. fixes #171 Signed-off-by: Arcadiy Ivanov --- .travis.yml | 6 ++---- README.rst | 5 +++-- fluent/asyncsender.py | 12 +++--------- fluent/handler.py | 11 +++-------- fluent/sender.py | 2 -- setup.py | 15 ++++++++------- tests/mockserver.py | 4 +--- 7 files changed, 20 insertions(+), 35 deletions(-) diff --git a/.travis.yml b/.travis.yml index 75861ca..0e5e0c8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,11 @@ sudo: false language: python python: - - "2.7" - - "3.4" - "3.5" - "3.6" - "3.7" - "3.8" - - pypy + - "3.9" - pypy3 - nightly # command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors @@ -27,7 +25,7 @@ deploy: secure: CpNaj4F3TZvpP1aSJWidh/XexrWODV2sBdObrYU79Gyh9hFl6WLsA3JM9BfVsy9cGb/P/jP6ly4Z0/6qdIzZ5D6FPOB1B7rn5GZ2LAMOypRCA6W2uJbRjUU373Wut0p0OmQcMPto6XJsMlpvOEq+1uAq+LLAnAGEmmYTeskZebs= on: tags: true - condition: '"$TRAVIS_PYTHON_VERSION" = "3.8" || "$TRAVIS_PYTHON_VERSION" = "2.7"' + condition: '"$TRAVIS_PYTHON_VERSION" = "3.9" || "$TRAVIS_PYTHON_VERSION" = "2.7"' distributions: "sdist bdist_wheel" matrix: diff --git a/README.rst b/README.rst index 4abb6f6..5a31463 100644 --- a/README.rst +++ b/README.rst @@ -24,9 +24,10 @@ Python application. Requirements ------------ -- Python 2.7 or 3.4+ -- ``msgpack-python`` +- Python 3.5+ +- ``msgpack`` - **IMPORTANT**: Version 0.8.0 is the last version supporting Python 2.6, 3.2 and 3.3 +- **IMPORTANT**: Version 0.9.6 is the last version supporting Python 2.7 and 3.4 Installation ------------ diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py index e140774..24c6924 100644 --- a/fluent/asyncsender.py +++ b/fluent/asyncsender.py @@ -1,13 +1,7 @@ # -*- coding: utf-8 -*- -from __future__ import print_function - import threading - -try: - from queue import Queue, Full, Empty -except ImportError: - from Queue import Queue, Full, Empty +from queue import Queue, Full, Empty from fluent import sender from fluent.sender import EventTime @@ -121,8 +115,8 @@ def _send(self, bytes_): self._queue_overflow_handler(discarded_bytes) try: self._queue.put(bytes_, block=(not self._queue_circular)) - except Full: # pragma: no cover - return False # this actually can't happen + except Full: # pragma: no cover + return False # this actually can't happen return True diff --git a/fluent/handler.py b/fluent/handler.py index 9297550..7aefd8f 100644 --- a/fluent/handler.py +++ b/fluent/handler.py @@ -9,11 +9,6 @@ except ImportError: # pragma: no cover import json -try: - basestring -except NameError: # pragma: no cover - basestring = (str, bytes) - from fluent import sender @@ -120,7 +115,7 @@ def _structuring(self, data, record): if isinstance(msg, dict): self._add_dic(data, msg) - elif isinstance(msg, basestring): + elif isinstance(msg, str): self._add_dic(data, self._format_msg(record, msg)) else: self._add_dic(data, {'message': msg}) @@ -171,8 +166,8 @@ def _format_by_dict_uses_time(self): @staticmethod def _add_dic(data, dic): for key, value in dic.items(): - if isinstance(key, basestring): - data[str(key)] = value + if isinstance(key, str): + data[key] = value class FluentHandler(logging.Handler): diff --git a/fluent/sender.py b/fluent/sender.py index 6762856..72e8c36 100644 --- a/fluent/sender.py +++ b/fluent/sender.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import print_function - import errno import socket import struct diff --git a/setup.py b/setup.py index 65035ca..1453d55 100755 --- a/setup.py +++ b/setup.py @@ -12,31 +12,32 @@ setup( name='fluent-logger', - version='0.9.6', + version='0.10.0', description=desc, long_description=open(README).read(), package_dir={'fluent': 'fluent'}, packages=['fluent'], - install_requires=['msgpack<1.0.0'], + install_requires=['msgpack>1.0'], author='Kazuki Ohta', author_email='kazuki.ohta@gmail.com', + maintainer='Arcadiy Ivanov', + maintainer_email='arcadiy@ivanov.biz', url='https://github.com/fluent/fluent-logger-python', - download_url='http://pypi.python.org/pypi/fluent-logger/', + download_url='https://pypi.org/project/fluent-logger/', license='Apache License, Version 2.0', classifiers=[ - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: PyPy', 'Development Status :: 5 - Production/Stable', 'Topic :: System :: Logging', 'Intended Audience :: Developers', ], - python_requires=">=2.7,!=3.0,!=3.1,!=3.2,!=3.3", + python_requires='>=3.5', test_suite='tests' ) diff --git a/tests/mockserver.py b/tests/mockserver.py index 426d139..77ecdd3 100644 --- a/tests/mockserver.py +++ b/tests/mockserver.py @@ -66,9 +66,7 @@ def run(self): def get_received(self): self.join() self._buf.seek(0) - # TODO: have to process string encoding properly. currently we assume - # that all encoding is utf-8. - return list(Unpacker(self._buf, encoding='utf-8')) + return list(Unpacker(self._buf)) def close(self):