To send events via the Kafka Producer API, you would need to do following config:
bootstrap.servers={clientEndpointId}-fane01.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{clientEndpointId}-fane01.servicebus.windows.net/;SharedAccessKeyName=upload;SharedAccessKey={ClientSharedAccessKey}";
The shared access key can be retrieved from an API endpoint and details are provided during client onboarding.
- Kafka records can have any string key or null
- Kafka record values need to be in JSON string format
- Producers should send uncompressed records
The following code example uses Python to send events using a Kafka Producer:
from kafka import KafkaProducer
import json
def on_send_success(record_metadata):
print(f"SUCCESS: {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")
def on_send_error(ex):
print("ERROR: ", ex)
if __name__ == "__main__":
producer = KafkaProducer(
bootstrap_servers="casinomagic-fane01.servicebus.windows.net:9093",
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username="$ConnectionString",
sasl_plain_password="Endpoint=sb://casinomagic-fane01.servicebus.windows.net/;SharedAccessKeyName=upload;SharedAccessKey=FooBar1234="
)
producer.send(
"test",
str.encode(
json.dumps({
"client": "kafka"
})
)
).add_callback(on_send_success).add_errback(on_send_error)
producer.flush()