
Sending Events via Kafka Producer API

 

To send events via the Kafka Producer API, configure the producer with the following settings:

bootstrap.servers={clientEndpointId}-fane01.servicebus.windows.net:9093

security.protocol=SASL_SSL 

sasl.mechanism=PLAIN 

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{clientEndpointId}-fane01.servicebus.windows.net/;SharedAccessKeyName=upload;SharedAccessKey={ClientSharedAccessKey}";

The shared access key can be retrieved from an API endpoint; the details are provided during client onboarding.
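The settings above differ between clients only in the endpoint ID and the shared access key, so they can be derived from those two values. The following is a minimal sketch, assuming a hypothetical helper named `build_producer_config` (it is not part of any library) that returns keyword arguments in the form the kafka-python `KafkaProducer` constructor accepts:

```python
def build_producer_config(client_endpoint_id: str, shared_access_key: str) -> dict:
    """Return KafkaProducer keyword arguments for one client.

    Hypothetical helper for illustration only. The host name pattern and
    the "upload" key name are copied from the configuration shown above.
    """
    host = f"{client_endpoint_id}-fane01.servicebus.windows.net"
    connection_string = (
        f"Endpoint=sb://{host}/;"
        "SharedAccessKeyName=upload;"
        f"SharedAccessKey={shared_access_key}"
    )
    return {
        "bootstrap_servers": f"{host}:9093",
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # The user name is the literal string "$ConnectionString".
        "sasl_plain_username": "$ConnectionString",
        # The password is the full connection string.
        "sasl_plain_password": connection_string,
    }


# Placeholder values; substitute your own endpoint ID and key.
config = build_producer_config("casinomagic", "FooBar1234=")
print(config["bootstrap_servers"])
```

The resulting dictionary could then be passed to the producer as `KafkaProducer(**config)`.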

  • Kafka record keys can be any string, or null
  • Kafka record values need to be in JSON string format
  • Producers should send uncompressed records
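The record requirements above can be enforced before a record is handed to the producer. The following is a minimal sketch, assuming hypothetical helper names `encode_key` and `encode_value`; the UTF-8 encoding is also an assumption, since the requirements do not name one:

```python
import json
from typing import Any, Optional


def encode_key(key: Optional[str]) -> Optional[bytes]:
    """Encode a record key: any string, or None for a null key."""
    if key is None:
        return None
    if not isinstance(key, str):
        raise TypeError("record key must be a string or None")
    return key.encode("utf-8")


def encode_value(payload: Any) -> bytes:
    """Serialize a payload to a JSON string and encode it as bytes."""
    return json.dumps(payload).encode("utf-8")


print(encode_key(None))
print(encode_value({"client": "kafka"}))
```

With kafka-python, these functions could be supplied as `key_serializer=encode_key` and `value_serializer=encode_value` when constructing the producer. Leaving `compression_type` unset should keep records uncompressed, since that library's default is no compression.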

The following Python example sends an event using the KafkaProducer class from the kafka-python library:

import json

from kafka import KafkaProducer


def on_send_success(record_metadata):
    # Called once the record has been acknowledged.
    print(f"SUCCESS: {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")


def on_send_error(ex):
    # Called if the send fails.
    print("ERROR: ", ex)


if __name__ == "__main__":
    # "casinomagic" and "FooBar1234=" are example values; replace them with
    # your own clientEndpointId and shared access key.
    producer = KafkaProducer(
        bootstrap_servers="casinomagic-fane01.servicebus.windows.net:9093",
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username="$ConnectionString",
        sasl_plain_password="Endpoint=sb://casinomagic-fane01.servicebus.windows.net/;SharedAccessKeyName=upload;SharedAccessKey=FooBar1234="
    )

    # The record value is a JSON string, encoded to bytes; no key is set.
    producer.send(
        "test",
        json.dumps({"client": "kafka"}).encode("utf-8")
    ).add_callback(on_send_success).add_errback(on_send_error)

    # Block until all buffered records have been sent.
    producer.flush()
