
Real-time Data Ingestion

Kafka Native & HTTP Proxy Implementation for Future Anthem Platform

Real-time data can be sent in either of two ways:

  • Kafka Native:  
    Using Kafka Producers to a Future Anthem AWS MSK cluster 
  • HTTP Proxy:
    Using any HTTP library to send POST requests to a Future Anthem REST proxy.

 

DEV Environment 

Client ID 
<#CLIENT#> 

 

API URI 

https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3 

API Key 
<#API_KEY#> 

 

Topics

ing-staging-<#TOPIC#> 

Kafka User Password 
<#MSK_PWD#> 

 

 

Kafka Native 

Access to the Kafka brokers requires IP whitelisting.

1. Share your source IP address with Future Anthem for whitelisting.
2. Request the bootstrap server endpoint from the API:

curl -s --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters' \
--header 'x-api-key: <#API_KEY#>' | jq -r '.data[0].brokers.related'

3. Use the returned endpoint to get the Kafka bootstrap server URIs:

curl -s --location '<URI FROM PREVIOUS REQUEST>' \
--header 'x-api-key: <#API_KEY#>' | jq -r '.data[] | "\(.host):\(.port)"'
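For clients that do not use jq, the two lookups above can be sketched in Python. This is a minimal sketch that only assumes the response fields already used by the jq filters (`data`, `brokers.related`, `host`, `port`); the function names are hypothetical and the HTTP calls themselves are left out.

```python
def brokers_link(clusters_response: dict) -> str:
    """Return the brokers endpoint of the first cluster (data[0].brokers.related)."""
    return clusters_response["data"][0]["brokers"]["related"]


def bootstrap_servers(brokers_response: dict) -> str:
    """Join every broker in the response as host:port, separated by commas."""
    return ",".join(
        f"{broker['host']}:{broker['port']}" for broker in brokers_response["data"]
    )
```

The comma-separated string returned by `bootstrap_servers` is the form a Kafka client expects for its bootstrap server setting.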

 

Producer Configuration: 

bootstrap.servers=<AS RECEIVED FROM API ENDPOINT IN STEP 3>
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="<#MSK_PWD#>";

Kafka topics must be created by Future Anthem before any data is produced.
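The producer settings above are written as Java client properties. As a rough sketch, the same settings could be expressed for a Python client, assuming the third-party confluent-kafka package (not mentioned in this guide), which takes the username and password as separate keys instead of a JAAS string. The function names are hypothetical.

```python
import json


def build_producer_config(bootstrap_servers: str, password: str) -> dict:
    """Map the producer settings from this guide onto confluent-kafka style keys."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": "producer",
        "sasl.password": password,
    }


def send_event(config: dict, topic: str, event: dict) -> None:
    """Produce one JSON-encoded event and wait until it has been delivered."""
    # Assumption: confluent-kafka is installed. It is imported here so that
    # build_producer_config stays usable without the package.
    from confluent_kafka import Producer

    producer = Producer(config)
    producer.produce(topic, value=json.dumps(event).encode("utf-8"))
    producer.flush()
```

`send_event` creates a producer per call for brevity; a long-running client would normally create the producer once and reuse it.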

 

HTTP API

Request the cluster ID needed for API calls:

curl -s --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters' \
--header 'x-api-key: <#API_KEY#>' | jq -r '.data[0].cluster_id'
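The cluster ID is then placed into the path of every produce request. A minimal Python sketch of that step, assuming only the `data[0].cluster_id` field used by the jq filter above and the `/clusters/<id>/topics/<topic>/records` path used in this guide; the function names are hypothetical.

```python
def cluster_id(clusters_response: dict) -> str:
    """Return the ID of the first cluster in a /v3/clusters response."""
    return clusters_response["data"][0]["cluster_id"]


def records_url(api_uri: str, cluster: str, topic: str) -> str:
    """Build the records endpoint for one topic under the given API URI."""
    return f"{api_uri.rstrip('/')}/clusters/{cluster}/topics/{topic}/records"
```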

 

Send single events: 

The payload format can be STRING or JSON.

curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records' \
--header 'Content-Type: application/json' \
--header "x-api-key: <#API_KEY#>" \
--data '{
    "value": {
        "type": "STRING",
        "data": "{\"playerId\": \"123\", \"value\": \"test event 1\"}"
    }
}'

curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records' \
--header 'Content-Type: application/json' \
--header "x-api-key: <#API_KEY#>" \
--data '{
    "value": {
        "type": "JSON",
        "data": {"playerId": "123", "value": "test event 1"}
    }
}'
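The same single-event request can be sketched with Python's standard library. This is an illustration under the assumptions already visible in the curl examples (a `value` object with `type` and `data`, and the `x-api-key` header); `build_record_request` is a hypothetical helper name.

```python
import json
import urllib.request


def build_record_request(records_url: str, api_key: str, event: dict,
                         as_string: bool = False) -> urllib.request.Request:
    """Build a POST request carrying one event as a STRING or JSON payload."""
    if as_string:
        # STRING payloads carry the event as serialized JSON text.
        value = {"type": "STRING", "data": json.dumps(event)}
    else:
        value = {"type": "JSON", "data": event}
    body = json.dumps({"value": value}).encode("utf-8")
    return urllib.request.Request(
        records_url,
        data=body,
        headers={"Content-Type": "application/json", "x-api-key": api_key},
        method="POST",
    )
```

The request is sent with `urllib.request.urlopen(request)`; the response body contains the delivery report for the record.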

 

Streaming events: 

The REST API can stream events when the “Transfer-Encoding: chunked” header is set. The server keeps accepting records for as long as the connection is open.

For each record it receives, the server asynchronously sends back a delivery report, in the same order as the records were sent. Records are streamed to and from the server as Concatenated JSON.

Errors are reported per record. The HTTP status code is 200 OK as long as the connection is established successfully.

curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records' \
--header 'Content-Type: application/json' \
--header 'Transfer-Encoding: chunked' \
--header "x-api-key: <#API_KEY#>" \
--data '{
    "value": {
        "type": "JSON",
        "data": {}
    }
}
{
    "value": {
        "type": "JSON",
        "data": {}
    }
}'
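Concatenated JSON is a sequence of JSON objects written back to back with no enclosing array, so an ordinary JSON parser cannot read a whole stream in one call. The sketch below shows one way to encode records and to split a stream of delivery reports, using only the standard library. The function names are hypothetical, and the report fields used in the usage note are illustrative.

```python
import json
from typing import Iterable, Iterator


def encode_records(events: Iterable[dict]) -> str:
    """Serialize events as Concatenated JSON, one record object per line."""
    return "\n".join(
        json.dumps({"value": {"type": "JSON", "data": event}}) for event in events
    )


def iter_concatenated_json(text: str) -> Iterator[dict]:
    """Yield each JSON object found in a Concatenated JSON string, in order."""
    decoder = json.JSONDecoder()
    pos = 0
    while True:
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            return
        obj, pos = decoder.raw_decode(text, pos)
        yield obj
```

Because reports come back in the same order as the records, the n-th object yielded by `iter_concatenated_json` describes the n-th record sent.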





 

Send batches of events:

curl --location 'https://producer.msk-rest-proxy.dev.<#CLIENT#>.future-anthem-lz-pr.com/v3/clusters/<#CLUSTERID#>/topics/ing-staging-<#TOPIC#>/records:batch' \
--header 'Content-Type: application/json' \
--header "x-api-key: <#API_KEY#>" \
--data '{
    "entries": [
        {
            "id": "1",
            "value": {
                "type": "JSON",
                "data": {}
            }
        },
        {
            "id": "2",
            "value": {
                "type": "JSON",
                "data": {}
            }
        }
    ]
}'

 

The provided IDs are returned in the response as correlation IDs, so the ingestion status of each event can be read from the API response. The IDs must be strings.
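Since the IDs must be strings, a client that numbers its events has to convert the numbers before sending. A minimal sketch of building the batch body in Python follows; `build_batch_body` is a hypothetical helper, and numbering the events from 1 is just one possible ID scheme.

```python
import json
from typing import Sequence


def build_batch_body(events: Sequence[dict]) -> str:
    """Wrap events as batch entries with string IDs "1", "2", ... in order."""
    entries = [
        {"id": str(number), "value": {"type": "JSON", "data": event}}
        for number, event in enumerate(events, start=1)
    ]
    return json.dumps({"entries": entries})
```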

An example response:

{
  "successes": [
    {
      "id": "1",
      "cluster_id": "",
      "topic_name": "",
      "partition_id": 0,
      "offset": 5,
      "timestamp": "2023-03-09T14:07:23.592Z",
      "value": {
        "type": "JSON",
        "size": 7
      }
    }
  ],
  "failures": [
    {
      "id": "2",
      "error_code": 400,
      "message": "Bad Request: …"
    }
  ]
}
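A response of this shape can be turned into a per-event status lookup keyed by the correlation ID. The sketch below assumes only the fields shown in the example response (`successes`, `failures`, `id`, `offset`, `error_code`, `message`); `ingestion_status` is a hypothetical helper name.

```python
def ingestion_status(response: dict) -> dict:
    """Map each correlation ID to ("ok", offset) or ("failed", code, message)."""
    status = {}
    for success in response.get("successes", []):
        status[success["id"]] = ("ok", success["offset"])
    for failure in response.get("failures", []):
        status[failure["id"]] = ("failed", failure["error_code"], failure["message"])
    return status
```

Events whose ID maps to a "failed" entry can then be retried or logged individually without resending the whole batch.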

 

Full Documentation 

Further details on the REST proxy are available from Confluent. Future Anthem uses v3 of the API, but the admin endpoints are disabled for the producer user, so no changes can be made to the cluster, topics, or consumer groups. Any topic setup or changes must be requested from Future Anthem.

https://docs.confluent.io/platform/current/kafka-rest/index.html  

https://github.com/confluentinc/kafka-rest 

 

Production Environment 

Client ID 
<#CLIENT#> 

 

API URI 

https://producer.msk-rest-proxy.<#CLIENT#>.future-anthem-lz-pr.com/v3 

API Key 
<#API_KEY#> 

 

 

Topics

ing-production-<#TOPIC#> 

Kafka User Password 
<#MSK_PWD#> 
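The DEV and production settings differ only in the `dev.` host segment and the topic prefix. A small sketch of switching between them in client code follows; the function names and the environment labels "dev" and "production" are hypothetical, while the URI and topic patterns are the ones listed in this guide.

```python
ENVIRONMENTS = {
    # host_part is inserted before the client ID in the API host name.
    "dev": {"host_part": "dev.", "topic_prefix": "ing-staging"},
    "production": {"host_part": "", "topic_prefix": "ing-production"},
}


def api_uri(client: str, environment: str) -> str:
    """Return the REST proxy base URI for a client ID and environment."""
    host_part = ENVIRONMENTS[environment]["host_part"]
    return f"https://producer.msk-rest-proxy.{host_part}{client}.future-anthem-lz-pr.com/v3"


def topic_name(topic: str, environment: str) -> str:
    """Return the full topic name, e.g. ing-staging-<topic> for DEV."""
    return f"{ENVIRONMENTS[environment]['topic_prefix']}-{topic}"
```

An unknown environment label raises `KeyError`, which keeps a typo from silently producing to the wrong environment.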

 

 

Notes 

The following security features and standards apply to a publicly accessible Amazon MSK (Managed Streaming for Apache Kafka) cluster: [1]

Network Security: 
  • Amazon VPC to isolate the MSK cluster in a private network. 
  • Security groups to control inbound and outbound traffic to the cluster. 
Encryption: 
  • Encryption in transit using TLS for communication between clients and brokers. 
  • Encryption at rest using AWS Key Management Service (KMS) for data stored on the brokers. 
Authentication and Authorization: 
  • Access control method: SASL/SCRAM for username/password authentication  
  • Apache Kafka Access Control Lists (ACLs) for fine-grained authorization on the data plane. 
Monitoring and Logging: 
  • AWS CloudTrail to log API calls and user activities. 
  • Broker logs to Amazon CloudWatch Logs for analysis and auditing. 
Compliance: 
  • Amazon MSK is compliant with various industry standards, including SOC 1, SOC 2, and HIPAA. Refer to AWS documentation for the most up-to-date compliance information. 
Client Security: 
  • TLS 1.2 or later (TLS 1.3 recommended). [3]  
  • Use cipher suites with perfect forward secrecy (PFS) such as ECDHE. 
Access Management: 
  • Principle of least privilege for Producer role for clients.  
  • IAM roles and policies to control access to MSK APIs and resources within the AWS account.
Regular Updates: 
  • MSK cluster updated with the latest security patches and version upgrades. 
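
On the client side, the TLS recommendations listed under Client Security can be applied when opening HTTPS connections to the REST proxy. The following is a sketch using Python's standard ssl module; the cipher string is an assumption chosen to illustrate ECDHE-only suites, not a list mandated by Future Anthem.

```python
import ssl


def build_tls_context() -> ssl.SSLContext:
    """Create a client TLS context that refuses anything older than TLS 1.2."""
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # Limit TLS 1.2 to ECDHE key exchange for forward secrecy.
    # TLS 1.3 suites are configured separately and are not changed by this call.
    context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20")
    return context
```

The context can be passed to `urllib.request.urlopen(request, context=build_tls_context())`. Certificate and host name verification stay enabled because the context starts from `ssl.create_default_context()`.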

 

Sources 

 

How do I get support or make a feature suggestion?

Visit our 'here to help' form and we will respond accordingly.