Sending Events via Kafka Producer API

 

To send events via the Kafka Producer API, configure the producer with the following properties:

bootstrap.servers={clientEndpointId}-fane01.servicebus.windows.net:9093

security.protocol=SASL_SSL 

sasl.mechanism=PLAIN 

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{clientEndpointId}-fane01.servicebus.windows.net/;SharedAccessKeyName=upload;SharedAccessKey={ClientSharedAccessKey}";

The shared access key can be retrieved from an API endpoint; the details are provided during client onboarding.
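As a minimal sketch of how the properties above fit together, the helper below assembles the same settings as keyword arguments for a kafka-python `KafkaProducer`. The function name `build_producer_config` and its parameter names are hypothetical and used only for illustration; the host pattern, port, and `SharedAccessKeyName=upload` are copied from the configuration above.

```python
def build_producer_config(client_endpoint_id, shared_access_key):
    """Return keyword arguments for kafka-python's KafkaProducer.

    Hypothetical helper: it only fills the two placeholders from the
    configuration above into the documented host and password patterns.
    """
    host = f"{client_endpoint_id}-fane01.servicebus.windows.net"
    connection_string = (
        f"Endpoint=sb://{host}/;"
        "SharedAccessKeyName=upload;"
        f"SharedAccessKey={shared_access_key}"
    )
    return {
        "bootstrap_servers": f"{host}:9093",
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # The username is the literal text "$ConnectionString", not a variable.
        "sasl_plain_username": "$ConnectionString",
        "sasl_plain_password": connection_string,
    }


if __name__ == "__main__":
    # Example values only; use the endpoint ID and key issued at onboarding.
    config = build_producer_config("casinomagic", "FooBar1234=")
    print(config["bootstrap_servers"])
```

The resulting dictionary could then be passed as `KafkaProducer(**config)`, which keeps the shared access key out of the producer code itself.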

  • Kafka records can have any string key or null
  • Kafka record values need to be in JSON string format
  • Producers should send uncompressed records
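The record rules above can be sketched as a small encoding step. This assumes that "JSON string format" means a JSON document serialized to a UTF-8 string; the helper name `encode_record` is hypothetical.

```python
import json


def encode_record(payload, key=None):
    """Return (key_bytes, value_bytes) for one Kafka record.

    Hypothetical helper: the key must be a string or None, and the value
    is the payload serialized as a JSON string and encoded as UTF-8.
    """
    if key is not None and not isinstance(key, str):
        raise TypeError("key must be a string or None")
    value_bytes = json.dumps(payload).encode("utf-8")
    key_bytes = key.encode("utf-8") if key is not None else None
    return key_bytes, value_bytes


if __name__ == "__main__":
    key_bytes, value_bytes = encode_record({"client": "kafka"}, key="player-1")
    print(key_bytes, value_bytes)
```

With kafka-python, the pair could be sent as `producer.send(topic, value=value_bytes, key=key_bytes)`. Leaving the producer's `compression_type` at its default of `None` keeps records uncompressed.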

The following Python example uses the kafka-python package to send an event with a Kafka producer:

import json

from kafka import KafkaProducer


def on_send_success(record_metadata):
    # Called once the broker has acknowledged the record.
    print(f"SUCCESS: {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")


def on_send_error(ex):
    # Called if the send fails.
    print("ERROR: ", ex)


if __name__ == "__main__":
    producer = KafkaProducer(
        bootstrap_servers="casinomagic-fane01.servicebus.windows.net:9093",
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        # The username is the literal text "$ConnectionString".
        sasl_plain_username="$ConnectionString",
        sasl_plain_password="Endpoint=sb://casinomagic-fane01.servicebus.windows.net/;SharedAccessKeyName=upload;SharedAccessKey=FooBar1234=",
    )

    # The record value is a JSON string encoded as UTF-8 bytes.
    producer.send(
        "test",
        json.dumps({"client": "kafka"}).encode("utf-8"),
    ).add_callback(on_send_success).add_errback(on_send_error)

    # Block until all buffered records have been sent.
    producer.flush()
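The example above reports the result through callbacks. Where a caller needs to know the outcome before continuing, a blocking variant may be simpler. The sketch below assumes a kafka-python producer, whose `send()` returns a future with a `get(timeout=...)` method; the helper name `send_and_wait` and the 10-second default are hypothetical.

```python
import json


def send_and_wait(producer, topic, payload, timeout_seconds=10):
    """Send one JSON record and block until the broker acknowledges it.

    Hypothetical helper: `producer` is expected to behave like a
    kafka-python KafkaProducer. Any error raised by the send propagates
    to the caller instead of going to an errback.
    """
    value = json.dumps(payload).encode("utf-8")
    future = producer.send(topic, value=value)
    # get() waits for the acknowledgement, or raises if the send failed
    # or the timeout elapsed.
    metadata = future.get(timeout=timeout_seconds)
    return metadata.topic, metadata.partition, metadata.offset
```

Blocking on every record trades throughput for simplicity, so the callback style shown earlier is likely the better fit for high-volume producers.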
