Kafka Delivery Method

The Kafka Delivery method enables a STEP platform that is integrated with Apache Kafka, an open-source distributed event-streaming platform, to use a Kafka queue. For more information, refer to the Apache Kafka documentation.

This delivery method is only available in OIEPs.

In an OIEP, the delivery method is displayed on the Configuration tab of the editor in the Delivery Method section.

Prerequisites

  1. Before setting up a Kafka Delivery, read the Considerations for Setting Up Kafka Delivery topic.

  1. Prior to configuration, clicking the Server dropdown parameter displays the required server name. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.Server configuration property. If connecting to a cluster, use a comma-separated list of brokers in host:port form.

    The following example shows two Kafka server configurations where the first server is a cluster:

    Kafka.Server.1=mybroker1a:9094,mybroker1b:9094
    Kafka.Server.2=mybroker2a:9094
  2. Prior to configuration, clicking the Topic dropdown parameter displays the available topics. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.Topic configuration property. For example:

    Kafka.Topic.1 = sample4
  3. The Message Metadata Source allows users to select either a template (defined in the steps below) or a business function for the metadata. Using a business function allows you to define the Message Key and Headers metadata for:

    • formats where messages are converted from STEPXML to another text file export format

    Prior to the delivery method configuration, create a business function that provides metadata for the message key and headers. Only the first 1 MB of each message is available to the business function, so use string processing to extract the metadata; parsing is only reliable if the messages are always smaller than 1 MB. The function must have an 'Input Parameter' of type String and a 'Return Type' of Map<String, String>. The delivery method uses map.get('MessageKey') to fetch the message key from the map, and all other entries are used as headers. Only business functions that have the expected input parameter and return type are available to select in the 'Select Business Function' window.

    The following is a very simple example of a business function used for the Message Metadata Source:

    //Create map
    var map = new java.util.HashMap();
    //... parse a message ...
    var myKey = 'my found key value, found from parsing the message';
    map.put("MessageKey", myKey);
    //Anything other than "MessageKey" will be added as a header
    map.put("HeaderA", 'HeaderA Value');
    //the delivery plugin will use map.get("MessageKey") to retrieve the key for the message
    return map;    
  4. Prior to configuration, clicking the Keystore Location dropdown parameter displays the required property name. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.SSLKeyStoreLocation property. For example:

    Kafka.SSLKeyStoreLocation.1=[/[path]/key_store.jks]
  5. Prior to configuration, clicking the Truststore Location dropdown parameter displays the required property name. Provide a selection for the dropdown parameter via the sharedconfig.properties file on the STEP application server using the case-sensitive Kafka.SSLTrustStoreLocation property. For example:

    Kafka.SSLTrustStoreLocation.1=[/[path]/trust_store.jks]
  6. SASL / OAuth 2.0 can be configured for STEP with Kafka using the ExtraDriverOptions property to authenticate securely via bearer tokens. A business function allows integration with the Kafka Delivery method by handling the OAuth authentication and returning a HashMap containing the Bearer Token and other details.

    Configure the following case-sensitive entries in the sharedconfig.properties file. The first entry is the property name, and the remaining entries are set within its value, as shown in the example below:

    • Kafka.Delivery.[OIEP_ID].ExtraDriverOptions - indicate the OIEP ID.

    • stibo.authentication.function - set the JavaScript business function ID. Refer to the Business Function Example for Generating an OAuth Bearer Token section below.

    • security.protocol - indicate either SASL_PLAINTEXT (while testing) or SASL_SSL (in production) for the SSL transport layer.

    For example, in the property definition below, 'MyKafka' is the ID of the OIEP, 'HydraAuthFunction' is the ID of the business function, and 'SASL_PLAINTEXT' is the testing value for the security protocol. Configure the rest of the property values as displayed:

    Kafka.Delivery.MyKafka.ExtraDriverOptions=sasl.mechanism=OAUTHBEARER,sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;,sasl.login.callback.handler.class=com.stibo.integration.kafka.domain.impl.auth.OAuthFunctionCallbackHandler,stibo.authentication.function=HydraAuthFunction,security.protocol=SASL_PLAINTEXT    
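
The ExtraDriverOptions value is a comma-separated list of key=value pairs, so it is easy to mistype by hand. The following is a minimal sketch, in plain JavaScript run outside of STEP, that assembles the property line from its parts. The helper name buildExtraDriverOptions is hypothetical and not part of STEP; the fixed option values are copied from the example above.

```javascript
// Hypothetical helper (not a STEP API): assembles the sharedconfig.properties
// line for SASL / OAuth 2.0 from the three values that vary per installation.
function buildExtraDriverOptions(oiepId, functionId, securityProtocol) {
  const allowed = ["SASL_PLAINTEXT", "SASL_SSL"];
  if (!allowed.includes(securityProtocol)) {
    throw new Error("security.protocol must be one of: " + allowed.join(", "));
  }
  // Order and fixed values are taken from the documented example.
  const options = [
    ["sasl.mechanism", "OAUTHBEARER"],
    ["sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"],
    ["sasl.login.callback.handler.class", "com.stibo.integration.kafka.domain.impl.auth.OAuthFunctionCallbackHandler"],
    ["stibo.authentication.function", functionId],
    ["security.protocol", securityProtocol],
  ];
  // Join each pair with "=" and the pairs with "," to form the property value.
  const value = options.map(([key, val]) => key + "=" + val).join(",");
  return "Kafka.Delivery." + oiepId + ".ExtraDriverOptions=" + value;
}

console.log(buildExtraDriverOptions("MyKafka", "HydraAuthFunction", "SASL_PLAINTEXT"));
```

Called with 'MyKafka', 'HydraAuthFunction', and 'SASL_PLAINTEXT', the sketch reproduces the property line shown in the example; switching the last argument to 'SASL_SSL' gives the production variant.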

Configuration

The Kafka Delivery option is available when configuring an outbound integration endpoint (OIEP).

On the OIEP editor, click the 'Edit Delivery' link, and then provide the following information:

  1. For Select Delivery Method, choose Kafka Delivery.

  2. For Server, select the server(s) where the Kafka broker instances used by the endpoint are running.

  3. In Topic, select the topic used by this endpoint.

  4. In Compress Message Content, select an option:

    • None - message content is not compressed.

    • LZ4 - message content is compressed using the LZ4 lossless data compression algorithm.

  5. The Message Metadata Source allows users to select:

    • Template - the desired key template using these options: ${endpointId}, ${nodeType}, and ${nodeId}.

    • Function - a business function that defines the Message Key and Headers metadata as defined in the Prerequisites section above. The outbound message content (string) that is passed to the delivery method and used by the selected business function is truncated at 1 MB. Due to this size limitation, string processing is recommended over parsing, so that a single method identifies keys and headers for both complete and incomplete message strings.

    Important: When a function is selected and a template is defined, the function overrides the template. When neither a function nor a template is defined, the message key is omitted.

  6. For Use SSL, check the checkbox to enable the Keystore and Truststore options.

    • In Keystore Location, if SSL encryption is required, select the keystore location to use for the SSL-encrypted connection to Kafka. Otherwise, leave this parameter blank.

    • In Keystore Password, enter the password for the keystore, if required.

    • In Keystore PrivateKey Password, enter the password of the private key in the keystore file, if required.

    • In Truststore Location, if SSL encryption is required, select the truststore location to use for the SSL-encrypted connection to Kafka. Otherwise, leave this parameter blank.

    • In Truststore Password, enter the password for the truststore, if required.

  7. Click the Next button to continue with the IIEP - Configure Endpoint step, or the Finish button to close the wizard.
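
The recommendation to prefer string processing over parsing for the Function option can be illustrated with a minimal sketch. It is written in plain JavaScript so it can be run outside of STEP, and therefore returns a JavaScript Map; an actual business function would return a java.util.HashMap as described in the Prerequisites section. The helper names, the sample message, the attribute names ID and UserTypeID, and the ObjectType header name are all assumptions made for illustration.

```javascript
// Hypothetical helper: returns the value of the first attrName="..." attribute
// found in the message, or null if it is absent or cut off by truncation.
function firstAttributeValue(message, attrName) {
  // The leading space prevents "ID" from matching inside "UserTypeID".
  const marker = " " + attrName + '="';
  const start = message.indexOf(marker);
  if (start === -1) return null;
  const valueStart = start + marker.length;
  const end = message.indexOf('"', valueStart);
  // No closing quote means the value itself was truncated.
  return end === -1 ? null : message.substring(valueStart, end);
}

// Hypothetical metadata function: works the same way on complete and
// truncated messages because it never requires well-formed input.
function buildMetadata(message) {
  const metadata = new Map();
  const key = firstAttributeValue(message, "ID");
  if (key !== null) metadata.set("MessageKey", key);
  const type = firstAttributeValue(message, "UserTypeID");
  if (type !== null) metadata.set("ObjectType", type); // any other entry is a header
  return metadata;
}

// Assumed sample: a message cut off in the middle of an element.
const truncated = '<Product ID="P-1001" UserTypeID="Item"><Name>Widg';
console.log(buildMetadata(truncated));
```

A full XML or JSON parser would reject the truncated sample, while the indexOf-based lookup still finds both values, which is the reason a single string-processing method can serve both complete and incomplete messages.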

Business Function Example for Generating an OAuth Bearer Token

The following code example provides a framework for developing a business function to be used with Kafka Delivery and Receiver properties when SASL / OAuth 2.0 Bearer Token authentication is configured.

var scope = "producer.kafka";
var url = new java.net.URL("http://localhost:4444/oauth2/token");
var http = url.openConnection();
var startTime = java.lang.System.currentTimeMillis();
http.setRequestMethod("POST");
http.setDoOutput(true);
http.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
// Basic authentication header: Base64-encoded client_id:client_secret
http.setRequestProperty("Authorization", "Basic cHJvZHVjZXIta2Fma2E6cHJvZHVjZXIta2Fma2E=");
http.connect();
// send the client credentials token request
var os = http.getOutputStream();
try {
    os.write(new java.lang.String("grant_type=client_credentials&scope=" + scope).getBytes("UTF-8"));
} finally {
    os.close();
}
var input = http.getInputStream();
// create the reader before the try block so that reader.close() is always valid
var reader = new java.io.BufferedReader(new java.io.InputStreamReader(input, "UTF-8"));
try {
    var string = "";
    var line;
    // read until end of stream; readLine() returns null when the response is complete
    while ((line = reader.readLine()) != null) {
        string = string + line;
    }
    logger.info(string); // note: this writes the token response to the log
    var json = JSON.parse(string);
    // build token map, all values must be strings.
    var map = new java.util.HashMap();
    map.put("access_token", json.access_token);  // access token
    map.put("scope", json.scope); // space-separated list of scopes
    map.put("expires_in", "" + json.expires_in); // expires_in is in seconds since start time.
    map.put("start_time", "" + startTime);   // start_time is in ms since 1/1 1970
    map.put("principal_name", "producer.kafka");  // May or may not be required
    return map;
} finally {
    reader.close();
}
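
The expires_in and start_time values use different units (seconds and milliseconds), which is easy to get wrong when testing the function. The following minimal sketch, in plain JavaScript run outside of STEP, shows only the arithmetic implied by the comments in the example above. The helper name millisUntilExpiry and the sample values are hypothetical, and how the Kafka callback handler actually evaluates these fields is not described here.

```javascript
// Hypothetical helper: milliseconds remaining before the token expires
// (negative once it has expired). All map values are strings, as required
// by the token map built in the business function.
function millisUntilExpiry(tokenMap, nowMs) {
  const startMs = Number(tokenMap.get("start_time"));           // ms since 1/1 1970
  const lifetimeMs = Number(tokenMap.get("expires_in")) * 1000; // seconds -> ms
  if (!Number.isFinite(startMs) || !Number.isFinite(lifetimeMs)) {
    throw new Error("start_time and expires_in must be numeric strings");
  }
  return startMs + lifetimeMs - nowMs;
}

// Assumed sample values: a one-hour token checked one minute after issue.
const token = new Map([
  ["access_token", "example-token"],
  ["expires_in", "3600"],
  ["start_time", "1700000000000"],
]);
console.log(millisUntilExpiry(token, 1700000060000)); // 3540000
```

Recording start_time before the request is sent, as the example does, makes the computed expiry slightly earlier than the real one, which is the safer direction for an error.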