Real-time Data Ingestion
Kafka Native & HTTP Proxy Implementation for Future Anthem Platform
Real-time data can be sent via either:
- Kafka Native: using Kafka producers to write to a Future Anthem AWS MSK cluster.
- HTTP Proxy: using any HTTP library to send POST requests to a Future Anthem REST proxy.
DEV Environment
Client ID |
API URI | https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3
API Key |
Topics | ing-staging-<#TOPIC#>
Kafka User Password |
Kafka Native
Access to the Kafka brokers requires IP whitelisting.
1. Share your source IP address with Future Anthem for whitelisting.
2. Request the bootstrap server endpoint using the API:
curl -s --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters' \
--header 'x-api-key: <#API_KEY#>' | jq -r '.data[0].brokers.related'
3. Call the endpoint returned in step 2 to get the Kafka bootstrap server addresses (host:port):
curl -s --location '<URI FROM PREVIOUS REQUEST>' \
--header 'x-api-key: <#API_KEY#>' | jq -r '.data[] | "\(.host):\(.port)"'
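Steps 2 and 3 can also be scripted. The following is a minimal Python sketch using only the standard library. It assumes the responses have exactly the shape implied by the jq filters above (".data[0].brokers.related" and ".data[].host" / ".data[].port"). The function names are hypothetical helpers, not part of any Future Anthem SDK.

```python
import json
import urllib.request


def _get_json(url: str, api_key: str) -> dict:
    """GET a REST proxy URL with the x-api-key header and decode the JSON body."""
    req = urllib.request.Request(url, headers={"x-api-key": api_key})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def brokers_url_from_clusters(clusters: dict) -> str:
    """Step 2: same as jq -r '.data[0].brokers.related'."""
    return clusters["data"][0]["brokers"]["related"]


def bootstrap_servers_from_brokers(brokers: dict) -> str:
    """Step 3: join every host:port pair into one comma-separated bootstrap.servers value."""
    return ",".join(f"{b['host']}:{b['port']}" for b in brokers["data"])


def discover_bootstrap_servers(base_url: str, api_key: str) -> str:
    """Run steps 2 and 3 against the REST proxy; base_url is the API URI ending in /v3."""
    clusters = _get_json(f"{base_url}/clusters", api_key)
    brokers = _get_json(brokers_url_from_clusters(clusters), api_key)
    return bootstrap_servers_from_brokers(brokers)
```

The returned string can be used directly as the bootstrap.servers value in the producer configuration.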
Producer Configuration:
bootstrap.servers=<COMMA-SEPARATED HOST:PORT LIST RECEIVED IN STEP 3>
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="<#MSK_PWD#>";
Kafka topics must be created by Future Anthem before any data is produced to them.
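For clients using the Python confluent-kafka library (an assumption, since any Kafka client works), the same settings can be expressed as a librdkafka configuration dictionary. librdkafka does not read sasl.jaas.config, so the SCRAM credentials are passed as sasl.username and sasl.password instead. Both helper names below are hypothetical.

```python
import json


def build_producer_config(bootstrap_servers: str, password: str) -> dict:
    """librdkafka-style equivalent of the Java producer properties above."""
    return {
        "bootstrap.servers": bootstrap_servers,  # comma-separated host:port list from step 3
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": "producer",  # username from the JAAS line above
        "sasl.password": password,  # <#MSK_PWD#>
    }


def produce_json(producer, topic: str, key: str, event: dict) -> None:
    """Send one JSON-encoded event through a confluent_kafka.Producer (or compatible) object."""
    producer.produce(topic, key=key.encode(), value=json.dumps(event).encode())
```

Usage, assuming confluent-kafka is installed: create producer = Producer(build_producer_config(servers, password)) with Producer imported from confluent_kafka, call produce_json(producer, "ing-staging-<#TOPIC#>", "123", {"playerId": "123"}), and call producer.flush() before exiting. Keying records by playerId is only an illustrative choice, not a documented requirement.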
HTTP API
Request the cluster ID needed for the API calls:
curl -s --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters' \
--header 'x-api-key: <#API_KEY#>' | jq -r '.data[0].cluster_id'
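The cluster ID lookup and the per-topic records URL it feeds into can be sketched in Python as follows. This uses only the standard library; the helper names are hypothetical, and base_url is assumed to be the API URI from the environment table.

```python
import json
import urllib.request


def cluster_id_from_response(clusters: dict) -> str:
    """Same as jq -r '.data[0].cluster_id'."""
    return clusters["data"][0]["cluster_id"]


def fetch_cluster_id(base_url: str, api_key: str) -> str:
    """GET <base_url>/clusters and return the first cluster's ID."""
    req = urllib.request.Request(f"{base_url}/clusters", headers={"x-api-key": api_key})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return cluster_id_from_response(json.load(resp))


def records_url(base_url: str, cluster_id: str, topic: str) -> str:
    """Build the records endpoint for one topic; no spaces may appear in the path."""
    return f"{base_url}/clusters/{cluster_id}/topics/{topic}/records"
```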
Send single events:
Payloads can be of type STRING (the event serialized into a string) or JSON (the event as a JSON object):
curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records' \
--header 'Content-Type: application/json' \
--header "x-api-key: <#API_KEY#>" \
--data '{
"value": {
"type": "STRING",
"data": "{\"playerId\": \"123\", \"value\": \"test event 1\"}"
}
}'
curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records' \
--header 'Content-Type: application/json' \
--header "x-api-key: <#API_KEY#>" \
--data '{
"value": {
"type": "JSON",
"data": {"playerId": "123", "value": "test event 1"}
}
}
'
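A Python equivalent of the two requests above, as a sketch that assumes only the request shape shown in the curl examples. It makes the difference between the two types explicit: with STRING the event is serialized to a string first, with JSON it is sent as a nested object. Both helpers are hypothetical.

```python
import json
import urllib.request


def record_body(event: dict, as_string: bool = False) -> dict:
    """Build a single-record produce body of type STRING or JSON."""
    if as_string:
        # STRING: "data" must itself be a string, so serialize the event
        return {"value": {"type": "STRING", "data": json.dumps(event)}}
    # JSON: "data" is the event object itself
    return {"value": {"type": "JSON", "data": event}}


def post_record(url: str, api_key: str, body: dict) -> dict:
    """POST one record to a .../topics/<topic>/records URL and return the decoded reply."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json", "x-api-key": api_key},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)
```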
Streaming events:
The REST API can be used to stream events by setting the "Transfer-Encoding: chunked" header. As long as the connection is kept open, the server keeps accepting records.
For each record sent, the server asynchronously sends back a delivery report, in the same order as the records were sent. Records are streamed to and from the server as concatenated JSON.
Errors are reported per record: the HTTP status code is 200 OK as long as the connection is established successfully, so each delivery report must be checked for failures.
curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records' \
--header 'Content-Type: application/json' \
--header 'Transfer-Encoding: chunked' \
--header "x-api-key: <#API_KEY#>" \
--data '{
"value": {
"type": "JSON",
"data": {}
}
}
{
"value": {
"type": "JSON",
"data": {}
}
}
'
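Reading the delivery reports requires splitting a concatenated JSON stream, which json.JSONDecoder.raw_decode handles directly. The sketch below builds the chunked request body from a generator and parses the returned reports. send_stream is a hypothetical helper built on http.client's encode_chunked option. For simplicity it reads the reports only after the whole body has been sent rather than interleaving reads and writes.

```python
import http.client
import json
from typing import Iterable, Iterator, List


def stream_body(events: Iterable[dict]) -> Iterator[bytes]:
    """Yield one JSON-type record per event; together the chunks form concatenated JSON."""
    for event in events:
        yield json.dumps({"value": {"type": "JSON", "data": event}}).encode() + b"\n"


def parse_concatenated_json(text: str) -> List[dict]:
    """Split a concatenated JSON stream (e.g. delivery reports) into separate objects."""
    decoder = json.JSONDecoder()
    objects, pos = [], 0
    while True:
        # raw_decode does not skip leading whitespace, so do it here
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            return objects
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)


def send_stream(host: str, path: str, api_key: str, events: Iterable[dict]) -> List[dict]:
    """POST events with chunked encoding; host has no scheme, path starts with /v3/."""
    conn = http.client.HTTPSConnection(host, timeout=30)
    try:
        conn.request(
            "POST",
            path,
            body=stream_body(events),
            headers={
                "Content-Type": "application/json",
                "Transfer-Encoding": "chunked",
                "x-api-key": api_key,
            },
            encode_chunked=True,
        )
        return parse_concatenated_json(conn.getresponse().read().decode())
    finally:
        conn.close()
```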
Send batches of events:
curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records:batch' \
--header 'Content-Type: application/json' \
--header "x-api-key: <#API_KEY#>" \
--data '{
"entries": [
{
"id": "1",
"value": {
"type": "JSON",
"data": {}
}
},
{
"id": "2",
"value": {
"type": "JSON",
"data": {}
}
}
]
}'
The provided IDs are returned in the response as correlation IDs, so the ingestion status of each individual event can be looked up in the API response. IDs must be strings.
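Because the IDs are only correlation keys, one simple approach is to number the entries, convert each number to a string as the API requires, and keep a lookup table from ID to event. A minimal sketch, assuming JSON payloads; batch_body is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def batch_body(events: List[dict]) -> Tuple[dict, Dict[str, dict]]:
    """Build a records:batch request body plus an id -> event lookup table.

    IDs are 1-based positions converted to str, since the API requires string IDs.
    """
    sent = {str(i): event for i, event in enumerate(events, start=1)}
    body = {
        "entries": [
            {"id": entry_id, "value": {"type": "JSON", "data": event}}
            for entry_id, event in sent.items()
        ]
    }
    return body, sent
```

The body is POSTed (as json.dumps(body)) to the records:batch URL exactly as in the curl example; the sent table is kept to match the response.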
Example response:
{
"successes": [
{
"id": "1",
"cluster_id": "",
"topic_name": "",
"partition_id": 0,
"offset": 5,
"timestamp": "2023-03-09T14:07:23.592Z",
"value": {
"type": "JSON",
"size": 7
}
}
],
"failures": [
{
"id": "2",
"error_code": 400,
"message": "Bad Request: …"
}
]
}
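Using the same IDs, a response like the one above can be split into delivered and failed events. This sketch assumes only the successes/failures fields shown in the example. Whether a failed event should be resent depends on its error_code; a 400 Bad Request points to a problem with the payload itself. The helper name is hypothetical.

```python
from typing import Dict, Tuple


def split_batch_response(response: dict, sent: Dict[str, dict]) -> Tuple[dict, dict]:
    """Match a records:batch response back to the sent events by id.

    Returns (delivered, failed):
      delivered: id -> (partition_id, offset)
      failed:    id -> {"error_code": ..., "message": ..., "event": original event}
    """
    delivered = {
        s["id"]: (s["partition_id"], s["offset"]) for s in response.get("successes", [])
    }
    failed = {
        f["id"]: {
            "error_code": f["error_code"],
            "message": f.get("message"),
            "event": sent.get(f["id"]),
        }
        for f in response.get("failures", [])
    }
    return delivered, failed
```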
Full Documentation
Further details on the REST proxy are available in the Confluent documentation. Future Anthem uses the v3 API, but the admin endpoints are disabled for the producer user, so no changes can be made to the cluster, topics, or consumer groups. Any topic setup or changes must be requested from Future Anthem.
https://docs.confluent.io/platform/current/kafka-rest/index.html
https://github.com/confluentinc/kafka-rest
Production Environment
Client ID |
API URI | https://producer.msk-rest-proxy.<#CLIENT#>.future-anthem-lz-pr.com/v3
API Key |
Topics | ing-production-<#TOPIC#>
Kafka User Password |
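Since DEV and Production differ only in the ".dev" host segment and the topic prefix, a small helper can fill in the <#CLIENT#> and <#TOPIC#> placeholders for either environment. This is a sketch; the helper is hypothetical, and "acme" and "bets" in the test values are made-up placeholders. The API key and Kafka password are separate per environment and are not handled here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Environment:
    base_url: str  # REST proxy API URI, ending in /v3
    topic: str  # fully prefixed topic name


def environment(client: str, topic: str, production: bool = False) -> Environment:
    """Fill in the <#CLIENT#> and <#TOPIC#> placeholders for DEV or Production."""
    if production:
        return Environment(
            base_url=f"https://producer.msk-rest-proxy.{client}.future-anthem-lz-pr.com/v3",
            topic=f"ing-production-{topic}",
        )
    return Environment(
        base_url=f"https://producer.msk-rest-proxy.dev.{client}.future-anthem-lz-pr.com/v3",
        topic=f"ing-staging-{topic}",
    )
```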
Notes
Security features and standards for a publicly accessible Amazon MSK (Managed Streaming for Apache Kafka) cluster [1]:
Network Security:
- Amazon VPC to isolate the MSK cluster in a private network.
- Security groups to control inbound and outbound traffic to the cluster.
Encryption:
- Encryption in transit using TLS for communication between clients and brokers.
- Encryption at rest using AWS Key Management Service (KMS) for data stored on the brokers.
Authentication and Authorization:
- Access control method: SASL/SCRAM for username/password authentication.
- Apache Kafka Access Control Lists (ACLs) for fine-grained authorization on the data plane.
Monitoring and Logging:
- AWS CloudTrail to log API calls and user activities.
- Broker logs to Amazon CloudWatch Logs for analysis and auditing.
Compliance:
- Amazon MSK is compliant with various industry standards, including SOC 1, SOC 2, and HIPAA. Refer to AWS documentation for the most up-to-date compliance information.
Client Security:
- TLS 1.2 or later (TLS 1.3 recommended). [3]
- Use cipher suites with perfect forward secrecy (PFS) such as ECDHE.
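For HTTP proxy clients written in Python, the client security requirements above can be enforced with an ssl.SSLContext passed to urllib.request.urlopen(req, context=ctx). This is a sketch. The cipher string is one reasonable choice, not a Future Anthem requirement, and Kafka clients configure TLS through their own settings instead.

```python
import ssl


def strict_tls_context() -> ssl.SSLContext:
    """TLS context with certificate checks, a TLS 1.2 floor and forward-secret ciphers."""
    ctx = ssl.create_default_context()  # verifies server certificates and hostnames
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2  # TLS 1.3 is still negotiated when available
    # Restrict TLS 1.2 suites to ECDHE key exchange with AEAD ciphers (forward secrecy);
    # TLS 1.3 suites are unaffected by set_ciphers and are always forward-secret.
    ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20")
    return ctx
```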
Access Management:
- Principle of least privilege for Producer role for clients.
- IAM roles and policies to control access to MSK APIs and resources within the AWS account.
Regular Updates:
- The MSK cluster is kept updated with the latest security patches and version upgrades.
Sources
- [1] Security in Amazon MSK - Amazon Managed Streaming for Apache Kafka
- [2] Managed Apache Kafka Service Features – Amazon MSK Features – Amazon Web Services
- [3] Turn on public access to an MSK Provisioned cluster - Amazon Managed Streaming for Apache Kafka
How do I get support or make a feature suggestion?
Visit our 'here to help' form and we will respond accordingly.