diff --git a/.coveragerc b/.coveragerc
index 3d5fc07..2692cb7 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -2,7 +2,9 @@
[run]
branch = True
-omit = */tests/*
+omit =
+ */tests/*
+ fluent/__about__.py
[report]
omit = */tests/*
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..f95b238
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,48 @@
+name: Run test
+
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+
+jobs:
+ lint:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Install Ruff
+ run: pipx install ruff
+ - name: Ruff check
+ run: ruff check
+ - name: Ruff format
+ run: ruff format --diff
+
+ test:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "pypy3.9", "pypy3.10"]
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: ${{ matrix.python-version }}
+ cache: "pip"
+ cache-dependency-path: requirements-dev.txt
+ - name: Install dependencies
+ run: python -m pip install -r requirements-dev.txt
+ - name: Run tests
+ run: pytest --cov=fluent
+
+ build:
+ needs: test
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - run: pipx run build
+ - uses: actions/upload-artifact@v4
+ with:
+ name: dist
+ path: dist/
diff --git a/.gitignore b/.gitignore
index 921cbbd..fd4bc6c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@
*.pyc
*.pyo
/*.egg-info
+/.coverage
+/.eggs
/.tox
/build
/dist
+.idea/
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 505b9ff..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,15 +0,0 @@
-language: python
-python:
- - "2.6"
- - "2.7"
- - "3.2"
- - "3.3"
- - "3.4"
-# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors
-install:
- - "pip install --use-mirrors -e ."
- - "pip install coverage coveralls"
-script:
- - "python ./setup.py nosetests"
-after_success:
- - coveralls
diff --git a/MANIFEST.in b/MANIFEST.in
deleted file mode 100644
index e118e60..0000000
--- a/MANIFEST.in
+++ /dev/null
@@ -1,3 +0,0 @@
-include README.md
-include setup.py
-include COPYING
diff --git a/README.md b/README.md
deleted file mode 100644
index 4734f20..0000000
--- a/README.md
+++ /dev/null
@@ -1,88 +0,0 @@
-# A Python structured logger for Fluentd
-
-[](https://travis-ci.org/EvaSDK/fluent-logger-python)
-[](https://coveralls.io/r/EvaSDK/fluent-logger-python)
-
-Many web/mobile applications generate huge amount of event logs (c,f. login, logout, purchase, follow, etc). To analyze these event logs could be really valuable for improving the service. However, the challenge is collecting these logs easily and reliably.
-
-[Fluentd](http://github.com/fluent/fluentd) solves that problem by having: easy installation, small footprint, plugins, reliable buffering, log forwarding, etc.
-
-**fluent-logger-python** is a Python library, to record the events from Python application.
-
-## Requirements
-
-* Python 2.6 or greater including 3.x
-
-## Installation
-
-This library is distributed as 'fluent-logger' python package. Please execute the following command to install it.
-
- $ pip install fluent-logger
-
-## Configuration
-
-Fluentd daemon must be launched with a tcp source configuration:
-
-
- type forward
- port 24224
-
-
-To quickly test your setup, add a matcher that logs to the stdout:
-
-
- type stdout
-
-
-## Usage
-
-### Event-Based Interface
-
-First, you need to call logger.setup() to create global logger instance. This call needs to be called only once, at the beggining of the application for example.
-
-By default, the logger assumes fluentd daemon is launched locally. You can also specify remote logger by passing the options.
-
- from fluent import sender
-
- # for local fluent
- sender.setup('app')
-
- # for remote fluent
- sender.setup('app', host='host', port=24224)
-
-Then, please create the events like this. This will send the event to fluent, with tag 'app.follow' and the attributes 'from' and 'to'.
-
- from fluent import event
-
- # send event to fluentd, with 'app.follow' tag
- event.Event('follow', {
- 'from': 'userA',
- 'to': 'userB'
- })
-
-### Python logging.Handler interface
-
-This client-library also has FluentHandler class for Python logging module.
-
- import logging
- from fluent import handler
-
- logging.basicConfig(level=logging.INFO)
- l = logging.getLogger('fluent.test')
- l.addHandler(handler.FluentHandler('app.follow', host='host', port=24224))
- l.info({
- 'from': 'userA',
- 'to': 'userB'
- })
-
-## Testing
-
-Testing can be done using [nose](https://nose.readthedocs.org/en/latest/).
-
-## Contributors
-
-Patches contributed by [those people](https://github.com/fluent/fluent-logger-python/contributors).
-
-## License
-
-Apache License, Version 2.0
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..30b499f
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,388 @@
+A Python structured logger for Fluentd/Fluent Bit
+=================================================
+
+Many web/mobile applications generate a huge amount of event logs (e.g.
+login, logout, purchase, follow, etc.). Analyzing these event logs can
+be really valuable for improving the service. However, the challenge is
+collecting these logs easily and reliably.
+
+`Fluentd <https://www.fluentd.org/>`__ and `Fluent Bit <https://fluentbit.io/>`__ solve that problem by
+having: easy installation, small footprint, plugins, reliable buffering,
+log forwarding, etc.
+
+**fluent-logger-python** is a Python library, to record the events from
+Python application.
+
+Requirements
+------------
+
+- Python 3.7+
+- ``msgpack``
+- **IMPORTANT**: Version 0.8.0 is the last version supporting Python 2.6, 3.2 and 3.3
+- **IMPORTANT**: Version 0.9.6 is the last version supporting Python 2.7 and 3.4
+- **IMPORTANT**: Version 0.10.0 is the last version supporting Python 3.5 and 3.6
+
+Installation
+------------
+
+This library is distributed as 'fluent-logger' python package. Please
+execute the following command to install it.
+
+.. code:: sh
+
+ $ pip install fluent-logger
+
+Configuration
+-------------
+
+Fluentd daemon must be launched with a tcp source configuration:
+
+::
+
+    <source>
+      type forward
+      port 24224
+    </source>
+
+To quickly test your setup, add a matcher that logs to the stdout:
+
+::
+
+    <match *>
+      type stdout
+    </match>
+
+Usage
+-----
+
+FluentSender Interface
+~~~~~~~~~~~~~~~~~~~~~~
+
+`sender.FluentSender` is a structured event logger for Fluentd.
+
+By default, the logger assumes fluentd daemon is launched locally. You
+can also specify remote logger by passing the options.
+
+.. code:: python
+
+ from fluent import sender
+
+ # for local fluent
+ logger = sender.FluentSender('app')
+
+ # for remote fluent
+ logger = sender.FluentSender('app', host='host', port=24224)
+
+For sending event, call `emit` method with your event. Following example will send the event to
+fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.
+
+.. code:: python
+
+ # Use current time
+ logger.emit('follow', {'from': 'userA', 'to': 'userB'})
+
+ # Specify optional time
+ cur_time = int(time.time())
+ logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})
+
+To send events with nanosecond-precision timestamps (Fluent 0.14 and up),
+specify `nanosecond_precision` on `FluentSender`.
+
+.. code:: python
+
+ # Use nanosecond
+ logger = sender.FluentSender('app', nanosecond_precision=True)
+ logger.emit('follow', {'from': 'userA', 'to': 'userB'})
+ logger.emit_with_time('follow', time.time(), {'from': 'userA', 'to': 'userB'})
+
+You can detect an error via return value of `emit`. If an error happens in `emit`, `emit` returns `False` and get an error object using `last_error` method.
+
+.. code:: python
+
+ if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
+ print(logger.last_error)
+ logger.clear_last_error() # clear stored error after handled errors
+
+If you want to shutdown the client, call `close()` method.
+
+.. code:: python
+
+ logger.close()
+
+Event-Based Interface
+~~~~~~~~~~~~~~~~~~~~~
+
+This API is a wrapper for `sender.FluentSender`.
+
+First, you need to call ``sender.setup()`` to create global `sender.FluentSender` logger
+instance. This call needs to be called only once, at the beginning of
+the application for example.
+
+Initialization code of Event-Based API is below:
+
+.. code:: python
+
+ from fluent import sender
+
+ # for local fluent
+ sender.setup('app')
+
+ # for remote fluent
+ sender.setup('app', host='host', port=24224)
+
+Then, please create the events like this. This will send the event to
+fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.
+
+.. code:: python
+
+ from fluent import event
+
+ # send event to fluentd, with 'app.follow' tag
+ event.Event('follow', {
+ 'from': 'userA',
+ 'to': 'userB'
+ })
+
+`event.Event` has one limitation: it cannot return a success/failure result.
+
+Other methods for Event-Based Interface.
+
+.. code:: python
+
+ sender.get_global_sender # get instance of global sender
+ sender.close # Call FluentSender#close
+
+Handler for buffer overflow
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can inject your own custom proc to handle buffer overflow in the event of connection failure. This will mitigate the loss of data instead of simply throwing data away.
+
+.. code:: python
+
+ import msgpack
+ from io import BytesIO
+
+ def overflow_handler(pendings):
+ unpacker = msgpack.Unpacker(BytesIO(pendings))
+ for unpacked in unpacker:
+ print(unpacked)
+
+ logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=overflow_handler)
+
+You should handle any exception in handler. fluent-logger ignores exceptions from ``buffer_overflow_handler``.
+
+This handler is also called when pending events exist during `close()`.
+
+Python logging.Handler interface
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This client-library also has ``FluentHandler`` class for Python logging
+module.
+
+.. code:: python
+
+ import logging
+ from fluent import handler
+
+ custom_format = {
+ 'host': '%(hostname)s',
+ 'where': '%(module)s.%(funcName)s',
+ 'type': '%(levelname)s',
+ 'stack_trace': '%(exc_text)s'
+ }
+
+ logging.basicConfig(level=logging.INFO)
+ l = logging.getLogger('fluent.test')
+ h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
+ formatter = handler.FluentRecordFormatter(custom_format)
+ h.setFormatter(formatter)
+ l.addHandler(h)
+ l.info({
+ 'from': 'userA',
+ 'to': 'userB'
+ })
+ l.info('{"from": "userC", "to": "userD"}')
+ l.info("This log entry will be logged with the additional key: 'message'.")
+
+You can also customize formatter via logging.config.dictConfig
+
+.. code:: python
+
+ import logging.config
+ import yaml
+
+ with open('logging.yaml') as fd:
+ conf = yaml.load(fd)
+
+ logging.config.dictConfig(conf['logging'])
+
+You can inject your own custom proc to handle buffer overflow in the event of connection failure. This will mitigate the loss of data instead of simply throwing data away.
+
+.. code:: python
+
+ import msgpack
+ from io import BytesIO
+
+ def overflow_handler(pendings):
+ unpacker = msgpack.Unpacker(BytesIO(pendings))
+ for unpacked in unpacker:
+ print(unpacked)
+
+A sample configuration ``logging.yaml`` would be:
+
+.. code:: yaml
+
+ logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: '%(message)s'
+ default:
+ format: '%(asctime)s %(levelname)-8s %(name)-15s %(message)s'
+ datefmt: '%Y-%m-%d %H:%M:%S'
+ fluent_fmt:
+ '()': fluent.handler.FluentRecordFormatter
+ format:
+ level: '%(levelname)s'
+ hostname: '%(hostname)s'
+ where: '%(module)s.%(funcName)s'
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ level: DEBUG
+ formatter: default
+ stream: ext://sys.stdout
+ fluent:
+ class: fluent.handler.FluentHandler
+ host: localhost
+ port: 24224
+ tag: test.logging
+ buffer_overflow_handler: overflow_handler
+ formatter: fluent_fmt
+ level: DEBUG
+ none:
+ class: logging.NullHandler
+
+ loggers:
+ amqp:
+ handlers: [none]
+ propagate: False
+ conf:
+ handlers: [none]
+ propagate: False
+ '': # root logger
+ handlers: [console, fluent]
+ level: DEBUG
+ propagate: False
+
+Asynchronous Communication
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Besides the regular interfaces - the event-based one provided by ``sender.FluentSender`` and the python logging one
+provided by ``handler.FluentHandler`` - there are also corresponding asynchronous versions in ``asyncsender`` and
+``asynchandler`` respectively. These versions use a separate thread to handle the communication with the remote fluentd
+server. In this way the client of the library won't be blocked during the logging of the events, and won't risk going
+into timeout if the fluentd server becomes unreachable. Also it won't be slowed down by the network overhead.
+
+The interfaces in ``asyncsender`` and ``asynchandler`` are exactly the same as those in ``sender`` and ``handler``, so it's
+just a matter of importing from a different module.
+
+For instance, for the event-based interface:
+
+.. code:: python
+
+ from fluent import asyncsender as sender
+
+ # for local fluent
+ sender.setup('app')
+
+ # for remote fluent
+ sender.setup('app', host='host', port=24224)
+
+ # do your work
+ ...
+
+ # IMPORTANT: before program termination, close the sender
+ sender.close()
+
+or for the python logging interface:
+
+.. code:: python
+
+ import logging
+ from fluent import asynchandler as handler
+
+ custom_format = {
+ 'host': '%(hostname)s',
+ 'where': '%(module)s.%(funcName)s',
+ 'type': '%(levelname)s',
+ 'stack_trace': '%(exc_text)s'
+ }
+
+ logging.basicConfig(level=logging.INFO)
+ l = logging.getLogger('fluent.test')
+ h = handler.FluentHandler('app.follow', host='host', port=24224, buffer_overflow_handler=overflow_handler)
+ formatter = handler.FluentRecordFormatter(custom_format)
+ h.setFormatter(formatter)
+ l.addHandler(h)
+ l.info({
+ 'from': 'userA',
+ 'to': 'userB'
+ })
+ l.info('{"from": "userC", "to": "userD"}')
+ l.info("This log entry will be logged with the additional key: 'message'.")
+
+ ...
+
+ # IMPORTANT: before program termination, close the handler
+ h.close()
+
+**NOTE**: please note that it's important to close the sender or the handler at program termination. This will make
+sure the communication thread terminates and it's joined correctly. Otherwise the program won't exit, waiting for
+the thread, unless forcibly killed.
+
+Circular queue mode
++++++++++++++++++++
+
+In some applications it can be especially important to guarantee that the logging process won't block under *any*
+circumstance, even when it's logging faster than the sending thread could handle (*backpressure*). In this case it's
+possible to enable the `circular queue` mode, by passing `True` in the `queue_circular` parameter of
+``asyncsender.FluentSender`` or ``asynchandler.FluentHandler``. By doing so the thread doing the logging won't block
+even when the queue is full, the new event will be added to the queue by discarding the oldest one.
+
+**WARNING**: setting `queue_circular` to `True` will cause loss of events if the queue fills up completely! Make sure
+that this doesn't happen, or it's acceptable for your application.
+
+
+Testing
+-------
+
+Testing can be done using `pytest <https://docs.pytest.org/>`__.
+
+.. code:: sh
+
+ $ pytest tests
+
+
+Release
+-------
+
+.. code:: sh
+
+ $ # Download dist.zip for release from GitHub Action artifact.
+ $ unzip -d dist dist.zip
+    $ pipx run twine upload dist/*
+
+
+Contributors
+------------
+
+Patches contributed by `those
+people <https://github.com/fluent/fluent-logger-python/contributors>`__.
+
+License
+-------
+
+Apache License, Version 2.0
diff --git a/fluent/__about__.py b/fluent/__about__.py
new file mode 100644
index 0000000..fee46bd
--- /dev/null
+++ b/fluent/__about__.py
@@ -0,0 +1 @@
+__version__ = "0.11.1"
diff --git a/fluent/asynchandler.py b/fluent/asynchandler.py
new file mode 100644
index 0000000..e3c3dc0
--- /dev/null
+++ b/fluent/asynchandler.py
@@ -0,0 +1,10 @@
+from fluent import asyncsender, handler
+
+
+class FluentHandler(handler.FluentHandler):
+ """
+ Asynchronous Logging Handler for fluent.
+ """
+
+ def getSenderClass(self):
+ return asyncsender.FluentSender
diff --git a/fluent/asyncsender.py b/fluent/asyncsender.py
new file mode 100644
index 0000000..b391290
--- /dev/null
+++ b/fluent/asyncsender.py
@@ -0,0 +1,149 @@
+import threading
+from queue import Empty, Full, Queue
+
+from fluent import sender
+from fluent.sender import EventTime
+
+__all__ = ["EventTime", "FluentSender"]
+
+DEFAULT_QUEUE_MAXSIZE = 100
+DEFAULT_QUEUE_CIRCULAR = False
+
+_TOMBSTONE = object()
+
+_global_sender = None
+
+
+def _set_global_sender(sender): # pragma: no cover
+ """[For testing] Function to set global sender directly"""
+ global _global_sender
+ _global_sender = sender
+
+
+def setup(tag, **kwargs): # pragma: no cover
+ global _global_sender
+ _global_sender = FluentSender(tag, **kwargs)
+
+
+def get_global_sender(): # pragma: no cover
+ return _global_sender
+
+
+def close(): # pragma: no cover
+ get_global_sender().close()
+
+
+class FluentSender(sender.FluentSender):
+ def __init__(
+ self,
+ tag,
+ host="localhost",
+ port=24224,
+ bufmax=1 * 1024 * 1024,
+ timeout=3.0,
+ verbose=False,
+ buffer_overflow_handler=None,
+ nanosecond_precision=False,
+ msgpack_kwargs=None,
+ queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
+ queue_circular=DEFAULT_QUEUE_CIRCULAR,
+ queue_overflow_handler=None,
+ **kwargs,
+ ):
+ """
+ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
+ """
+ super().__init__(
+ tag=tag,
+ host=host,
+ port=port,
+ bufmax=bufmax,
+ timeout=timeout,
+ verbose=verbose,
+ buffer_overflow_handler=buffer_overflow_handler,
+ nanosecond_precision=nanosecond_precision,
+ msgpack_kwargs=msgpack_kwargs,
+ **kwargs,
+ )
+ self._queue_maxsize = queue_maxsize
+ self._queue_circular = queue_circular
+ if queue_circular and queue_overflow_handler:
+ self._queue_overflow_handler = queue_overflow_handler
+ else:
+ self._queue_overflow_handler = self._queue_overflow_handler_default
+
+ self._thread_guard = (
+ threading.Event()
+ ) # This ensures visibility across all variables
+ self._closed = False
+
+ self._queue = Queue(maxsize=queue_maxsize)
+ self._send_thread = threading.Thread(
+ target=self._send_loop, name="AsyncFluentSender %d" % id(self)
+ )
+ self._send_thread.daemon = True
+ self._send_thread.start()
+
+ def close(self, flush=True):
+ with self.lock:
+ if self._closed:
+ return
+ self._closed = True
+ if not flush:
+ while True:
+ try:
+ self._queue.get(block=False)
+ except Empty:
+ break
+ self._queue.put(_TOMBSTONE)
+ self._send_thread.join()
+
+ @property
+ def queue_maxsize(self):
+ return self._queue_maxsize
+
+ @property
+ def queue_blocking(self):
+ return not self._queue_circular
+
+ @property
+ def queue_circular(self):
+ return self._queue_circular
+
+ def _send(self, bytes_):
+ with self.lock:
+ if self._closed:
+ return False
+ if self._queue_circular and self._queue.full():
+ # discard oldest
+ try:
+ discarded_bytes = self._queue.get(block=False)
+ except Empty: # pragma: no cover
+ pass
+ else:
+ self._queue_overflow_handler(discarded_bytes)
+ try:
+ self._queue.put(bytes_, block=(not self._queue_circular))
+ except Full: # pragma: no cover
+ return False # this actually can't happen
+
+ return True
+
+ def _send_loop(self):
+ send_internal = super()._send_internal
+
+ try:
+ while True:
+ bytes_ = self._queue.get(block=True)
+ if bytes_ is _TOMBSTONE:
+ break
+
+ send_internal(bytes_)
+ finally:
+ self._close()
+
+ def _queue_overflow_handler_default(self, discarded_bytes):
+ pass
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
diff --git a/fluent/event.py b/fluent/event.py
index 76f27ca..a8c4d7b 100644
--- a/fluent/event.py
+++ b/fluent/event.py
@@ -1,13 +1,12 @@
-# -*- coding: utf-8 -*-
-
-import time
-
from fluent import sender
-class Event(object):
+class Event:
def __init__(self, label, data, **kwargs):
- assert isinstance(data, dict), 'data must be a dict'
- sender_ = kwargs.get('sender', sender.get_global_sender())
- timestamp = kwargs.get('time', int(time.time()))
- sender_.emit_with_time(label, timestamp, data)
+ assert isinstance(data, dict), "data must be a dict"
+ sender_ = kwargs.get("sender", sender.get_global_sender())
+ timestamp = kwargs.get("time", None)
+ if timestamp is not None:
+ sender_.emit_with_time(label, timestamp, data)
+ else:
+ sender_.emit(label, data)
diff --git a/fluent/handler.py b/fluent/handler.py
index e14c036..8029604 100644
--- a/fluent/handler.py
+++ b/fluent/handler.py
@@ -1,83 +1,279 @@
-# -*- coding: utf-8 -*-
-
+import json
import logging
import socket
-try:
- import simplejson as json
-except ImportError: # pragma: no cover
- import json
+from fluent import sender
+
-try:
- basestring
-except NameError: # pragma: no cover
- basestring = (str, bytes)
+class FluentRecordFormatter(logging.Formatter):
+ """A structured formatter for Fluent.
-from fluent import sender
+ Best used with server storing data in an ElasticSearch cluster for example.
+
+ :param fmt: a dict or a callable with format string as values to map to provided keys.
+ If callable, should accept a single argument `LogRecord` and return a dict,
+ and have a field `usesTime` that is callable and return a bool as would
+ `FluentRecordFormatter.usesTime`
+ :param datefmt: strftime()-compatible date/time format string.
+ :param style: '%', '{' or '$' (used only with Python 3.2 or above)
+ :param fill_missing_fmt_key: if True, do not raise a KeyError if the format
+ key is not found. Put None if not found.
+ :param format_json: if True, will attempt to parse message as json. If not,
+ will use message as-is. Defaults to True
+ :param exclude_attrs: switches this formatter into a mode where all attributes
+ except the ones specified by `exclude_attrs` are logged with the record as is.
+ If `None`, operates as before, otherwise `fmt` is ignored.
+ Can be an iterable.
+ """
+
+ def __init__(
+ self,
+ fmt=None,
+ datefmt=None,
+ style="%",
+ fill_missing_fmt_key=False,
+ format_json=True,
+ exclude_attrs=None,
+ ):
+ super().__init__(None, datefmt)
+
+ if style != "%":
+ self.__style, basic_fmt_dict = {
+ "{": (
+ logging.StrFormatStyle,
+ {
+ "sys_host": "{hostname}",
+ "sys_name": "{name}",
+ "sys_module": "{module}",
+ },
+ ),
+ "$": (
+ logging.StringTemplateStyle,
+ {
+ "sys_host": "${hostname}",
+ "sys_name": "${name}",
+ "sys_module": "${module}",
+ },
+ ),
+ }[style]
+ else:
+ self.__style = None
+ basic_fmt_dict = {
+ "sys_host": "%(hostname)s",
+ "sys_name": "%(name)s",
+ "sys_module": "%(module)s",
+ }
+
+ if exclude_attrs is not None:
+ self._exc_attrs = set(exclude_attrs)
+ self._fmt_dict = None
+ self._formatter = self._format_by_exclusion
+ self.usesTime = super().usesTime
+ else:
+ self._exc_attrs = None
+ if not fmt:
+ self._fmt_dict = basic_fmt_dict
+ self._formatter = self._format_by_dict
+ self.usesTime = self._format_by_dict_uses_time
+ else:
+ if callable(fmt):
+ self._formatter = fmt
+ self.usesTime = fmt.usesTime
+ else:
+ self._fmt_dict = fmt
+ self._formatter = self._format_by_dict
+ self.usesTime = self._format_by_dict_uses_time
+ if format_json:
+ self._format_msg = self._format_msg_json
+ else:
+ self._format_msg = self._format_msg_default
-class FluentRecordFormatter(object):
- def __init__(self):
self.hostname = socket.gethostname()
+ self.fill_missing_fmt_key = fill_missing_fmt_key
+
def format(self, record):
- data = {'sys_host': self.hostname,
- 'sys_name': record.name,
- 'sys_module': record.module,
- # 'sys_lineno': record.lineno,
- # 'sys_levelno': record.levelno,
- # 'sys_levelname': record.levelname,
- # 'sys_filename': record.filename,
- # 'sys_funcname': record.funcName,
- # 'sys_exc_info': record.exc_info,
- }
- # if 'sys_exc_info' in data and data['sys_exc_info']:
- # data['sys_exc_info'] = self.formatException(data['sys_exc_info'])
-
- self._structuring(data, record.msg)
+ # Compute attributes handled by parent class.
+ super().format(record)
+ # Add ours
+ record.hostname = self.hostname
+
+ # Apply format
+ data = self._formatter(record)
+
+ self._structuring(data, record)
return data
- def _structuring(self, data, msg):
+ def usesTime(self):
+ """This method is substituted on construction based on settings for performance reasons"""
+
+ def _structuring(self, data, record):
+ """Melds `msg` into `data`.
+
+ :param data: dictionary to be sent to fluent server
+ :param msg: :class:`LogRecord`'s message to add to `data`.
+ `msg` can be a simple string for backward compatibility with
+ :mod:`logging` framework, a JSON encoded string or a dictionary
+ that will be merged into dictionary generated in :meth:`format.
+ """
+ msg = record.msg
+
if isinstance(msg, dict):
self._add_dic(data, msg)
elif isinstance(msg, str):
+ self._add_dic(data, self._format_msg(record, msg))
+ else:
+ self._add_dic(data, {"message": msg})
+
+ def _format_msg_json(self, record, msg):
+ try:
+ json_msg = json.loads(str(msg))
+ if isinstance(json_msg, dict):
+ return json_msg
+ else:
+ return self._format_msg_default(record, msg)
+ except ValueError:
+ return self._format_msg_default(record, msg)
+
+ def _format_msg_default(self, record, msg):
+ return {"message": super().format(record)}
+
+ def _format_by_exclusion(self, record):
+ data = {}
+ for key, value in record.__dict__.items():
+ if key not in self._exc_attrs:
+ data[key] = value
+ return data
+
+ def _format_by_dict(self, record):
+ data = {}
+ for key, value in self._fmt_dict.items():
try:
- self._add_dic(data, json.loads(str(msg)))
- except ValueError:
- pass
+ if self.__style:
+ value = self.__style(value).format(record)
+ else:
+ value = value % record.__dict__
+ except KeyError as exc:
+ value = None
+ if not self.fill_missing_fmt_key:
+ raise exc
+
+ data[key] = value
+ return data
+
+ def _format_by_dict_uses_time(self):
+ if self.__style:
+ search = self.__style.asctime_search
+ else:
+ search = "%(asctime)"
+ return any([value.find(search) >= 0 for value in self._fmt_dict.values()])
@staticmethod
def _add_dic(data, dic):
for key, value in dic.items():
- if isinstance(key, basestring):
- data[str(key)] = value
+ if isinstance(key, str):
+ data[key] = value
class FluentHandler(logging.Handler):
- '''
+ """
Logging Handler for fluent.
- '''
- def __init__(self,
- tag,
- host='localhost',
- port=24224,
- timeout=3.0,
- verbose=False):
+ """
+ def __init__(
+ self,
+ tag,
+ host="localhost",
+ port=24224,
+ timeout=3.0,
+ verbose=False,
+ buffer_overflow_handler=None,
+ msgpack_kwargs=None,
+ nanosecond_precision=False,
+ **kwargs,
+ ):
self.tag = tag
- self.sender = sender.FluentSender(tag,
- host=host, port=port,
- timeout=timeout, verbose=verbose)
+ self._host = host
+ self._port = port
+ self._timeout = timeout
+ self._verbose = verbose
+ self._buffer_overflow_handler = buffer_overflow_handler
+ self._msgpack_kwargs = msgpack_kwargs
+ self._nanosecond_precision = nanosecond_precision
+ self._kwargs = kwargs
+ self._sender = None
logging.Handler.__init__(self)
+ def getSenderClass(self):
+ return sender.FluentSender
+
+ @property
+ def sender(self):
+ if self._sender is None:
+ self._sender = self.getSenderInstance(
+ tag=self.tag,
+ host=self._host,
+ port=self._port,
+ timeout=self._timeout,
+ verbose=self._verbose,
+ buffer_overflow_handler=self._buffer_overflow_handler,
+ msgpack_kwargs=self._msgpack_kwargs,
+ nanosecond_precision=self._nanosecond_precision,
+ **self._kwargs,
+ )
+ return self._sender
+
+ def getSenderInstance(
+ self,
+ tag,
+ host,
+ port,
+ timeout,
+ verbose,
+ buffer_overflow_handler,
+ msgpack_kwargs,
+ nanosecond_precision,
+ **kwargs,
+ ):
+ sender_class = self.getSenderClass()
+ return sender_class(
+ tag,
+ host=host,
+ port=port,
+ timeout=timeout,
+ verbose=verbose,
+ buffer_overflow_handler=buffer_overflow_handler,
+ msgpack_kwargs=msgpack_kwargs,
+ nanosecond_precision=nanosecond_precision,
+ **kwargs,
+ )
+
def emit(self, record):
data = self.format(record)
- self.sender.emit(None, data)
+ _sender = self.sender
+ return _sender.emit_with_time(
+ None,
+ sender.EventTime(record.created)
+ if _sender.nanosecond_precision
+ else int(record.created),
+ data,
+ )
def close(self):
self.acquire()
try:
- self.sender._close()
- logging.Handler.close(self)
+ try:
+ if self._sender is not None:
+ self._sender.close()
+ self._sender = None
+ finally:
+ super().close()
finally:
self.release()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
diff --git a/fluent/sender.py b/fluent/sender.py
index 43ea213..fe5fd35 100644
--- a/fluent/sender.py
+++ b/fluent/sender.py
@@ -1,78 +1,155 @@
-# -*- coding: utf-8 -*-
-
-from __future__ import print_function
+import errno
import socket
+import struct
import threading
import time
+import traceback
import msgpack
-
_global_sender = None
-def setup(tag, **kwargs):
- host = kwargs.get('host', 'localhost')
- port = kwargs.get('port', 24224)
+def _set_global_sender(sender): # pragma: no cover
+ """[For testing] Function to set global sender directly"""
+ global _global_sender
+ _global_sender = sender
+
+def setup(tag, **kwargs): # pragma: no cover
global _global_sender
- _global_sender = FluentSender(tag, host=host, port=port)
+ _global_sender = FluentSender(tag, **kwargs)
-def get_global_sender():
+def get_global_sender(): # pragma: no cover
return _global_sender
-class FluentSender(object):
- def __init__(self,
- tag,
- host='localhost',
- port=24224,
- bufmax=1 * 1024 * 1024,
- timeout=3.0,
- verbose=False):
+def close(): # pragma: no cover
+ get_global_sender().close()
+
+
+class EventTime(msgpack.ExtType):
+ def __new__(cls, timestamp, nanoseconds=None):
+ seconds = int(timestamp)
+ if nanoseconds is None:
+ nanoseconds = int(timestamp % 1 * 10**9)
+ return super().__new__(
+ cls,
+ code=0,
+ data=struct.pack(">II", seconds, nanoseconds),
+ )
+ @classmethod
+ def from_unix_nano(cls, unix_nano):
+ seconds, nanos = divmod(unix_nano, 10**9)
+ return cls(seconds, nanos)
+
+
+class FluentSender:
+ def __init__(
+ self,
+ tag,
+ host="localhost",
+ port=24224,
+ bufmax=1 * 1024 * 1024,
+ timeout=3.0,
+ verbose=False,
+ buffer_overflow_handler=None,
+ nanosecond_precision=False,
+ msgpack_kwargs=None,
+ *,
+ forward_packet_error=True,
+ **kwargs,
+ ):
+ """
+ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
+ """
self.tag = tag
self.host = host
self.port = port
self.bufmax = bufmax
self.timeout = timeout
self.verbose = verbose
+ self.buffer_overflow_handler = buffer_overflow_handler
+ self.nanosecond_precision = nanosecond_precision
+ self.forward_packet_error = forward_packet_error
+ self.msgpack_kwargs = {} if msgpack_kwargs is None else msgpack_kwargs
self.socket = None
self.pendings = None
self.lock = threading.Lock()
-
- try:
- self._reconnect()
- except Exception:
- # will be retried in emit()
- self._close()
+ self._closed = False
+ self._last_error_threadlocal = threading.local()
def emit(self, label, data):
- cur_time = int(time.time())
- self.emit_with_time(label, cur_time, data)
+ if self.nanosecond_precision:
+ cur_time = EventTime.from_unix_nano(time.time_ns())
+ else:
+ cur_time = int(time.time())
+ return self.emit_with_time(label, cur_time, data)
def emit_with_time(self, label, timestamp, data):
- bytes_ = self._make_packet(label, timestamp, data)
- self._send(bytes_)
+ try:
+ bytes_ = self._make_packet(label, timestamp, data)
+ except Exception as e:
+ if not self.forward_packet_error:
+ raise
+ self.last_error = e
+ bytes_ = self._make_packet(
+ label,
+ timestamp,
+ {
+ "level": "CRITICAL",
+ "message": "Can't output to log",
+ "traceback": traceback.format_exc(),
+ },
+ )
+ return self._send(bytes_)
+
+ @property
+ def last_error(self):
+ return getattr(self._last_error_threadlocal, "exception", None)
+
+ @last_error.setter
+ def last_error(self, err):
+ self._last_error_threadlocal.exception = err
+
+ def clear_last_error(self, _thread_id=None):
+ if hasattr(self._last_error_threadlocal, "exception"):
+ delattr(self._last_error_threadlocal, "exception")
+
+ def close(self):
+ with self.lock:
+ if self._closed:
+ return
+ self._closed = True
+ if self.pendings:
+ try:
+ self._send_data(self.pendings)
+ except Exception:
+ self._call_buffer_overflow_handler(self.pendings)
+
+ self._close()
+ self.pendings = None
def _make_packet(self, label, timestamp, data):
if label:
- tag = '.'.join((self.tag, label))
+ tag = f"{self.tag}.{label}" if self.tag else label
else:
tag = self.tag
+ if self.nanosecond_precision and isinstance(timestamp, float):
+ timestamp = EventTime(timestamp)
packet = (tag, timestamp, data)
if self.verbose:
print(packet)
- return msgpack.packb(packet)
+ return msgpack.packb(packet, **self.msgpack_kwargs)
def _send(self, bytes_):
- self.lock.acquire()
- try:
- self._send_internal(bytes_)
- finally:
- self.lock.release()
+ with self.lock:
+ if self._closed:
+ return False
+ return self._send_internal(bytes_)
def _send_internal(self, bytes_):
# buffering
@@ -81,37 +158,108 @@ def _send_internal(self, bytes_):
bytes_ = self.pendings
try:
- # reconnect if possible
- self._reconnect()
-
- # send message
- self.socket.sendall(bytes_)
+ self._send_data(bytes_)
# send finished
self.pendings = None
- except Exception:
+
+ return True
+ except OSError as e:
+ self.last_error = e
+
# close socket
self._close()
- # clear buffer if it exceeds max bufer size
+
+ # clear buffer if it exceeds max buffer size
if self.pendings and (len(self.pendings) > self.bufmax):
- # TODO: add callback handler here
+ self._call_buffer_overflow_handler(self.pendings)
self.pendings = None
else:
self.pendings = bytes_
+ return False
+
+ def _check_recv_side(self):
+ try:
+ self.socket.settimeout(0.0)
+ try:
+ recvd = self.socket.recv(4096)
+ except OSError as recv_e:
+ if recv_e.errno != errno.EWOULDBLOCK:
+ raise
+ return
+
+ if recvd == b"":
+ raise OSError(errno.EPIPE, "Broken pipe")
+ finally:
+ self.socket.settimeout(self.timeout)
+
+ def _send_data(self, bytes_):
+ # reconnect if possible
+ self._reconnect()
+ # send message
+ bytes_to_send = len(bytes_)
+ bytes_sent = 0
+ self._check_recv_side()
+ while bytes_sent < bytes_to_send:
+ sent = self.socket.send(bytes_[bytes_sent:])
+ if sent == 0:
+ raise OSError(errno.EPIPE, "Broken pipe")
+ bytes_sent += sent
+ self._check_recv_side()
+
def _reconnect(self):
if not self.socket:
- if self.host.startswith('unix://'):
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
- sock.connect(self.host[len('unix://'):])
+ try:
+ if self.host.startswith("unix://"):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.settimeout(self.timeout)
+ sock.connect(self.host[len("unix://") :])
+ else:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(self.timeout)
+ # This might be controversial and may need to be removed
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ sock.connect((self.host, self.port))
+ except Exception as e:
+ try:
+ sock.close()
+ except Exception: # pragma: no cover
+ pass
+ raise e
else:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
- sock.connect((self.host, self.port))
- self.socket = sock
+ self.socket = sock
+
+ def _call_buffer_overflow_handler(self, pending_events):
+ try:
+ if self.buffer_overflow_handler:
+ self.buffer_overflow_handler(pending_events)
+ except Exception:
+ # Exceptions raised by the user-supplied handler are deliberately swallowed here
+ pass
def _close(self):
- if self.socket:
- self.socket.close()
- self.socket = None
+ try:
+ sock = self.socket
+ if sock:
+ try:
+ try:
+ sock.shutdown(socket.SHUT_RDWR)
+ except OSError: # pragma: no cover
+ pass
+ finally:
+ try:
+ sock.close()
+ except OSError: # pragma: no cover
+ pass
+ finally:
+ self.socket = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, typ, value, traceback):
+ try:
+ self.close()
+ except Exception as e: # pragma: no cover
+ self.last_error = e
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..8140e03
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,54 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "fluent-logger"
+dynamic = ["version"]
+description = "A Python logging handler for Fluentd event collector"
+readme = "README.rst"
+license = { file = "COPYING" }
+requires-python = ">=3.7"
+authors = [
+ { name = "Kazuki Ohta", email = "kazuki.ohta@gmail.com" },
+]
+maintainers = [
+ { name = "Arcadiy Ivanov", email = "arcadiy@ivanov.biz" },
+ { name = "Inada Naoki", email = "songofacandy@gmail.com" },
+]
+classifiers = [
+ "Development Status :: 5 - Production/Stable",
+ "Intended Audience :: Developers",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: Implementation :: CPython",
+ "Programming Language :: Python :: Implementation :: PyPy",
+ "Topic :: System :: Logging",
+]
+dependencies = [
+ "msgpack>=1.0",
+]
+
+[project.urls]
+Download = "https://pypi.org/project/fluent-logger/"
+Homepage = "https://github.com/fluent/fluent-logger-python"
+
+[tool.hatch.version]
+path = "fluent/__about__.py"
+
+[tool.hatch.build.targets.sdist]
+exclude = [
+ "/.github",
+ "/.tox",
+ "/.venv",
+]
+
+[tool.hatch.build.targets.wheel]
+include = [
+ "/fluent",
+]
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..3707664
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1,3 @@
+pytest
+pytest-cov
+msgpack
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644
index 633f7f5..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,8 +0,0 @@
-[nosetests]
-match = ^test_
-cover-package = fluent
-with-coverage = 1
-cover-erase = 1
-cover-branches = 1
-cover-inclusive = 1
-cover-min-percentage = 70
diff --git a/setup.py b/setup.py
deleted file mode 100755
index 8e739c2..0000000
--- a/setup.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/python
-
-from os import path
-
-try:
- from setuptools import setup
-except ImportError:
- from distutils.core import setup
-
-README = path.abspath(path.join(path.dirname(__file__), 'README.md'))
-desc = 'A Python logging handler for Fluentd event collector'
-
-setup(
- name='fluent-logger',
- version='0.3.5',
- description=desc,
- long_description=open(README).read(),
- package_dir={'fluent': 'fluent'},
- packages=['fluent'],
- install_requires=['msgpack-python'],
- author='Kazuki Ohta',
- author_email='kazuki.ohta@gmail.com',
- url='https://github.com/fluent/fluent-logger-python',
- download_url='http://pypi.python.org/pypi/fluent-logger/',
- license='Apache License, Version 2.0',
- classifiers=[
- 'Programming Language :: Python :: 2',
- 'Programming Language :: Python :: 3',
- 'Development Status :: 4 - Beta',
- 'Intended Audience :: Developers',
- ],
- test_suite='tests'
-)
diff --git a/tests/mockserver.py b/tests/mockserver.py
index c160095..6ea2fff 100644
--- a/tests/mockserver.py
+++ b/tests/mockserver.py
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-
try:
from cStringIO import StringIO as BytesIO
except ImportError:
@@ -7,7 +5,6 @@
import socket
import threading
-import time
from msgpack import Unpacker
@@ -16,38 +13,78 @@ class MockRecvServer(threading.Thread):
"""
Single threaded server accepts one connection and recv until EOF.
"""
- def __init__(self, host='localhost', port=24224):
- if host.startswith('unix://'):
- self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self._sock.bind(host[len('unix://'):])
+
+ def __init__(self, host="localhost", port=0):
+ super().__init__()
+
+ if host.startswith("unix://"):
+ self.socket_proto = socket.AF_UNIX
+ self.socket_type = socket.SOCK_STREAM
+ self.socket_addr = host[len("unix://") :]
else:
- self._sock = socket.socket()
- self._sock.bind((host, port))
+ self.socket_proto = socket.AF_INET
+ self.socket_type = socket.SOCK_STREAM
+ self.socket_addr = (host, port)
+
+ self._sock = socket.socket(self.socket_proto, self.socket_type)
+ self._sock.bind(self.socket_addr)
+ if self.socket_proto == socket.AF_INET:
+ self.port = self._sock.getsockname()[1]
+
+ self._sock.listen(1)
self._buf = BytesIO()
+ self._con = None
- threading.Thread.__init__(self)
self.start()
def run(self):
sock = self._sock
- sock.listen(1)
- con, _ = sock.accept()
- while True:
- data = con.recv(4096)
- if not data:
- break
- self._buf.write(data)
- con.close()
- sock.close()
- self._sock = None
-
- def wait(self):
- while self._sock:
- time.sleep(0.1)
-
- def get_recieved(self):
- self.wait()
+
+ try:
+ try:
+ con, _ = sock.accept()
+ except Exception:
+ return
+ self._con = con
+ try:
+ while True:
+ try:
+ data = con.recv(16384)
+ if not data:
+ break
+ self._buf.write(data)
+ except OSError as e:
+ print("MockServer error: %s" % e)
+ break
+ finally:
+ con.close()
+ finally:
+ sock.close()
+
+ def get_received(self):
+ self.join()
self._buf.seek(0)
- # TODO: have to process string encoding properly. currently we assume
- # that all encoding is utf-8.
- return list(Unpacker(self._buf, encoding='utf-8'))
+ return list(Unpacker(self._buf))
+
+ def close(self):
+ try:
+ self._sock.close()
+ except Exception:
+ pass
+
+ try:
+ conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ conn.connect((self.socket_addr[0], self.port))
+ finally:
+ conn.close()
+ except Exception:
+ pass
+
+ if self._con:
+ try:
+ self._con.close()
+ except Exception:
+ pass
+
+ self.join()
diff --git a/tests/test_asynchandler.py b/tests/test_asynchandler.py
new file mode 100644
index 0000000..7bbf108
--- /dev/null
+++ b/tests/test_asynchandler.py
@@ -0,0 +1,385 @@
+import logging
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ from unittest import mock
+try:
+ from unittest.mock import patch
+except ImportError:
+ from unittest.mock import patch
+
+
+import fluent.asynchandler
+import fluent.handler
+from tests import mockserver
+
+
+def get_logger(name, level=logging.INFO):
+ logger = logging.getLogger(name)
+ logger.setLevel(level)
+ return logger
+
+
+class TestHandler(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._port = self._server.port
+
+ def tearDown(self):
+ self._server.close()
+
+ def get_handler_class(self):
+ # return fluent.handler.FluentHandler
+ return fluent.asynchandler.FluentHandler
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info({"from": "userA", "to": "userB"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("app.follow", data[0][0])
+ eq("userA", data[0][2]["from"])
+ eq("userB", data[0][2]["to"])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ def test_custom_fmt(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={
+ "name": "%(name)s",
+ "lineno": "%(lineno)d",
+ "emitted_at": "%(asctime)s",
+ }
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertTrue("emitted_at" in data[0][2])
+
+ def test_custom_fmt_with_format_style(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={
+ "name": "{name}",
+ "lineno": "{lineno}",
+ "emitted_at": "{asctime}",
+ },
+ style="{",
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertTrue("emitted_at" in data[0][2])
+
+ def test_custom_fmt_with_template_style(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={
+ "name": "${name}",
+ "lineno": "${lineno}",
+ "emitted_at": "${asctime}",
+ },
+ style="$",
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertTrue("emitted_at" in data[0][2])
+
+ def test_custom_field_raise_exception(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"}
+ )
+ )
+ log.addHandler(handler)
+ with self.assertRaises(KeyError):
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ def test_custom_field_fill_missing_fmt_key_is_true(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"},
+ fill_missing_fmt_key=True,
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("custom_field" in data[0][2])
+ # field defaults to none if not in log record
+ self.assertIsNone(data[0][2]["custom_field"])
+
+ def test_json_encoded_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info('{"key": "hello world!", "param": "value"}')
+
+ data = self.get_data()
+ self.assertTrue("key" in data[0][2])
+ self.assertEqual("hello world!", data[0][2]["key"])
+
+ def test_unstructured_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info("hello %s", "world")
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+ self.assertEqual("hello world", data[0][2]["message"])
+
+ def test_unstructured_formatted_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info("hello world, %s", "you!")
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+ self.assertEqual("hello world, you!", data[0][2]["message"])
+
+ def test_number_string_simple_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info("1")
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+
+ def test_non_string_simple_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info(42)
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+
+ def test_non_string_dict_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info({42: "root"})
+
+ data = self.get_data()
+ # For some reason, non-string keys are ignored
+ self.assertFalse(42 in data[0][2])
+
+ def test_exception_message(self):
+ handler = self.get_handler_class()("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ try:
+ raise Exception("sample exception")
+ except Exception:
+ log.exception("it failed")
+
+ data = self.get_data()
+ message = data[0][2]["message"]
+ # Includes the logged message, as well as the stack trace.
+ self.assertTrue("it failed" in message)
+ self.assertTrue('tests/test_asynchandler.py", line' in message)
+ self.assertTrue("Exception: sample exception" in message)
+
+
+class TestHandlerWithCircularQueue(unittest.TestCase):
+ Q_SIZE = 3
+
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._port = self._server.port
+
+ def tearDown(self):
+ self._server.close()
+
+ def get_handler_class(self):
+ # return fluent.handler.FluentHandler
+ return fluent.asynchandler.FluentHandler
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ handler = self.get_handler_class()(
+ "app.follow",
+ port=self._port,
+ queue_maxsize=self.Q_SIZE,
+ queue_circular=True,
+ )
+ with handler:
+ self.assertEqual(handler.sender.queue_circular, True)
+ self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE)
+
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info({"cnt": 1, "from": "userA", "to": "userB"})
+ log.info({"cnt": 2, "from": "userA", "to": "userB"})
+ log.info({"cnt": 3, "from": "userA", "to": "userB"})
+ log.info({"cnt": 4, "from": "userA", "to": "userB"})
+ log.info({"cnt": 5, "from": "userA", "to": "userB"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ # with the logging interface, we can't be sure to have filled up the queue, so we can
+ # test only for a cautelative condition here
+ self.assertTrue(len(data) >= self.Q_SIZE)
+
+ el = data[0]
+ eq(3, len(el))
+ eq("app.follow", el[0])
+ eq("userA", el[2]["from"])
+ eq("userB", el[2]["to"])
+ self.assertTrue(el[1])
+ self.assertTrue(isinstance(el[1], int))
+
+
+class QueueOverflowException(BaseException):
+ pass
+
+
+def queue_overflow_handler(discarded_bytes):
+ raise QueueOverflowException(discarded_bytes)
+
+
+class TestHandlerWithCircularQueueHandler(unittest.TestCase):
+ Q_SIZE = 1
+
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._port = self._server.port
+
+ def tearDown(self):
+ self._server.close()
+
+ def get_handler_class(self):
+ # return fluent.handler.FluentHandler
+ return fluent.asynchandler.FluentHandler
+
+ def test_simple(self):
+ handler = self.get_handler_class()(
+ "app.follow",
+ port=self._port,
+ queue_maxsize=self.Q_SIZE,
+ queue_circular=True,
+ queue_overflow_handler=queue_overflow_handler,
+ )
+ with handler:
+
+ def custom_full_queue():
+ handler.sender._queue.put(b"Mock", block=True)
+ return True
+
+ with patch.object(
+ fluent.asynchandler.asyncsender.Queue,
+ "full",
+ mock.Mock(side_effect=custom_full_queue),
+ ):
+ self.assertEqual(handler.sender.queue_circular, True)
+ self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE)
+
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+
+ exc_counter = 0
+
+ try:
+ log.info({"cnt": 1, "from": "userA", "to": "userB"})
+ except QueueOverflowException:
+ exc_counter += 1
+
+ try:
+ log.info({"cnt": 2, "from": "userA", "to": "userB"})
+ except QueueOverflowException:
+ exc_counter += 1
+
+ try:
+ log.info({"cnt": 3, "from": "userA", "to": "userB"})
+ except QueueOverflowException:
+ exc_counter += 1
+
+ # we can't be sure to have exception in every case due to multithreading,
+ # so we can test only for a conservative condition here
+ print(f"Exception raised: {exc_counter} (expected 3)")
+ assert exc_counter >= 0
diff --git a/tests/test_asyncsender.py b/tests/test_asyncsender.py
new file mode 100644
index 0000000..a16c43f
--- /dev/null
+++ b/tests/test_asyncsender.py
@@ -0,0 +1,379 @@
+import unittest
+
+import msgpack
+
+import fluent.asyncsender
+from tests import mockserver
+
+
+class TestSetup(unittest.TestCase):
+ def tearDown(self):
+ from fluent.asyncsender import _set_global_sender
+
+ _set_global_sender(None)
+
+ def test_no_kwargs(self):
+ fluent.asyncsender.setup("tag")
+ actual = fluent.asyncsender.get_global_sender()
+ self.assertEqual(actual.tag, "tag")
+ self.assertEqual(actual.host, "localhost")
+ self.assertEqual(actual.port, 24224)
+ self.assertEqual(actual.timeout, 3.0)
+ actual.close()
+
+ def test_host_and_port(self):
+ fluent.asyncsender.setup("tag", host="myhost", port=24225)
+ actual = fluent.asyncsender.get_global_sender()
+ self.assertEqual(actual.tag, "tag")
+ self.assertEqual(actual.host, "myhost")
+ self.assertEqual(actual.port, 24225)
+ self.assertEqual(actual.timeout, 3.0)
+ actual.close()
+
+ def test_tolerant(self):
+ fluent.asyncsender.setup("tag", host="myhost", port=24225, timeout=1.0)
+ actual = fluent.asyncsender.get_global_sender()
+ self.assertEqual(actual.tag, "tag")
+ self.assertEqual(actual.host, "myhost")
+ self.assertEqual(actual.port, 24225)
+ self.assertEqual(actual.timeout, 1.0)
+ actual.close()
+
+
+class TestSender(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.asyncsender.FluentSender(
+ tag="test", port=self._server.port
+ )
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ with self._sender as sender:
+ sender.emit("foo", {"bar": "baz"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ def test_decorator_simple(self):
+ with self._sender as sender:
+ sender.emit("foo", {"bar": "baz"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ def test_nanosecond(self):
+ with self._sender as sender:
+ sender.nanosecond_precision = True
+ sender.emit("foo", {"bar": "baz"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
+ eq(data[0][1].code, 0)
+
+ def test_nanosecond_coerce_float(self):
+ time_ = 1490061367.8616468906402588
+ with self._sender as sender:
+ sender.nanosecond_precision = True
+ sender.emit_with_time("foo", time_, {"bar": "baz"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
+ eq(data[0][1].code, 0)
+ eq(data[0][1].data, b"X\xd0\x8873[\xb0*")
+
+ def test_no_last_error_on_successful_emit(self):
+ with self._sender as sender:
+ sender.emit("foo", {"bar": "baz"})
+
+ self.assertEqual(sender.last_error, None)
+
+ def test_last_error_property(self):
+ EXCEPTION_MSG = "custom exception for testing last_error property"
+ self._sender.last_error = OSError(EXCEPTION_MSG)
+
+ self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
+
+ def test_clear_last_error(self):
+ EXCEPTION_MSG = "custom exception for testing clear_last_error"
+ self._sender.last_error = OSError(EXCEPTION_MSG)
+ self._sender.clear_last_error()
+
+ self.assertEqual(self._sender.last_error, None)
+
+ @unittest.skip(
+ "This test failed with 'TypeError: catching classes that do not "
+ "inherit from BaseException is not allowed' so skipped"
+ )
+ def test_connect_exception_during_sender_init(self, mock_socket):
+ # Make the socket.socket().connect() call raise a custom exception
+ mock_connect = mock_socket.socket.return_value.connect
+ EXCEPTION_MSG = "a sender init socket connect() exception"
+ mock_connect.side_effect = OSError(EXCEPTION_MSG)
+
+ self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
+
+ def test_sender_without_flush(self):
+ with self._sender as sender:
+ sender._queue.put(
+ fluent.asyncsender._TOMBSTONE
+ ) # This closes without closing
+ sender._send_thread.join()
+ for x in range(1, 10):
+ sender._queue.put(x)
+ sender.close(False)
+ self.assertIs(sender._queue.get(False), fluent.asyncsender._TOMBSTONE)
+
+
+class TestSenderDefaultProperties(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.asyncsender.FluentSender(
+ tag="test", port=self._server.port
+ )
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
+
+ def test_default_properties(self):
+ with self._sender as sender:
+ self.assertTrue(sender.queue_blocking)
+ self.assertFalse(sender.queue_circular)
+ self.assertTrue(isinstance(sender.queue_maxsize, int))
+ self.assertTrue(sender.queue_maxsize > 0)
+
+
+class TestSenderWithTimeout(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.asyncsender.FluentSender(
+ tag="test", port=self._server.port, queue_timeout=0.04
+ )
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ with self._sender as sender:
+ sender.emit("foo", {"bar": "baz"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ def test_simple_with_timeout_props(self):
+ with self._sender as sender:
+ sender.emit("foo", {"bar": "baz"})
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+
+class TestEventTime(unittest.TestCase):
+ def test_event_time(self):
+ time = fluent.asyncsender.EventTime(1490061367.8616468906402588)
+ self.assertEqual(time.code, 0)
+ self.assertEqual(time.data, b"X\xd0\x8873[\xb0*")
+
+
+class TestSenderWithTimeoutAndCircular(unittest.TestCase):
+ Q_SIZE = 3
+
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.asyncsender.FluentSender(
+ tag="test",
+ port=self._server.port,
+ queue_maxsize=self.Q_SIZE,
+ queue_circular=True,
+ )
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ with self._sender as sender:
+ self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE)
+ self.assertEqual(self._sender.queue_circular, True)
+ self.assertEqual(self._sender.queue_blocking, False)
+
+ ok = sender.emit("foo1", {"bar": "baz1"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo2", {"bar": "baz2"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo3", {"bar": "baz3"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo4", {"bar": "baz4"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo5", {"bar": "baz5"})
+ self.assertTrue(ok)
+
+ data = self.get_data()
+ eq = self.assertEqual
+ # with the logging interface, we can't be sure to have filled up the queue, so we can
+ # test only for a conservative condition here
+ self.assertTrue(len(data) >= self.Q_SIZE)
+ eq(3, len(data[0]))
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ eq(3, len(data[2]))
+ self.assertTrue(data[2][1])
+ self.assertTrue(isinstance(data[2][1], int))
+
+
+class TestSenderWithTimeoutMaxSizeNonCircular(unittest.TestCase):
+ Q_SIZE = 3
+
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.asyncsender.FluentSender(
+ tag="test", port=self._server.port, queue_maxsize=self.Q_SIZE
+ )
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ with self._sender as sender:
+ self.assertEqual(self._sender.queue_maxsize, self.Q_SIZE)
+ self.assertEqual(self._sender.queue_blocking, True)
+ self.assertEqual(self._sender.queue_circular, False)
+
+ ok = sender.emit("foo1", {"bar": "baz1"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo2", {"bar": "baz2"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo3", {"bar": "baz3"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo4", {"bar": "baz4"})
+ self.assertTrue(ok)
+ ok = sender.emit("foo5", {"bar": "baz5"})
+ self.assertTrue(ok)
+
+ data = self.get_data()
+ eq = self.assertEqual
+ print(data)
+ eq(5, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo1", data[0][0])
+ eq({"bar": "baz1"}, data[0][2])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ eq(3, len(data[2]))
+ eq("test.foo3", data[2][0])
+ eq({"bar": "baz3"}, data[2][2])
+
+
+class TestSenderUnlimitedSize(unittest.TestCase):
+ Q_SIZE = 3
+
+ def setUp(self):
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.asyncsender.FluentSender(
+ tag="test", port=self._server.port, queue_timeout=0.04, queue_maxsize=0
+ )
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
+
+ def get_data(self):
+ return self._server.get_received()
+
+ def test_simple(self):
+ with self._sender as sender:
+ self.assertEqual(self._sender.queue_maxsize, 0)
+ self.assertEqual(self._sender.queue_blocking, True)
+ self.assertEqual(self._sender.queue_circular, False)
+
+ NUM = 1000
+ for i in range(1, NUM + 1):
+ ok = sender.emit(f"foo{i}", {"bar": f"baz{i}"})
+ self.assertTrue(ok)
+
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(NUM, len(data))
+ el = data[0]
+ eq(3, len(el))
+ eq("test.foo1", el[0])
+ eq({"bar": "baz1"}, el[2])
+ self.assertTrue(el[1])
+ self.assertTrue(isinstance(el[1], int))
+
+ el = data[NUM - 1]
+ eq(3, len(el))
+ eq(f"test.foo{NUM}", el[0])
+ eq({"bar": f"baz{NUM}"}, el[2])
diff --git a/tests/test_event.py b/tests/test_event.py
index 8a0669b..1e597d8 100644
--- a/tests/test_event.py
+++ b/tests/test_event.py
@@ -1,23 +1,58 @@
-# -*- coding: utf-8 -*-
-
import unittest
from fluent import event, sender
+from tests import mockserver
-sender.setup(server='localhost', tag='app')
+class TestException(BaseException):
+ __test__ = False # teach pytest this is not test class.
class TestEvent(unittest.TestCase):
+ def setUp(self):
+ self._server = mockserver.MockRecvServer("localhost")
+ sender.setup("app", port=self._server.port)
+
+ def tearDown(self):
+ from fluent.sender import _set_global_sender
+
+ sender.close()
+ _set_global_sender(None)
+
def test_logging(self):
+ # XXX: This test succeeds even if the fluentd connection failed
# send event with tag app.follow
- event.Event('follow', {
- 'from': 'userA',
- 'to': 'userB'
- })
+ event.Event("follow", {"from": "userA", "to": "userB"})
+
+ def test_logging_with_timestamp(self):
+ # XXX: This test succeeds even if the fluentd connection failed
# send event with tag app.follow, with timestamp
- event.Event('follow', {
- 'from': 'userA',
- 'to': 'userB'
- }, time=int(0))
+ event.Event("follow", {"from": "userA", "to": "userB"}, time=0)
+
+ def test_no_last_error_on_successful_event(self):
+ global_sender = sender.get_global_sender()
+ event.Event("unfollow", {"from": "userC", "to": "userD"})
+
+ self.assertEqual(global_sender.last_error, None)
+ sender.close()
+
+ @unittest.skip(
+ "This test failed with 'TypeError: catching classes that do not "
+ "inherit from BaseException is not allowed' so skipped"
+ )
+ def test_connect_exception_during_event_send(self, mock_socket):
+ # Make the socket.socket().connect() call raise a custom exception
+ mock_connect = mock_socket.socket.return_value.connect
+ EXCEPTION_MSG = "a event send socket connect() exception"
+ mock_connect.side_effect = TestException(EXCEPTION_MSG)
+
+ # Force the socket to reconnect while trying to emit the event
+ global_sender = sender.get_global_sender()
+ global_sender._close()
+
+ event.Event("unfollow", {"from": "userE", "to": "userF"})
+
+ ex = global_sender.last_error
+ self.assertEqual(ex.args[0], EXCEPTION_MSG)
+ global_sender.clear_last_error()
diff --git a/tests/test_handler.py b/tests/test_handler.py
index 7f944a8..711d282 100644
--- a/tests/test_handler.py
+++ b/tests/test_handler.py
@@ -1,46 +1,352 @@
-# -*- coding: utf-8 -*-
-
import logging
import unittest
import fluent.handler
-
from tests import mockserver
+def get_logger(name, level=logging.INFO):
+ logger = logging.getLogger(name)
+ logger.setLevel(level)
+ return logger
+
+
class TestHandler(unittest.TestCase):
def setUp(self):
- super(TestHandler, self).setUp()
- for port in range(10000, 20000):
- try:
- self._server = mockserver.MockRecvServer('localhost', port)
- self._port = port
- break
- except IOError:
- pass
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._port = self._server.port
+
+ def tearDown(self):
+ self._server.close()
def get_data(self):
- return self._server.get_recieved()
+ return self._server.get_received()
def test_simple(self):
- handler = fluent.handler.FluentHandler('app.follow', port=self._port)
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
- logging.basicConfig(level=logging.INFO)
- log = logging.getLogger('fluent.test')
- handler.setFormatter(fluent.handler.FluentRecordFormatter())
- log.addHandler(handler)
- log.info({
- 'from': 'userA',
- 'to': 'userB'
- })
- handler.close()
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+
+ log.info({"from": "userA", "to": "userB"})
+
+ log.removeHandler(handler)
data = self.get_data()
eq = self.assertEqual
eq(1, len(data))
eq(3, len(data[0]))
- eq('app.follow', data[0][0])
- eq('userA', data[0][2]['from'])
- eq('userB', data[0][2]['to'])
+ eq("app.follow", data[0][0])
+ eq("userA", data[0][2]["from"])
+ eq("userB", data[0][2]["to"])
self.assertTrue(data[0][1])
self.assertTrue(isinstance(data[0][1], int))
+
+ def test_custom_fmt(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={
+ "name": "%(name)s",
+ "lineno": "%(lineno)d",
+ "emitted_at": "%(asctime)s",
+ }
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertTrue("emitted_at" in data[0][2])
+
+ def test_exclude_attrs(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter(exclude_attrs=[]))
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+
+ def test_exclude_attrs_with_exclusion(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(exclude_attrs=["funcName"])
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+
+ def test_exclude_attrs_with_extra(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter(exclude_attrs=[]))
+ log.addHandler(handler)
+ log.info("Test with value '%s'", "test value", extra={"x": 1234})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertEqual("Test with value 'test value'", data[0][2]["message"])
+ self.assertEqual(1234, data[0][2]["x"])
+
+ def test_format_dynamic(self):
+ def formatter(record):
+ return {"message": record.message, "x": record.x, "custom_value": 1}
+
+ formatter.usesTime = lambda: True
+
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter(fmt=formatter))
+ log.addHandler(handler)
+ log.info("Test with value '%s'", "test value", extra={"x": 1234})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("x" in data[0][2])
+ self.assertEqual(1234, data[0][2]["x"])
+ self.assertEqual(1, data[0][2]["custom_value"])
+
+ def test_custom_fmt_with_format_style(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={
+ "name": "{name}",
+ "lineno": "{lineno}",
+ "emitted_at": "{asctime}",
+ },
+ style="{",
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertTrue("emitted_at" in data[0][2])
+
+ def test_custom_fmt_with_template_style(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={
+ "name": "${name}",
+ "lineno": "${lineno}",
+ "emitted_at": "${asctime}",
+ },
+ style="$",
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("lineno" in data[0][2])
+ self.assertTrue("emitted_at" in data[0][2])
+
+ def test_custom_field_raise_exception(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"}
+ )
+ )
+ log.addHandler(handler)
+
+ with self.assertRaises(KeyError):
+ log.info({"sample": "value"})
+
+ log.removeHandler(handler)
+
+ def test_custom_field_fill_missing_fmt_key_is_true(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(
+ fmt={"name": "%(name)s", "custom_field": "%(custom_field)s"},
+ fill_missing_fmt_key=True,
+ )
+ )
+ log.addHandler(handler)
+ log.info({"sample": "value"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("name" in data[0][2])
+ self.assertEqual("fluent.test", data[0][2]["name"])
+ self.assertTrue("custom_field" in data[0][2])
+ # field defaults to none if not in log record
+ self.assertIsNone(data[0][2]["custom_field"])
+
+ def test_json_encoded_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+
+ log.info('{"key": "hello world!", "param": "value"}')
+
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("key" in data[0][2])
+ self.assertEqual("hello world!", data[0][2]["key"])
+
+ def test_json_encoded_message_without_json(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(
+ fluent.handler.FluentRecordFormatter(format_json=False)
+ )
+ log.addHandler(handler)
+
+ log.info('{"key": "hello world!", "param": "value"}')
+
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("key" not in data[0][2])
+ self.assertEqual(
+ '{"key": "hello world!", "param": "value"}', data[0][2]["message"]
+ )
+
+ def test_unstructured_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info("hello %s", "world")
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+ self.assertEqual("hello world", data[0][2]["message"])
+
+ def test_unstructured_formatted_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info("hello world, %s", "you!")
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+ self.assertEqual("hello world, you!", data[0][2]["message"])
+
+ def test_number_string_simple_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info("1")
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+
+ def test_non_string_simple_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info(42)
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ self.assertTrue("message" in data[0][2])
+
+ def test_non_string_dict_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ log.info({42: "root"})
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ # For some reason, non-string keys are ignored
+ self.assertFalse(42 in data[0][2])
+
+ def test_exception_message(self):
+ handler = fluent.handler.FluentHandler("app.follow", port=self._port)
+
+ with handler:
+ log = get_logger("fluent.test")
+ handler.setFormatter(fluent.handler.FluentRecordFormatter())
+ log.addHandler(handler)
+ try:
+ raise Exception("sample exception")
+ except Exception:
+ log.exception("it failed")
+ log.removeHandler(handler)
+
+ data = self.get_data()
+ message = data[0][2]["message"]
+ # Includes the logged message, as well as the stack trace.
+ self.assertTrue("it failed" in message)
+ self.assertTrue('tests/test_handler.py", line' in message)
+ self.assertTrue("Exception: sample exception" in message)
diff --git a/tests/test_sender.py b/tests/test_sender.py
index e2e4335..69d09ed 100644
--- a/tests/test_sender.py
+++ b/tests/test_sender.py
@@ -1,36 +1,319 @@
-# -*- coding: utf-8 -*-
-
-from __future__ import print_function
+import errno
+import sys
import unittest
+from shutil import rmtree
+from tempfile import mkdtemp
-import fluent.sender
+import msgpack
+import fluent.sender
from tests import mockserver
+class TestSetup(unittest.TestCase):
+ def tearDown(self):
+ from fluent.sender import _set_global_sender
+
+ _set_global_sender(None)
+
+ def test_no_kwargs(self):
+ fluent.sender.setup("tag")
+ actual = fluent.sender.get_global_sender()
+ self.assertEqual(actual.tag, "tag")
+ self.assertEqual(actual.host, "localhost")
+ self.assertEqual(actual.port, 24224)
+ self.assertEqual(actual.timeout, 3.0)
+
+ def test_host_and_port(self):
+ fluent.sender.setup("tag", host="myhost", port=24225)
+ actual = fluent.sender.get_global_sender()
+ self.assertEqual(actual.tag, "tag")
+ self.assertEqual(actual.host, "myhost")
+ self.assertEqual(actual.port, 24225)
+ self.assertEqual(actual.timeout, 3.0)
+
+ def test_tolerant(self):
+ fluent.sender.setup("tag", host="myhost", port=24225, timeout=1.0)
+ actual = fluent.sender.get_global_sender()
+ self.assertEqual(actual.tag, "tag")
+ self.assertEqual(actual.host, "myhost")
+ self.assertEqual(actual.port, 24225)
+ self.assertEqual(actual.timeout, 1.0)
+
+
class TestSender(unittest.TestCase):
def setUp(self):
- super(TestSender, self).setUp()
- for port in range(10000, 20000):
- try:
- self._server = mockserver.MockRecvServer('localhost', port)
- break
- except IOError as exc:
- print(exc)
- self._sender = fluent.sender.FluentSender(tag='test', port=port)
+ super().setUp()
+ self._server = mockserver.MockRecvServer("localhost")
+ self._sender = fluent.sender.FluentSender(tag="test", port=self._server.port)
+
+ def tearDown(self):
+ try:
+ self._sender.close()
+ finally:
+ self._server.close()
def get_data(self):
- return self._server.get_recieved()
+ return self._server.get_received()
def test_simple(self):
sender = self._sender
- sender.emit('foo', {'bar': 'baz'})
+ sender.emit("foo", {"bar": "baz"})
sender._close()
data = self.get_data()
eq = self.assertEqual
eq(1, len(data))
eq(3, len(data[0]))
- eq('test.foo', data[0][0])
- eq({'bar': 'baz'}, data[0][2])
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(data[0][1])
+ self.assertTrue(isinstance(data[0][1], int))
+
+ def test_decorator_simple(self):
+ with self._sender as sender:
+ sender.emit("foo", {"bar": "baz"})
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
self.assertTrue(data[0][1])
self.assertTrue(isinstance(data[0][1], int))
+
+ def test_nanosecond(self):
+ sender = self._sender
+ sender.nanosecond_precision = True
+ sender.emit("foo", {"bar": "baz"})
+ sender._close()
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
+ eq(data[0][1].code, 0)
+
+ def test_nanosecond_coerce_float(self):
+ time = 1490061367.8616468906402588
+ sender = self._sender
+ sender.nanosecond_precision = True
+ sender.emit_with_time("foo", time, {"bar": "baz"})
+ sender._close()
+ data = self.get_data()
+ eq = self.assertEqual
+ eq(1, len(data))
+ eq(3, len(data[0]))
+ eq("test.foo", data[0][0])
+ eq({"bar": "baz"}, data[0][2])
+ self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
+ eq(data[0][1].code, 0)
+ eq(data[0][1].data, b"X\xd0\x8873[\xb0*")
+
+ def test_no_last_error_on_successful_emit(self):
+ sender = self._sender
+ sender.emit("foo", {"bar": "baz"})
+ sender._close()
+
+ self.assertEqual(sender.last_error, None)
+
+ def test_last_error_property(self):
+ EXCEPTION_MSG = "custom exception for testing last_error property"
+ self._sender.last_error = OSError(EXCEPTION_MSG)
+
+ self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
+
+ def test_clear_last_error(self):
+ EXCEPTION_MSG = "custom exception for testing clear_last_error"
+ self._sender.last_error = OSError(EXCEPTION_MSG)
+ self._sender.clear_last_error()
+
+ self.assertEqual(self._sender.last_error, None)
+ self._sender.clear_last_error()
+ self.assertEqual(self._sender.last_error, None)
+
+ def test_emit_error(self):
+ with self._sender as sender:
+ sender.emit("blah", {"a": object()})
+
+ data = self._server.get_received()
+ self.assertEqual(len(data), 1)
+ self.assertEqual(data[0][2]["message"], "Can't output to log")
+
+ def test_emit_error_no_forward(self):
+ with self._sender as sender:
+ sender.forward_packet_error = False
+ with self.assertRaises(TypeError):
+ sender.emit("blah", {"a": object()})
+
+ def test_emit_after_close(self):
+ with self._sender as sender:
+ self.assertTrue(sender.emit("blah", {"a": "123"}))
+ sender.close()
+ self.assertFalse(sender.emit("blah", {"a": "456"}))
+
+ data = self._server.get_received()
+ self.assertEqual(len(data), 1)
+ self.assertEqual(data[0][2]["a"], "123")
+
+ def test_verbose(self):
+ with self._sender as sender:
+ sender.verbose = True
+ sender.emit("foo", {"bar": "baz"})
+ # No assertions here, just making sure there are no exceptions
+
+ def test_failure_to_connect(self):
+ self._server.close()
+
+ with self._sender as sender:
+ sender._send_internal(b"123")
+ self.assertEqual(sender.pendings, b"123")
+ self.assertIsNone(sender.socket)
+
+ sender._send_internal(b"456")
+ self.assertEqual(sender.pendings, b"123456")
+ self.assertIsNone(sender.socket)
+
+ sender.pendings = None
+ overflows = []
+
+ def boh(buf):
+ overflows.append(buf)
+
+ def boh_with_error(buf):
+ raise RuntimeError
+
+ sender.buffer_overflow_handler = boh
+
+ sender._send_internal(b"0" * sender.bufmax)
+ self.assertFalse(overflows) # No overflow
+
+ sender._send_internal(b"1")
+ self.assertTrue(overflows)
+ self.assertEqual(overflows.pop(0), b"0" * sender.bufmax + b"1")
+
+ sender.buffer_overflow_handler = None
+ sender._send_internal(b"0" * sender.bufmax)
+ sender._send_internal(b"1")
+ self.assertIsNone(sender.pendings)
+
+ sender.buffer_overflow_handler = boh_with_error
+ sender._send_internal(b"0" * sender.bufmax)
+ sender._send_internal(b"1")
+ self.assertIsNone(sender.pendings)
+
+ sender._send_internal(b"1")
+ self.assertFalse(overflows) # No overflow
+ self.assertEqual(sender.pendings, b"1")
+ self.assertIsNone(sender.socket)
+
+ sender.buffer_overflow_handler = boh
+ sender.close()
+ self.assertEqual(overflows.pop(0), b"1")
+
+ def test_broken_conn(self):
+ with self._sender as sender:
+ sender._send_internal(b"123")
+ self.assertIsNone(sender.pendings, b"123")
+ self.assertTrue(sender.socket)
+
+ class FakeSocket:
+ def __init__(self):
+ self.to = 123
+ self.send_side_effects = [3, 0, 9]
+ self.send_idx = 0
+ self.recv_side_effects = [
+ OSError(errno.EWOULDBLOCK, "Blah"),
+ b"this data is going to be ignored",
+ b"",
+ OSError(errno.EWOULDBLOCK, "Blah"),
+ OSError(errno.EWOULDBLOCK, "Blah"),
+ OSError(errno.EACCES, "This error will never happen"),
+ ]
+ self.recv_idx = 0
+
+ def send(self, bytes_):
+ try:
+ v = self.send_side_effects[self.send_idx]
+ if isinstance(v, Exception):
+ raise v
+ if isinstance(v, type) and issubclass(v, Exception):
+ raise v()
+ return v
+ finally:
+ self.send_idx += 1
+
+ def shutdown(self, mode):
+ pass
+
+ def close(self):
+ pass
+
+ def settimeout(self, to):
+ self.to = to
+
+ def gettimeout(self):
+ return self.to
+
+ def recv(self, bufsize, flags=0):
+ try:
+ v = self.recv_side_effects[self.recv_idx]
+ if isinstance(v, Exception):
+ raise v
+ if isinstance(v, type) and issubclass(v, Exception):
+ raise v()
+ return v
+ finally:
+ self.recv_idx += 1
+
+ old_sock = self._sender.socket
+ sock = FakeSocket()
+
+ try:
+ self._sender.socket = sock
+ sender.last_error = None
+ self.assertTrue(sender._send_internal(b"456"))
+ self.assertFalse(sender.last_error)
+
+ self._sender.socket = sock
+ sender.last_error = None
+ self.assertFalse(sender._send_internal(b"456"))
+ self.assertEqual(sender.last_error.errno, errno.EPIPE)
+
+ self._sender.socket = sock
+ sender.last_error = None
+ self.assertFalse(sender._send_internal(b"456"))
+ self.assertEqual(sender.last_error.errno, errno.EPIPE)
+
+ self._sender.socket = sock
+ sender.last_error = None
+ self.assertFalse(sender._send_internal(b"456"))
+ self.assertEqual(sender.last_error.errno, errno.EACCES)
+ finally:
+ self._sender.socket = old_sock
+
+ @unittest.skipIf(sys.platform == "win32", "Unix socket not supported")
+ def test_unix_socket(self):
+ self.tearDown()
+ tmp_dir = mkdtemp()
+ try:
+ server_file = "unix://" + tmp_dir + "/tmp.unix"
+ self._server = mockserver.MockRecvServer(server_file)
+ self._sender = fluent.sender.FluentSender(tag="test", host=server_file)
+ with self._sender as sender:
+ self.assertTrue(sender.emit("foo", {"bar": "baz"}))
+
+ data = self._server.get_received()
+ self.assertEqual(len(data), 1)
+ self.assertEqual(data[0][2], {"bar": "baz"})
+
+ finally:
+ rmtree(tmp_dir, True)
+
+
+class TestEventTime(unittest.TestCase):
+ def test_event_time(self):
+ time = fluent.sender.EventTime(1490061367.8616468906402588)
+ self.assertEqual(time.code, 0)
+ self.assertEqual(time.data, b"X\xd0\x8873[\xb0*")
diff --git a/tox.ini b/tox.ini
index 21cc4a2..14634e9 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,9 +1,11 @@
[tox]
minversion = 1.7.2
-envlist = py26, py27, py32, py33, py34
+envlist = py27, py32, py33, py34, py35, py36, py37, py38
skip_missing_interpreters = True
[testenv]
-deps = nose
- coverage
-commands = python setup.py nosetests
+deps =
+ pytest
+ pytest-cov
+ msgpack
+commands = pytest --cov=fluent