Skip to content

Fix error propagation rule for Python's C API #2019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

Conversation

k-raina
Copy link
Member

@k-raina k-raina commented Aug 12, 2025

What

This PR fixes issue : #865

Couple of Python C voilation rule we want to understand:

The Python C API Exception Rule, When a C function returns to Python:
1. If the function returns NULL (indicating an error), there MUST be a Python exception set
2. If the function returns a non-NULL value (success), there MUST NOT be a Python exception set
Violating this rule causes SystemError.
 C function should own the exception it sets when returning NULL.

Previous Workflow (Broken)

  1. Application calls consumer.consume() → Python → C extension (Consumer_consume) → librdkafka
  2. librdkafka triggers callback → error_cb() creates new Python execution context
  3. Python callback raises exception → Exception set in Python global state within callback context
  4. Return to librdkafka → librdkafka doesn't understand Python exceptions, continues normally
  5. librdkafka returns to Consumer_consume() → Exception still in global state from different context ❌
  6. Context mismatch detected → Consumer_consume() must return to Python but doesn't "own" this exception
  7. User gets SystemError → Python detects inconsistent exception ownership between contexts

New Workflow (Fixed)

  1. Application calls consumer.consume() → Python → C extension (Consumer_consume) → librdkafka
  2. librdkafka triggers callback → error_cb() creates new Python execution context, callback raises exception
  3. Exception context transfer → PyErr_Fetch() extracts exception from callback context + clears global state ✅
  4. librdkafka continues cleanly → No exception in global state, librdkafka completes normally
  5. CallState_end() restores context → PyErr_Restore() transfers exception ownership to Consumer_consume() context
  6. Consumer_consume() returns to Python → Now properly "owns" the exception, returns NULL with exception set
  7. User gets single exception → Only RuntimeError, proper exception ownership maintained

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

https://docs.python.org/3/c-api/exceptions.html

As a general principle, a function that calls another function to perform some task should check whether the called function raised an exception, and if so, pass the exception state on to its caller.

When a function must fail, it is important to set an exception condition and return an error value (usually a NULL pointer).

Test & Review

#!/usr/bin/env python3
"""
Minimal reproducer for Issue #865: SystemError when error_cb raises

This script is based on the exact reproducer from the GitHub issue.
It will try to connect to a non-existent broker, which will trigger
the error callback, which will raise an exception, which will cause
the SystemError.
"""

import sys
import os

# Use our locally built version with the fix
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

import confluent_kafka


def error_cb(error):
    """Error callback that raises an exception - this causes the SystemError"""
    # print(f"Error callback triggered: {error}")
    raise confluent_kafka.KafkaException(error)


def demo():
    """Main demo function that reproduces the SystemError"""
    consumer = confluent_kafka.Consumer({
        "bootstrap.servers": "nonexistent:9092",  # This will fail to resolve
        "group.id": "demo",
        "error_cb": error_cb,
    })
    
    consumer.subscribe(["test"])
    
    # This will trigger the error callback and cause SystemError
    return consumer.consume(1, timeout=1)


if __name__ == "__main__":
    print("Reproducing SystemError from Issue #865")
    print("=" * 50)
    print()
    
    print("This will attempt to connect to 'nonexistent:9092' which will fail.")
    print("The error_cb will be triggered and will raise a KafkaException.")
    print("You should see TWO exceptions:")
    print("1. Your KafkaException (the intended one)")
    print("2. A SystemError (the bug)")
    print()
    
    try:
        result = demo()
        print(f"Unexpected success: {result}")
    except Exception as e:
        print(f"Caught exception: {type(e).__name__}: {e}")
        print()
        
        # Print the full traceback to show both exceptions
        import traceback
        print("Full traceback showing the double exception:")
        print("-" * 50)
        traceback.print_exc()

@Copilot Copilot AI review requested due to automatic review settings August 12, 2025 13:23
@k-raina k-raina requested review from MSeal and a team as code owners August 12, 2025 13:23
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes a Python C API exception rule violation that was causing SystemError when callback functions raised exceptions. The fix properly transfers exception ownership between different Python execution contexts by storing and restoring exceptions during callback execution.

  • Implements proper exception transfer using PyErr_Fetch/PyErr_Restore to maintain exception ownership
  • Adds exception storage fields to CallState structure for context preservation
  • Updates all callback functions to capture exceptions before crashing the call state

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated no comments.

Show a summary per file
File Description
src/confluent_kafka/src/confluent_kafka.h Adds exception storage fields to CallState structure
src/confluent_kafka/src/confluent_kafka.c Implements exception transfer logic in callbacks and CallState functions
src/confluent_kafka/src/Producer.c Updates delivery callback to capture exceptions before crashing
src/confluent_kafka/src/Consumer.c Updates rebalance callback to capture exceptions before crashing
tests/test_Producer.py Adds test verifying no SystemError occurs when delivery callback raises exception
tests/test_Consumer.py Adds test verifying no SystemError occurs when error callback raises exception
Comments suppressed due to low confidence (2)

Copy link
Contributor

@MSeal MSeal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great description of the issue and clean fix!

@MSeal
Copy link
Contributor

MSeal commented Aug 12, 2025

Looks like some whitespace issue are making the linter fail

@sonarqube-confluent

This comment has been minimized.

@sonarqube-confluent

This comment has been minimized.

1 similar comment
@sonarqube-confluent
Copy link

Passed

Analysis Details

5 Issues

  • Bug 0 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 5 Code Smells

Coverage and Duplications

  • Coverage No coverage information (66.10% Estimated after merge)
  • Duplications No duplication information (5.50% Estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants