Kafka Delivery Method

The Kafka Delivery method enables a STEP platform that is integrated with Apache Kafka, an open-source distributed event-streaming platform, to deliver messages to Kafka. For more information about Apache Kafka, refer to the Apache Kafka documentation.

This delivery method is available only in outbound integration endpoints (OIEPs).

In an OIEP, the delivery method is displayed on the Configuration tab of the editor in the Delivery Method flipper.

Prerequisites

  1. Before setting up a Kafka Delivery, read the Considerations for Setting up Kafka Receiver or Delivery topic in this documentation.
  1. Prior to configuration, clicking the Server dropdown parameter displays the required server name. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.Server configuration property. If connecting to a cluster, use a comma-separated list.

    The following example shows two Kafka server configurations where the first server is a cluster:

    Kafka.Server.1=mybroker1a:9094,mybroker1b:9094
    Kafka.Server.2=mybroker2a:9094
  2. Prior to configuration, clicking the Topic dropdown parameter displays the available topics. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.Topic configuration property. For example:

    Kafka.Topic.1 = sample4
  3. Prior to configuration, clicking the Keystore Location dropdown parameter displays the required property name. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.SSLKeyStoreLocation property. For example:

    Kafka.SSLKeyStoreLocation.1=[/[path]/key_store.jks]
  4. Prior to configuration, clicking the Truststore Location dropdown parameter displays the required property name. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.SSLTrustStoreLocation property. For example:

    Kafka.SSLTrustStoreLocation.1=[/[path]/trust_store.jks]
  5. SASL / OAuth 2.0 can be configured for STEP with Kafka using the ExtraDriverOptions property to authenticate securely via bearer tokens. A business function allows integration with the Kafka Delivery method by handling the OAuth authentication and returning a HashMap containing the Bearer Token and other details.

    Configure the following case-sensitive entries in the sharedconfig.properties file. The first entry is the property name; the other two are set within the value of the ExtraDriverOptions property, as shown in the example below:

    • Kafka.Delivery.[OIEP_ID].ExtraDriverOptions - indicate the OIEP ID.

    • stibo.authentication.function - set the JavaScript business function ID. Refer to the Business Function Example for Generating an OAuth Bearer Token section below.

    • security.protocol - select the transport layer: either SASL_PLAINTEXT (unencrypted, while testing) or SASL_SSL (SSL-encrypted, in production).

    For example, in the property definition below, ‘MyKafka’ is the ID of the OIEP, ‘HydraAuthFunction’ is the ID of the business function, and ‘SASL_PLAINTEXT’ is the testing value for the security protocol. Configure the remaining property values as displayed:

    Kafka.Delivery.MyKafka.ExtraDriverOptions=sasl.mechanism=OAUTHBEARER,sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;,sasl.login.callback.handler.class=com.stibo.integration.kafka.domain.impl.auth.OAuthFunctionCallbackHandler,stibo.authentication.function=HydraAuthFunction,security.protocol=SASL_PLAINTEXT    
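
To make the structure of the ExtraDriverOptions value easier to read, the following sketch splits the example value above into its individual driver options and checks that the five options from the example are present. It is an illustration only: how STEP itself parses this property is not described here, and the helper name parseExtraDriverOptions and the "split on commas, then on the first equals sign" rule are assumptions made for this sketch.

```javascript
// Hypothetical helper (not part of STEP): splits an ExtraDriverOptions value
// into key/value pairs. Assumes options are separated by commas and that the
// first '=' in each option separates the key from the value.
function parseExtraDriverOptions(value) {
  var options = {};
  value.split(",").forEach(function (pair) {
    var trimmed = pair.trim();
    if (trimmed === "") {
      return; // ignore empty segments, e.g. from a trailing comma
    }
    var eq = trimmed.indexOf("=");
    if (eq < 0) {
      throw new Error("Missing '=' in option: " + trimmed);
    }
    options[trimmed.slice(0, eq)] = trimmed.slice(eq + 1);
  });
  return options;
}

// The value from the example property definition above.
var exampleValue =
  "sasl.mechanism=OAUTHBEARER," +
  "sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;," +
  "sasl.login.callback.handler.class=com.stibo.integration.kafka.domain.impl.auth.OAuthFunctionCallbackHandler," +
  "stibo.authentication.function=HydraAuthFunction," +
  "security.protocol=SASL_PLAINTEXT";

var parsedOptions = parseExtraDriverOptions(exampleValue);

// The five option names used in the example above.
var expectedOptionNames = [
  "sasl.mechanism",
  "sasl.jaas.config",
  "sasl.login.callback.handler.class",
  "stibo.authentication.function",
  "security.protocol"
];
var missingOptionNames = expectedOptionNames.filter(function (name) {
  return !(name in parsedOptions);
});

console.log(parsedOptions);
console.log("Missing options:", missingOptionNames);
```

A check like this can help catch a mistyped option name before the property is copied to the application server, because the entries are case-sensitive.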

Configuration

The Kafka Delivery option is available when configuring an outbound integration endpoint (OIEP).

On the OIEP, select 'Edit Delivery' and then provide the following information:

  1. For Delivery, choose Kafka Delivery.

  2. For the Server, select the server(s) where the Kafka broker instances used by the endpoint are running.

  3. In Topic, select the topic used by this endpoint.

  4. For Max Events To Read Per Poll, enter the maximum number of events to read out of Kafka per invocation.

    Note: Setting this value too high can require allocating additional memory in the system.

  5. In Decompress Message Content, optionally select a decompression option:

    • None - message content will remain compressed.
    • LZ4 - a lossless data compression algorithm.
  6. In Kafka Message Key Template, enter the desired key template. Valid template variables include: ${endpointId}, ${nodeType}, and ${nodeId}.

  7. In Keystore Location, if an SSL-encrypted connection to Kafka is required, select the keystore location. Otherwise, leave this parameter blank.

  8. In Keystore Password, enter the password for the keystore, if required.

  9. In Keystore PrivateKey Password, enter the password of the private key in the keystore file, if needed.

  10. In Truststore Location, if an SSL-encrypted connection to Kafka is required, select the truststore location. Otherwise, leave this parameter blank.

  11. In Truststore Password, enter the password for the truststore, if required.

  12. Click the Next button to continue with the OIEP - Configure Endpoint step, or the Finish button to close the wizard.
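
The Kafka Message Key Template in step 6 can be pictured as simple placeholder substitution. The sketch below shows one way such a template could be expanded. The helper name expandKeyTemplate, the sample values, the idea of combining several variables in one template, and the choice to reject unknown placeholders are all assumptions for illustration; STEP's actual expansion logic may differ.

```javascript
// Hypothetical helper (not part of STEP): replaces each ${name} placeholder
// with the matching value and rejects placeholders that have no value.
function expandKeyTemplate(template, values) {
  return template.replace(/\$\{(\w+)\}/g, function (match, name) {
    if (!Object.prototype.hasOwnProperty.call(values, name)) {
      throw new Error("Unknown placeholder: " + match);
    }
    return String(values[name]);
  });
}

// Made-up sample values, for illustration only.
var sampleEventValues = {
  endpointId: "MyKafka",
  nodeType: "Product",
  nodeId: "P-1001"
};

var nodeKey = expandKeyTemplate("${nodeId}", sampleEventValues);
var combinedKey = expandKeyTemplate("${endpointId}-${nodeType}-${nodeId}", sampleEventValues);

console.log(nodeKey);     // P-1001
console.log(combinedKey); // MyKafka-Product-P-1001
```

With Kafka's default partitioner, messages that share a key are written to the same partition, so a key based on ${nodeId} would typically keep all events for one node in order relative to each other.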

Business Function Example for Generating an OAuth Bearer Token

The following code example provides a framework for developing a business function to be used with Kafka Delivery and Receiver properties when SASL / OAuth 2.0 Bearer Token authentication is configured.

// Example only: replace the URL, scope, and credentials with the values for your OAuth server.
var scope = "producer.kafka";
var url = new java.net.URL("http://localhost:4444/oauth2/token");
var http = url.openConnection();
// Record the request time; the token lifetime (expires_in) is counted from this moment.
var startTime = java.lang.System.currentTimeMillis();
http.setRequestMethod("POST");
http.setDoOutput(true);
http.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
// Basic authentication header: Base64 of "clientId:clientSecret" (example value).
http.setRequestProperty("Authorization", "Basic cHJvZHVjZXIta2Fma2E6cHJvZHVjZXIta2Fma2E=");
http.connect();

// Send the client credentials grant request.
var os = http.getOutputStream();
try {
    os.write(new java.lang.String("grant_type=client_credentials&scope=" + scope).getBytes("UTF-8"));
} finally {
    os.close();
}

// Read the complete JSON response.
var reader = new java.io.BufferedReader(new java.io.InputStreamReader(http.getInputStream(), "UTF-8"));
try {
    var response = "";
    var line;
    // readLine() returns null at end of stream; reader.ready() can return false before all data has arrived.
    while ((line = reader.readLine()) != null) {
        response += line;
    }
    logger.info(response); // logs the raw response, including the token; consider removing this outside testing
    var json = JSON.parse(response);
    // Build the token map; all values must be strings.
    var map = new java.util.HashMap();
    map.put("access_token", json.access_token);  // access token
    map.put("scope", json.scope);                // space-separated list of scopes
    map.put("expires_in", "" + json.expires_in); // token lifetime in seconds, counted from start_time
    map.put("start_time", "" + startTime);       // start time in ms since January 1, 1970
    map.put("principal_name", "producer.kafka"); // may or may not be required
    return map;
} finally {
    reader.close();
}
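
The relationship between the expires_in and start_time entries of the returned map can be sketched as follows. This example runs in plain JavaScript and uses an ordinary object in place of the java.util.HashMap; the helper names tokenExpiryMillis and nonStringKeys and the sample values are hypothetical, and how STEP actually evaluates these entries is an assumption.

```javascript
// Hypothetical helper: computes the absolute expiry time in ms since
// January 1, 1970 from start_time (ms) and expires_in (seconds).
function tokenExpiryMillis(tokenMap) {
  var start = Number(tokenMap.start_time);
  var lifetimeSeconds = Number(tokenMap.expires_in);
  if (!isFinite(start) || !isFinite(lifetimeSeconds)) {
    throw new Error("start_time and expires_in must be numeric strings");
  }
  return start + lifetimeSeconds * 1000;
}

// Hypothetical helper: lists the keys whose values are not strings,
// since all values in the token map must be strings.
function nonStringKeys(tokenMap) {
  return Object.keys(tokenMap).filter(function (key) {
    return typeof tokenMap[key] !== "string";
  });
}

// Made-up sample values, for illustration only.
var sampleTokenMap = {
  access_token: "example-token",
  scope: "producer.kafka",
  expires_in: "3600",
  start_time: "1700000000000",
  principal_name: "producer.kafka"
};

var expiresAtMillis = tokenExpiryMillis(sampleTokenMap);
console.log(expiresAtMillis);               // 1700003600000
console.log(nonStringKeys(sampleTokenMap)); // []
```

Because start_time is captured before the token request is sent, the computed expiry is slightly earlier than the real one, which errs on the side of refreshing the token early rather than late.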