Skip to content

Commit f108564

Browse files
Test combining events for paste detection
1 parent eefbff4 commit f108564

File tree

2 files changed

+88
-4
lines changed

2 files changed

+88
-4
lines changed

bpython/curtsies.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,15 @@ def main(args=None, locals_=None, banner=None, welcome_message=None):
9494

9595
def _combined_events(event_provider, paste_threshold):
9696
"""Combines consecutive keypress events into paste events."""
97-
timeout = (yield 'nonsense_event') # so send can be used
97+
timeout = yield 'nonsense_event' # so send can be used immediately
9898
queue = collections.deque()
9999
while True:
100100
e = event_provider.send(timeout)
101101
if isinstance(e, curtsies.events.Event):
102-
timeout = (yield e)
102+
timeout = yield e
103103
continue
104104
elif e is None:
105+
timeout = yield None
105106
continue
106107
else:
107108
queue.append(e)
@@ -113,10 +114,10 @@ def _combined_events(event_provider, paste_threshold):
113114
paste = curtsies.events.PasteEvent()
114115
paste.events.extend(queue)
115116
queue.clear()
116-
yield paste
117+
timeout = yield paste
117118
else:
118119
while len(queue):
119-
yield queue.popleft()
120+
timeout = yield queue.popleft()
120121

121122

122123
def combined_events(event_provider, paste_threshold=10):

bpython/test/test_curtsies.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# coding: utf-8
2+
from __future__ import unicode_literals
3+
4+
from collections import namedtuple
5+
6+
from bpython.curtsies import combined_events
7+
from bpython.test import (FixLanguageTestCase as TestCase, unittest)
8+
9+
import curtsies.events
10+
11+
12+
ScheduledEvent = namedtuple('ScheduledEvent', ['when', 'event'])
13+
14+
15+
class EventGenerator(object):
16+
def __init__(self, initial_events=(), scheduled_events=()):
17+
self._events = []
18+
self._current_tick = 0
19+
for e in initial_events:
20+
self.schedule_event(e, 0)
21+
for e, w in scheduled_events:
22+
self.schedule_event(e, w)
23+
24+
def schedule_event(self, event, when):
25+
self._events.append(ScheduledEvent(when, event))
26+
self._events.sort()
27+
28+
def send(self, timeout=None):
29+
if timeout not in [None, 0]:
30+
raise ValueError('timeout value %r not supported' % timeout)
31+
if not self._events:
32+
return None
33+
if self._events[0].when <= self._current_tick:
34+
return self._events.pop(0).event
35+
36+
if timeout == 0:
37+
return None
38+
elif timeout is None:
39+
e = self._events.pop(0)
40+
self._current_tick = e.when
41+
return e.event
42+
else:
43+
raise ValueError('timeout value %r not supported' % timeout)
44+
45+
def tick(self, dt=1):
46+
self._current_tick += dt
47+
48+
49+
class TestCurtsiesPasteDetection(TestCase):
50+
def test_paste_threshold(self):
51+
inputs = combined_events(EventGenerator(list('abc')))
52+
cb = combined_events(inputs, paste_threshold=3)
53+
e = next(cb)
54+
self.assertIsInstance(e, curtsies.events.PasteEvent)
55+
self.assertEqual(e.events, list('abc'))
56+
self.assertEqual(next(cb), None)
57+
58+
inputs = combined_events(EventGenerator(list('abc')))
59+
cb = combined_events(inputs, paste_threshold=4)
60+
self.assertEqual(next(cb), 'a')
61+
self.assertEqual(next(cb), 'b')
62+
self.assertEqual(next(cb), 'c')
63+
self.assertEqual(next(cb), None)
64+
65+
def test_set_timeout(self):
66+
eg = EventGenerator('a', zip('bcd', [1,2,3]))
67+
inputs = combined_events(eg)
68+
cb = combined_events(inputs, paste_threshold=5)
69+
self.assertEqual(next(cb), 'a')
70+
self.assertEqual(cb.send(0), None)
71+
self.assertEqual(next(cb), 'b')
72+
self.assertEqual(cb.send(0), None)
73+
eg.tick()
74+
self.assertEqual(cb.send(0), 'c')
75+
self.assertEqual(cb.send(0), None)
76+
eg.tick()
77+
self.assertEqual(cb.send(0), 'd')
78+
self.assertEqual(cb.send(0), None)
79+
80+
81+
82+
if __name__ == '__main__':
83+
unittest.main()

0 commit comments

Comments
 (0)