Skip to content

Commit 0187c89

Browse files
committed
Fixes for several open issues
Propagate exceptions raised by the user's packet callback -- fixes oremanj#31, oremanj#50 Warn about exceptions raised by the packet callback during queue unbinding Raise an error if a packet verdict is set after its parent queue is closed set_payload() now affects the result of later get_payload() -- fixes oremanj#30 Handle signals received when run() is blocked in recv() -- fixes oremanj#65
1 parent ddbc12a commit 0187c89

File tree

6 files changed

+240
-61
lines changed

6 files changed

+240
-61
lines changed

CHANGES.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
v1.0.0, unreleased
2+
Propagate exceptions raised by the user's packet callback
3+
Warn about exceptions raised by the packet callback during queue unbinding
4+
Raise an error if a packet verdict is set after its parent queue is closed
5+
set_payload() now affects the result of later get_payload()
6+
Handle signals received when run() is blocked in recv()
7+
18
v0.9.0, 12 Jan 2021
29
Improve usability when Packet objects are retained past the callback
310
Add Packet.retain() to save the packet contents in such cases

netfilterqueue.pxd

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ cdef extern from "<errno.h>":
88

99
# dummy defines from asm-generic/errno.h:
1010
cdef enum:
11+
EINTR = 4
1112
EAGAIN = 11 # Try again
1213
EWOULDBLOCK = EAGAIN
1314
ENOBUFS = 105 # No buffer space available
@@ -115,15 +116,17 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
115116
u_int16_t num,
116117
nfq_callback *cb,
117118
void *data)
118-
int nfq_destroy_queue(nfq_q_handle *qh)
119119

120-
int nfq_handle_packet(nfq_handle *h, char *buf, int len)
121-
122-
int nfq_set_mode(nfq_q_handle *qh,
123-
u_int8_t mode, unsigned int len)
124-
125-
q_set_queue_maxlen(nfq_q_handle *qh,
126-
u_int32_t queuelen)
120+
# Any function that parses Netlink replies might invoke the user
121+
# callback and thus might need to propagate a Python exception.
122+
# This includes nfq_handle_packet but is not limited to that --
123+
# other functions might send a query, read until they get the reply,
124+
# and find a packet notification before the reply which they then
125+
# must deal with.
126+
int nfq_destroy_queue(nfq_q_handle *qh) except? -1
127+
int nfq_handle_packet(nfq_handle *h, char *buf, int len) except? -1
128+
int nfq_set_mode(nfq_q_handle *qh, u_int8_t mode, unsigned int len) except? -1
129+
int nfq_set_queue_maxlen(nfq_q_handle *qh, u_int32_t queuelen) except? -1
127130

128131
int nfq_set_verdict(nfq_q_handle *qh,
129132
u_int32_t id,
@@ -137,7 +140,6 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
137140
u_int32_t mark,
138141
u_int32_t datalen,
139142
unsigned char *buf) nogil
140-
int nfq_set_queue_maxlen(nfq_q_handle *qh, u_int32_t queuelen)
141143

142144
int nfq_fd(nfq_handle *h)
143145
nfqnl_msg_packet_hdr *nfq_get_msg_packet_hdr(nfq_data *nfad)
@@ -146,7 +148,7 @@ cdef extern from "libnetfilter_queue/libnetfilter_queue.h":
146148
nfqnl_msg_packet_hw *nfq_get_packet_hw(nfq_data *nfad)
147149
int nfq_get_nfmark (nfq_data *nfad)
148150
nfnl_handle *nfq_nfnlh(nfq_handle *h)
149-
151+
150152
# Dummy defines from linux/socket.h:
151153
cdef enum: # Protocol families, same as address families.
152154
PF_INET = 2
@@ -166,8 +168,14 @@ cdef enum:
166168
NF_STOP
167169
NF_MAX_VERDICT = NF_STOP
168170

171+
cdef class NetfilterQueue:
172+
cdef object user_callback # User callback
173+
cdef nfq_handle *h # Handle to NFQueue library
174+
cdef nfq_q_handle *qh # A handle to the queue
175+
cdef bint unbinding
176+
169177
cdef class Packet:
170-
cdef nfq_q_handle *_qh
178+
cdef NetfilterQueue _queue
171179
cdef bint _verdict_is_set # True if verdict has been issued,
172180
# false otherwise
173181
cdef bint _mark_is_set # True if a mark has been given, false otherwise
@@ -196,9 +204,9 @@ cdef class Packet:
196204
#cdef readonly u_int32_t outdev
197205
#cdef readonly u_int32_t physoutdev
198206

199-
cdef set_nfq_data(self, nfq_q_handle *qh, nfq_data *nfa)
207+
cdef set_nfq_data(self, NetfilterQueue queue, nfq_data *nfa)
200208
cdef drop_refs(self)
201-
cdef void verdict(self, u_int8_t verdict)
209+
cdef int verdict(self, u_int8_t verdict) except -1
202210
cpdef Py_ssize_t get_payload_len(self)
203211
cpdef double get_timestamp(self)
204212
cpdef bytes get_payload(self)
@@ -209,11 +217,3 @@ cdef class Packet:
209217
cpdef accept(self)
210218
cpdef drop(self)
211219
cpdef repeat(self)
212-
213-
cdef class NetfilterQueue:
214-
cdef object user_callback # User callback
215-
cdef nfq_handle *h # Handle to NFQueue library
216-
cdef nfq_q_handle *qh # A handle to the queue
217-
cdef u_int16_t af # Address family
218-
cdef packet_copy_size # Amount of packet metadata + data copied to buffer
219-

netfilterqueue.pyx

Lines changed: 61 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,38 @@ DEF SockRcvSize = DEFAULT_MAX_QUEUELEN * SockCopySize // 2
2626

2727
cdef extern from *:
2828
"""
29-
#if PY_MAJOR_VERSION < 3
30-
#define PyBytes_FromStringAndSize PyString_FromStringAndSize
31-
#endif
29+
static void do_write_unraisable(PyObject* obj) {
30+
PyObject *ty, *val, *tb;
31+
PyErr_GetExcInfo(&ty, &val, &tb);
32+
PyErr_Restore(ty, val, tb);
33+
PyErr_WriteUnraisable(obj);
34+
}
3235
"""
36+
cdef void do_write_unraisable(msg)
3337

38+
39+
from cpython.exc cimport PyErr_CheckSignals
40+
41+
# A negative return value from this callback will stop processing and
42+
# make nfq_handle_packet return -1, so we use that as the error flag.
3443
cdef int global_callback(nfq_q_handle *qh, nfgenmsg *nfmsg,
35-
nfq_data *nfa, void *data) with gil:
44+
nfq_data *nfa, void *data) except -1 with gil:
3645
"""Create a Packet and pass it to appropriate callback."""
3746
cdef NetfilterQueue nfqueue = <NetfilterQueue>data
3847
cdef object user_callback = <object>nfqueue.user_callback
3948
packet = Packet()
40-
packet.set_nfq_data(qh, nfa)
41-
user_callback(packet)
42-
packet.drop_refs()
49+
packet.set_nfq_data(nfqueue, nfa)
50+
try:
51+
user_callback(packet)
52+
except BaseException as exc:
53+
if nfqueue.unbinding == True:
54+
do_write_unraisable(
55+
"netfilterqueue callback during unbind"
56+
)
57+
else:
58+
raise
59+
finally:
60+
packet.drop_refs()
4361
return 1
4462

4563
cdef class Packet:
@@ -54,7 +72,7 @@ cdef class Packet:
5472
protocol = PROTOCOLS.get(hdr.protocol, "Unknown protocol")
5573
return "%s packet, %s bytes" % (protocol, self.payload_len)
5674

57-
cdef set_nfq_data(self, nfq_q_handle *qh, nfq_data *nfa):
75+
cdef set_nfq_data(self, NetfilterQueue queue, nfq_data *nfa):
5876
"""
5977
Assign a packet from NFQ to this object. Parse the header and load
6078
local values.
@@ -63,7 +81,7 @@ cdef class Packet:
6381
cdef nfqnl_msg_packet_hdr *hdr
6482

6583
hdr = nfq_get_msg_packet_hdr(nfa)
66-
self._qh = qh
84+
self._queue = queue
6785
self.id = ntohl(hdr.packet_id)
6886
self.hw_protocol = ntohs(hdr.hw_protocol)
6987
self.hook = hdr.hook
@@ -90,10 +108,12 @@ cdef class Packet:
90108
"""
91109
self.payload = NULL
92110

93-
cdef void verdict(self, u_int8_t verdict):
111+
cdef int verdict(self, u_int8_t verdict) except -1:
94112
"""Call appropriate set_verdict... function on packet."""
95113
if self._verdict_is_set:
96-
raise RuntimeWarning("Verdict already given for this packet.")
114+
raise RuntimeError("Verdict already given for this packet")
115+
if self._queue.qh == NULL:
116+
raise RuntimeError("Parent queue has already been unbound")
97117

98118
cdef u_int32_t modified_payload_len = 0
99119
cdef unsigned char *modified_payload = NULL
@@ -102,15 +122,15 @@ cdef class Packet:
102122
modified_payload = self._given_payload
103123
if self._mark_is_set:
104124
nfq_set_verdict2(
105-
self._qh,
125+
self._queue.qh,
106126
self.id,
107127
verdict,
108128
self._given_mark,
109129
modified_payload_len,
110130
modified_payload)
111131
else:
112132
nfq_set_verdict(
113-
self._qh,
133+
self._queue.qh,
114134
self.id,
115135
verdict,
116136
modified_payload_len,
@@ -126,7 +146,9 @@ cdef class Packet:
126146

127147
cpdef bytes get_payload(self):
128148
"""Return payload as Python string."""
129-
if self._owned_payload:
149+
if self._given_payload:
150+
return self._given_payload
151+
elif self._owned_payload:
130152
return self._owned_payload
131153
elif self.payload != NULL:
132154
return self.payload[:self.payload_len]
@@ -172,22 +194,23 @@ cdef class Packet:
172194
"""Repeat the packet."""
173195
self.verdict(NF_REPEAT)
174196

197+
175198
cdef class NetfilterQueue:
176199
"""Handle a single numbered queue."""
177200
def __cinit__(self, *args, **kwargs):
178-
self.af = kwargs.get("af", PF_INET)
201+
cdef u_int16_t af # Address family
202+
af = kwargs.get("af", PF_INET)
179203

204+
self.unbinding = False
180205
self.h = nfq_open()
181206
if self.h == NULL:
182207
raise OSError("Failed to open NFQueue.")
183-
nfq_unbind_pf(self.h, self.af) # This does NOT kick out previous
184-
# running queues
185-
if nfq_bind_pf(self.h, self.af) < 0:
186-
raise OSError("Failed to bind family %s. Are you root?" % self.af)
208+
nfq_unbind_pf(self.h, af) # This does NOT kick out previous queues
209+
if nfq_bind_pf(self.h, af) < 0:
210+
raise OSError("Failed to bind family %s. Are you root?" % af)
187211

188212
def __dealloc__(self):
189-
if self.qh != NULL:
190-
nfq_destroy_queue(self.qh)
213+
self.unbind()
191214
# Don't call nfq_unbind_pf unless you want to disconnect any other
192215
# processes using this libnetfilter_queue on this protocol family!
193216
nfq_close(self.h)
@@ -232,7 +255,11 @@ cdef class NetfilterQueue:
232255
def unbind(self):
233256
"""Destroy the queue."""
234257
if self.qh != NULL:
235-
nfq_destroy_queue(self.qh)
258+
self.unbinding = True
259+
try:
260+
nfq_destroy_queue(self.qh)
261+
finally:
262+
self.unbinding = False
236263
self.qh = NULL
237264
# See warning about nfq_unbind_pf in __dealloc__ above.
238265

@@ -251,11 +278,19 @@ cdef class NetfilterQueue:
251278
while True:
252279
with nogil:
253280
rv = recv(fd, buf, sizeof(buf), recv_flags)
254-
if (rv >= 0):
255-
nfq_handle_packet(self.h, buf, rv)
256-
else:
257-
if errno != ENOBUFS:
281+
if rv < 0:
282+
if errno == EAGAIN:
258283
break
284+
if errno == ENOBUFS:
285+
# Kernel is letting us know we dropped a packet
286+
continue
287+
if errno == EINTR:
288+
PyErr_CheckSignals()
289+
continue
290+
raise OSError(errno, "recv failed")
291+
rv = nfq_handle_packet(self.h, buf, rv)
292+
if rv < 0:
293+
raise OSError(errno, "nfq_handle_packet failed")
259294

260295
def run_socket(self, s):
261296
"""Accept packets using socket.recv so that, for example, gevent can monkeypatch it."""

setup.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
# Use Cython
88
from Cython.Build import cythonize
99

10+
setup_requires = []
1011
ext_modules = cythonize(
1112
Extension(
1213
"netfilterqueue", ["netfilterqueue.pyx"], libraries=["netfilter_queue"]
@@ -15,7 +16,11 @@
1516
)
1617
except ImportError:
1718
# No Cython
18-
if not os.path.exists(os.path.join(os.path.dirname(__file__), "netfilterqueue.c")):
19+
if "egg_info" in sys.argv:
20+
# We're being run by pip to figure out what we need. Request cython in
21+
# setup_requires below.
22+
setup_requires = ["cython"]
23+
elif not os.path.exists(os.path.join(os.path.dirname(__file__), "netfilterqueue.c")):
1924
sys.stderr.write(
2025
"You must have Cython installed (`pip install cython`) to build this "
2126
"package from source.\nIf you're receiving this error when installing from "
@@ -29,6 +34,7 @@
2934

3035
setup(
3136
ext_modules=ext_modules,
37+
setup_requires=setup_requires,
3238
name="NetfilterQueue",
3339
version=VERSION,
3440
license="MIT",

tests/conftest.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import trio
88
import unshare
99
import netfilterqueue
10-
from typing import AsyncIterator
10+
from functools import partial
11+
from typing import AsyncIterator, Callable, Optional
1112
from async_generator import asynccontextmanager
1213
from pytest_trio.enable_trio_mode import *
1314

@@ -93,7 +94,7 @@ async def peer_main(idx: int, parent_fd: int) -> None:
9394

9495
# Enter the message-forwarding loop
9596
async def proxy_one_way(src, dest):
96-
while True:
97+
while src.fileno() >= 0:
9798
try:
9899
msg = await src.recv(4096)
99100
except trio.ClosedResourceError:
@@ -111,6 +112,14 @@ async def proxy_one_way(src, dest):
111112
nursery.start_soon(proxy_one_way, peer, parent)
112113

113114

115+
def _default_capture_cb(
116+
target: "trio.MemorySendChannel[netfilterqueue.Packet]",
117+
packet: netfilterqueue.Packet,
118+
) -> None:
119+
packet.retain()
120+
target.send_nowait(packet)
121+
122+
114123
class Harness:
115124
def __init__(self):
116125
self._received = {}
@@ -155,7 +164,9 @@ async def _run_peer(self, idx: int, *, task_status):
155164
"peer subprocess exited with code {}".format(retval)
156165
)
157166
finally:
158-
await trio.run_process(f"ip link delete veth{idx}".split())
167+
# On some kernels the veth device is removed when the subprocess exits
168+
# and its netns goes away. check=False to suppress that error.
169+
await trio.run_process(f"ip link delete veth{idx}".split(), check=False)
159170

160171
async def _manage_peer(self, idx: int, *, task_status):
161172
async with trio.open_nursery() as nursery:
@@ -192,24 +203,28 @@ async def run(self):
192203

193204
@asynccontextmanager
194205
async def capture_packets_to(
195-
self, idx: int, *, queue_num: int = -1, **options
206+
self,
207+
idx: int,
208+
cb: Callable[
209+
["trio.MemorySendChannel[netfilterqueue.Packet]", netfilterqueue.Packet],
210+
None,
211+
] = _default_capture_cb,
212+
*,
213+
queue_num: int = -1,
214+
**options: int,
196215
) -> AsyncIterator["trio.MemoryReceiveChannel[netfilterqueue.Packet]"]:
197216

198217
packets_w, packets_r = trio.open_memory_channel(math.inf)
199218

200-
def stash_packet(p):
201-
p.retain()
202-
packets_w.send_nowait(p)
203-
204219
nfq = netfilterqueue.NetfilterQueue()
205220
# Use a smaller socket buffer to avoid a warning in CI
206221
options.setdefault("sock_len", 131072)
207222
if queue_num >= 0:
208-
nfq.bind(queue_num, stash_packet, **options)
223+
nfq.bind(queue_num, partial(cb, packets_w), **options)
209224
else:
210225
for queue_num in range(16):
211226
try:
212-
nfq.bind(queue_num, stash_packet, **options)
227+
nfq.bind(queue_num, partial(cb, packets_w), **options)
213228
break
214229
except Exception as ex:
215230
last_error = ex

0 commit comments

Comments
 (0)