Skip to content

Commit fffa39a

Browse files
authored
Merge pull request fluent#82 from luhn/nanosecond-time
Nanosecond-precision timestamps
2 parents 5c4dff1 + 82040c8 commit fffa39a

File tree

3 files changed

+69
-1
lines changed

3 files changed

+69
-1
lines changed

README.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.
9090
cur_time = int(time.time())
9191
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})
9292
93+
To send events with nanosecond-precision timestamps (Fluent 0.14 and up),
94+
specify `nanosecond_precision` on `FluentSender`.
95+
96+
.. code:: python
97+
98+
# Use nanosecond
99+
logger = sender.FluentSender('app', nanosecond_precision=True)
100+
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
101+
logger.emit_with_time('follow', time.time(), {'from': 'userA', 'to': 'userB'})
102+
93103
You can detect an error via return value of `emit`. If an error happens in `emit`, `emit` returns `False` and get an error object using `last_error` method.
94104

95105
.. code:: python

fluent/sender.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22

33
from __future__ import print_function
4+
import struct
45
import socket
56
import threading
67
import time
@@ -30,6 +31,18 @@ def get_global_sender():
3031
def close():
3132
get_global_sender().close()
3233

34+
35+
class EventTime(msgpack.ExtType):
36+
def __new__(cls, timestamp):
37+
seconds = int(timestamp)
38+
nanoseconds = int(timestamp % 1 * 10 ** 9)
39+
return super(EventTime, cls).__new__(
40+
cls,
41+
code=0,
42+
data=struct.pack(">II", seconds, nanoseconds),
43+
)
44+
45+
3346
class FluentSender(object):
3447
def __init__(self,
3548
tag,
@@ -39,6 +52,7 @@ def __init__(self,
3952
timeout=3.0,
4053
verbose=False,
4154
buffer_overflow_handler=None,
55+
nanosecond_precision=False,
4256
**kwargs):
4357

4458
self.tag = tag
@@ -48,6 +62,7 @@ def __init__(self,
4862
self.timeout = timeout
4963
self.verbose = verbose
5064
self.buffer_overflow_handler = buffer_overflow_handler
65+
self.nanosecond_precision = nanosecond_precision
5166

5267
self.socket = None
5368
self.pendings = None
@@ -61,10 +76,15 @@ def __init__(self,
6176
self._close()
6277

6378
def emit(self, label, data):
64-
cur_time = int(time.time())
79+
if self.nanosecond_precision:
80+
cur_time = EventTime(time.time())
81+
else:
82+
cur_time = int(time.time())
6583
return self.emit_with_time(label, cur_time, data)
6684

6785
def emit_with_time(self, label, timestamp, data):
86+
if self.nanosecond_precision and isinstance(timestamp, float):
87+
timestamp = EventTime(timestamp)
6888
try:
6989
bytes_ = self._make_packet(label, timestamp, data)
7090
except Exception as e:

tests/test_sender.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import print_function
44
import unittest
55
import socket
6+
import msgpack
67

78
import fluent.sender
89
from tests import mockserver
@@ -64,6 +65,36 @@ def test_simple(self):
6465
self.assertTrue(data[0][1])
6566
self.assertTrue(isinstance(data[0][1], int))
6667

68+
def test_nanosecond(self):
69+
sender = self._sender
70+
sender.nanosecond_precision = True
71+
sender.emit('foo', {'bar': 'baz'})
72+
sender._close()
73+
data = self.get_data()
74+
eq = self.assertEqual
75+
eq(1, len(data))
76+
eq(3, len(data[0]))
77+
eq('test.foo', data[0][0])
78+
eq({'bar': 'baz'}, data[0][2])
79+
self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
80+
eq(data[0][1].code, 0)
81+
82+
def test_nanosecond_coerce_float(self):
83+
time = 1490061367.8616468906402588
84+
sender = self._sender
85+
sender.nanosecond_precision = True
86+
sender.emit_with_time('foo', time, {'bar': 'baz'})
87+
sender._close()
88+
data = self.get_data()
89+
eq = self.assertEqual
90+
eq(1, len(data))
91+
eq(3, len(data[0]))
92+
eq('test.foo', data[0][0])
93+
eq({'bar': 'baz'}, data[0][2])
94+
self.assertTrue(isinstance(data[0][1], msgpack.ExtType))
95+
eq(data[0][1].code, 0)
96+
eq(data[0][1].data, b'X\xd0\x8873[\xb0*')
97+
6798
def test_no_last_error_on_successful_emit(self):
6899
sender = self._sender
69100
sender.emit('foo', {'bar': 'baz'})
@@ -93,3 +124,10 @@ def test_connect_exception_during_sender_init(self, mock_socket):
93124
mock_connect.side_effect = socket.error(EXCEPTION_MSG)
94125

95126
self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
127+
128+
129+
class TestEventTime(unittest.TestCase):
130+
def test_event_time(self):
131+
time = fluent.sender.EventTime(1490061367.8616468906402588)
132+
self.assertEqual(time.code, 0)
133+
self.assertEqual(time.data, b'X\xd0\x8873[\xb0*')

0 commit comments

Comments
 (0)