Correlation IDs do not match [1.4.5] #1748


Closed · isamaru opened this issue Mar 20, 2019 · 6 comments

isamaru (Contributor) commented Mar 20, 2019

I am trying 1.4.5 consumers with autocommit=off. I occasionally see a `Correlation IDs do not match` error in the logs, in different scenarios. Sometimes it's an OffsetCommitRequest, sometimes a HeartbeatRequest.

2019-03-20 16:20:18,083 WARNING [kafka.client] Node 2 connection failed -- refreshing metadata
2019-03-20 16:20:18,084 ERROR [kafka.coordinator] Error sending OffsetCommitRequest_v2 to node 2 [CorrelationIdError: Correlation IDs do not match: sent 266569, recv 266570]
2019-03-20 16:20:18,084 WARNING [kafka.coordinator] Marking the coordinator dead (node 2) for group <redacted-group>: CorrelationIdError: Correlation IDs do not match: sent 266569, recv 266570.
2019-03-20 16:20:18,085 ERROR [kafka.coordinator] Error sending HeartbeatRequest_v1 to node 2 [CorrelationIdError: Correlation IDs do not match: sent 266569, recv 266570]
2019-03-20 16:20:18,189 INFO [kafka.cluster] Group coordinator for <redacted-group> is BrokerMetadata(nodeId=2, host='<redacted-node-2>', port=9092, rack=None)

#1733 uses a map to pair responses with correlation IDs, but only does so in conn.py; protocol/parser.py still uses a deque, and that is where the error occurs.
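For context, here is a minimal sketch of that deque-based matching (my own simplification for illustration, not the actual kafka.protocol.parser code): requests are recorded in send order and responses are matched strictly FIFO, so a single lost or extra entry shifts every subsequent comparison.

```python
from collections import deque


class CorrelationIdError(Exception):
    pass


# Simplified stand-in for the parser's in-flight tracking (illustration only).
in_flight_requests = deque()


def record_sent(correlation_id, request):
    # Requests are appended in the order their bytes are (supposedly) sent.
    in_flight_requests.append((correlation_id, request))


def match_response(recv_correlation_id, response):
    # Responses must arrive in the same order; a FIFO pop is compared to the
    # correlation id the broker echoed back.
    sent_correlation_id, request = in_flight_requests.popleft()
    if sent_correlation_id != recv_correlation_id:
        raise CorrelationIdError(
            'Correlation IDs do not match: sent %d, recv %d'
            % (sent_correlation_id, recv_correlation_id))
    return request, response
```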

Related to my investigation of #1744, where I am experimenting with additional locking around in_flight_requests manipulation and connection closing in conn.py. That work makes this race condition occur even more frequently in parser.py (which is not that surprising).
Also related to #1529.

dpkp (Owner) commented Mar 21, 2019

Background questions: (1) what broker version(s) are you running, and (2) what is your security_protocol configuration (SSL? SASL? both?)

Because the broker protocol is ordered, the protocol parser buffer should also be an ordered queue. This is also how the official Java client is structured, so I don't plan to accept any changes to the underlying protocol structure / deque.

If correlation IDs do not match, there are only a few explanations:

(1) tracking of what we actually sent on the socket gets out of sync (the protocol parser dequeue)
(2) tracking of what we actually received on the socket gets out of sync (same)
(3) we incorrectly expected a response when we shouldn't (e.g., a ProduceRequest with acks=0)
(4) there is a broker bug that breaks response ordering

Going backwards:
(4) is extremely unlikely (but not impossible if you're running a strange or hacked-on broker version).
(3) is also unlikely. The only request I know of that does not generate a response is a ProduceRequest with acks=0. kafka-python tracks this correctly, and in any event it should not be implicated by a consumer heartbeating / committing.
(2) I can't find any code path that would cause this. The correlation ID check happens in BrokerConnection.recv -> BrokerConnection._recv -> KafkaProtocol.receive_bytes -> KafkaProtocol._process_response. It is true that this stack itself is not locked / synchronized, but it is only called (1) by KafkaClient.poll, which is locked, (2) during SASL authentication, which only happens within connect(), and connect() is only called by KafkaClient._maybe_connect, which is locked, and (3) during check_version(), which is only called during KafkaClient.__init__.
(1) This seems the most likely. At first glance, all of the send code paths are locked. correlation_id = self._protocol.send_request(request) is always locked, so we should have a consistent view of correlation_id => request, and inserts onto the in-flight-request (ifr) queue should always be in the correct order. Similarly, send_pending_requests is always locked, but here the lock is via KafkaClient, so we've only guaranteed that no two threads will call send_pending_requests concurrently. I think the issue is that these two locks are different objects, meaning we never actually synchronize protocol.send_request against protocol.send_bytes. Because of that, it seems we may see something like this:

Thread A: protocol.send_bytes -> reads data from bytes_to_send buffer.
Thread B: protocol.send_request -> appends data to bytes_to_send buffer.
Thread A: protocol.send_bytes -> resets bytes_to_send buffer = [], erasing Thread B's data
Thread B: protocol.send_request -> appends correlation id to in-flight-requests queue

Now, when the next request comes along with the next correlation_id and is sent to the broker, the broker responds, we decode the response and compare it against the in-flight-requests queue... and we see a mismatch. Where is the response for Thread B's correlation id?
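To make that concrete, here is a deterministic replay of those four steps with toy stand-ins for the buffer and the in-flight queue (illustration only, not the real kafka-python objects); the correlation ids are chosen to line up with the log excerpt above:

```python
bytes_to_send = []         # toy stand-in for the protocol's outgoing buffer
in_flight_requests = []    # toy stand-in for the in-flight correlation-id queue

# Thread A (send_pending_requests -> protocol.send_bytes): reads the buffer...
data = b''.join(bytes_to_send)

# Thread B (protocol.send_request, under a *different* lock): appends its
# serialized request, e.g. the OffsetCommitRequest with correlation id 266569.
bytes_to_send.append(b'<OffsetCommitRequest_v2 bytes>')

# Thread A: ...then resets the buffer, silently discarding Thread B's bytes.
bytes_to_send = []

# Thread B: records its correlation id as in flight, even though its bytes
# will never reach the broker.
in_flight_requests.append(266569)

# The next request (correlation id 266570) is sent and answered normally, but
# the head of the in-flight queue is still 266569:
in_flight_requests.append(266570)
print(in_flight_requests[0])  # 266569 ("sent"), while the broker replies with 266570 ("recv")
```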

This matches your logs and I think would be fixed with a simple additional lock in kafka/conn.py send_pending_requests():

-        data = self._protocol.send_bytes()
+        with self._lock:
+            data = self._protocol.send_bytes()
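Stated in isolation, the pattern the patch applies is simply that the append path and the drain-and-reset path of a shared buffer must hold the same lock object (generic illustration, not the actual kafka-python classes):

```python
import threading


class OutgoingBuffer(object):
    """Generic illustration of the fix: one lock guards both append and drain."""

    def __init__(self):
        self._lock = threading.RLock()
        self._chunks = []

    def append(self, payload):
        # Analogous to protocol.send_request recording bytes to send.
        with self._lock:
            self._chunks.append(payload)

    def drain(self):
        # Analogous to protocol.send_bytes inside send_pending_requests: the
        # read and the reset happen atomically with respect to append().
        with self._lock:
            data = b''.join(self._chunks)
            self._chunks = []
        return data
```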

If you're able to hot-patch, can you give that a try?

isamaru (Contributor, Author) commented Mar 21, 2019

Thanks a lot for looking into it! This actually makes a lot of sense: the errors I see are more consistent with outgoing messages being dropped rather than reordered. There was always a message skipped, never a message "ahead of time". It also explains the connection timeouts I had been blaming on the network.

I will try the patch immediately and let you know.

(I have clusters of 1.0.1 and 2.1, no auth)

isamaru (Contributor, Author) commented Mar 21, 2019

Deployed for over 4 hours and no problems so far; 1.4.5 had several failures over a comparable time period.
I believe this is it! 👍 Thank you very much, @dpkp
I'll post here if there is a problem.

FYI, I can still reproduce the other race condition described in #1744 even with this patch; I'm preparing a separate fix.

isamaru added a commit to exponea/kafka-python that referenced this issue Mar 21, 2019:
kafka-python dpkp#1748 Fix race condition in protocol.send_bytes
jeffwidman (Contributor) commented Mar 21, 2019

This is a serious error since it can result in data loss during a producer.send() call. (Assuming I interpret the fix in #1752 correctly.)

I checked our production logs, and I do see CorrelationIdError showing up at a very low volume in 1.4.4, then spiking after we upgraded a bunch of machines to 1.4.5... I assume because of #1729, but I may be wrong. It's unfortunate that I didn't encounter this when I was initially testing the release candidate.

@dpkp once #1744 is fixed, I think it'd be a good idea to push another release.

dpkp (Owner) commented Mar 22, 2019

This one is definitely important to fix quickly. But I don't believe it would cause data loss unless the producer is not checking future success/failure or not using retries. A CorrelationIdError will cause all in-flight requests to fail, including, in my example above, Thread B's request/response future, which therefore does not get lost.

#1744 is a little more pernicious, in my opinion, because it causes a future to get lost and block forever.

But anyway, I agree that we should push a new release with these fixes ASAP.

jeffwidman (Contributor) commented

> But I don't believe it would cause data loss unless producer is not checking future success/failure or not using retries.

Ah, right, I forgot about that. Yeah, as long as acks > 0, it should be fine.
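For anyone following along, the condition dpkp describes looks roughly like this in application code (standard kafka-python usage; the broker address, topic name, and parameter values are placeholders): configure retries and check the future returned by send(), so a failed in-flight request surfaces as an exception instead of a silently dropped record.

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

# acks >= 1 plus retries, per the discussion above; the address is a placeholder.
producer = KafkaProducer(bootstrap_servers='localhost:9092', acks=1, retries=5)

future = producer.send('my-topic', b'payload')
try:
    metadata = future.get(timeout=10)  # blocks; raises if the send ultimately failed
except KafkaError as exc:
    # A CorrelationIdError fails the in-flight future; once retries are
    # exhausted the failure is reported here rather than lost silently.
    print('send failed: %r' % exc)
else:
    print('delivered to %s[%d] at offset %d'
          % (metadata.topic, metadata.partition, metadata.offset))
```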
