Skip to content

Issue- Python kafka producer to send the data using the kerberos authentication. #1934

@narasimha-reddy-kr

Description

@narasimha-reddy-kr

Below is the sample program, we have everything working with Java and tested the principal and keytab able to produce the message but through kafka with below pattern noticing the

%7|1741244952.688|CONTROLLERID|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/bootstrap: ControllerId update -1 -> 1
Message delivery failed: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}

import os
import logging
from confluent_kafka import Producer

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

os.environ['KRB5_CONFIG'] = './config/krb5.conf'
KAFKA_BROKER = "server:port"
TOPIC = "TEST"
KEYTAB_PATH = "./certs/my.keytab"
PRINCIPAL = "service@domain.COM"

conf = {
    'bootstrap.servers': KAFKA_BROKER,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'GSSAPI',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.principal': PRINCIPAL,
    'sasl.kerberos.keytab': KEYTAB_PATH,
    'debug': 'security,broker,protocol'
}



def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

def processKafka():

    print("started the processTwoTierKafka")
    producer = Producer(conf)
    print("Procducer", producer)
    message = 'Hello, Kafka validation with kerberos authentication!'
    try:
        producer.produce(TOPIC, value=message.encode('utf-8'), callback=delivery_report)
        producer.flush()
    except Exception as e:
        print(f'An error occurred: {e}')`

Here is the trace log.

> 
started the processKafka
%7|1741244944.818|SASL|rdkafka#producer-1| [thrd:app]: Selected provider Cyrus for SASL mechanism GSSAPI
%7|1741244944.819|SASLREFRESH|rdkafka#producer-1| [thrd:main]: Refreshing Kerberos ticket with command: kinit -R -t "./certs/my.keytab" -k username@domain.com || kinit -t "./certs/dev.keytab" -k username@domain.com
%7|1741244944.819|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1741244944.819|BROKER|rdkafka#producer-1| [thrd:app]: sasl_plaintext://{server:port}/bootstrap: Added new broker with NodeId -1
%7|1741244944.819|CONNECT|rdkafka#producer-1| [thrd:app]: sasl_plaintext://{server:port}/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1741244944.819|BRKMAIN|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Enter main broker thread
%7|1741244944.819|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v2.8.0 (0x20800ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x282)
%7|1741244944.819|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received CONNECT op
%7|1741244944.820|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state INIT -> TRY_CONNECT
Procducer <cimpl.Producer object at 0x106e7c5c0>
kinit: krb5_get_kdc_cred: KDC can't fulfill requested option
%7|1741244948.880|SASLREFRESH|rdkafka#producer-1| [thrd:main]: First kinit command finished: waking up broker threads
%7|1741244948.880|WAKEUP|rdkafka#producer-1| [thrd:main]: Wake-up sent to 1 broker thread in state >= INIT: Kerberos ticket refresh
%7|1741244948.880|SASLREFRESH|rdkafka#producer-1| [thrd:main]: Kerberos ticket refreshed in 4061ms
%7|1741244948.880|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: broker in state TRY_CONNECT connecting
%7|1741244948.880|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244948.880|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1741244948.880|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1741244948.963|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Connecting to ipv4#{ipaddress:port} (sasl_plaintext) with socket 9
%7|1741244948.996|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Connected to ipv4#{ipaddress:port}
%7|1741244948.996|CONNECTED|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Connected (#1)
%7|1741244948.996|FEATURE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1741244948.996|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1741244948.996|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent ApiVersionRequest (v3, 66 bytes @ 0, CorrId 1)
%7|1741244949.210|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received ApiVersionResponse (v3, 599 bytes, CorrId 1, rtt 213.79ms)
%7|1741244949.210|FEATURE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1741244949.210|AUTH|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Auth in state APIVERSION_QUERY (handshake supported)
%7|1741244949.210|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state APIVERSION_QUERY -> AUTH_HANDSHAKE
%7|1741244949.211|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslHandshakeRequest (v1, 29 bytes @ 0, CorrId 2)
%7|1741244949.331|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslHandshakeResponse (v1, 14 bytes, CorrId 2, rtt 120.38ms)
%7|1741244949.331|SASLMECHS|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker supported SASL mechanisms: GSSAPI
%7|1741244949.331|AUTH|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Auth in state AUTH_HANDSHAKE (handshake supported)
%7|1741244949.331|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state AUTH_HANDSHAKE -> AUTH_REQ
%7|1741244949.331|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Initializing SASL client: service name kafka, hostname server, mechanisms GSSAPI, provider Cyrus
%7|1741244949.884|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244949.895|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 39ms: no cluster connection
%7|1741244950.850|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: My supported SASL mechanisms: SRP GSSAPI GSSAPI DIGEST-MD5 WEBDAV-DIGEST EXTERNAL SMB-NTLMv2 DHX MS-CHAPv2 NTLM CRAM-MD5 APOP PLAIN-CLIENTTOKEN ATOKEN OAUTHBEARER PLAIN LOGIN XOAUTH2 ANONYMOUS
%7|1741244950.897|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244950.897|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1741244951.899|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244951.899|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1741244952.187|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Send SASL Kafka frame to broker (3011 bytes)
%7|1741244952.187|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslAuthenticateRequest (v1, 3036 bytes @ 0, CorrId 3)
%7|1741244952.434|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslAuthenticateResponse (v1, 124 bytes, CorrId 3, rtt 247.20ms)
%7|1741244952.434|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SASL frame from broker (108 bytes)
%7|1741244952.435|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: CB_CANON: flags 0x3, "myid@domain.com" @ "(null)": returning "username@domain.com"
%7|1741244952.435|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Send SASL Kafka frame to broker (0 bytes)
%7|1741244952.435|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslAuthenticateRequest (v1, 25 bytes @ 0, CorrId 4)
%7|1741244952.568|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslAuthenticateResponse (v1, 48 bytes, CorrId 4, rtt 132.93ms)
%7|1741244952.568|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SASL frame from broker (32 bytes)
%7|1741244952.569|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Send SASL Kafka frame to broker (32 bytes)
%7|1741244952.569|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: GSSAPI authentication complete but awaiting final response from broker
%7|1741244952.569|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslAuthenticateRequest (v1, 57 bytes @ 0, CorrId 5)
%7|1741244952.603|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslAuthenticateResponse (v1, 16 bytes, CorrId 5, rtt 34.15ms)
%7|1741244952.603|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SASL frame from broker (0 bytes)
%7|1741244952.603|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Authenticated as username@domain.com using GSSAPI (gssapiv2)
%7|1741244952.603|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state AUTH_REQ -> UP
%7|1741244952.604|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent MetadataRequest (v12, 65 bytes @ 0, CorrId 6)
%7|1741244952.687|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received MetadataResponse (v12, 115 bytes, CorrId 6, rtt 83.23ms)
%7|1741244952.688|BROKER|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/1: Added new broker with NodeId 1
%7|1741244952.688|BRKMAIN|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Enter main broker thread
%7|1741244952.688|CLUSTERID|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/bootstrap: ClusterId update "" -> "1rVMr9jnSkaob2dlVhMzhQ"
%7|1741244952.688|CONTROLLERID|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/bootstrap: ControllerId update -1 -> 1
Message delivery failed: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}
%7|1741244952.689|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to sasl_plaintext://{server:port}/1
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to sasl_plaintext://{server:port}/bootstrap
%7|1741244952.702|TERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1741244952.702|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Client is terminating (after 14ms in state INIT) (_DESTROY)
%7|1741244952.702|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Broker changed state INIT -> DOWN
%7|1741244952.704|BRKTERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1741244952.704|TERMINATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Handle is terminating in state DOWN: 1 refcnts (0x7f928380be40), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1741244952.704|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1741244952.702|TERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received TERMINATE op in state UP: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1741244952.703|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1741244952.705|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Client is terminating (after 101ms in state UP) (_DESTROY)
%7|1741244952.705|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state UP -> DOWN
%7|1741244952.705|BRKTERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1741244952.705|TERMINATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x7f928107a440), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1741244952.705|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1741244952.705|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 7886ms in state INIT) (_DESTROY)
%7|1741244952.706|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1741244952.706|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1741244952.706|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x7f927e87c240), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1741244952.706|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)

Metadata

Metadata

Assignees

No one assigned

    Labels

    component:producerIssues tied specifically to producer logic or code pathsstatus:needs-more-infoIssues that require more information to cleanup.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions