Kafka Streaming Receiver

The Kafka Streaming Receiver integrates STEP with Apache Kafka to read messages from a topic, with parallelization based on partitions and without the use of individual background processes per message. For more information about Apache Kafka, search the web.

Note: Kafka Streaming Receiver is in a ramp-up phase. To learn more about the ramp-up phase / status, refer to the License and Component Lifecycle documentation.

Message streaming is designed for fast imports of small messages that each represent a single object; therefore, the 'maximum message size in bytes' defined on the Kafka topic is limited to 1 MB (1048576 bytes).
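
The 1 MB limit corresponds to the 'max.message.bytes' setting on the Kafka topic. As a minimal sketch (not part of STEP itself), the Java AdminClient snippet below shows how that limit could be checked and capped at 1048576 bytes; the broker address and topic name are placeholders.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class TopicMessageSizeExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder broker address - replace with your Bootstrap Server(s).
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "mybroker1a:9092");

            try (Admin admin = Admin.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

                // Read the current maximum message size configured on the topic.
                ConfigEntry current = admin.describeConfigs(List.of(topic))
                        .all().get().get(topic).get("max.message.bytes");
                System.out.println("max.message.bytes = " + current.value());

                // Cap the topic at 1 MB (1048576 bytes) to stay within the streaming limit.
                AlterConfigOp cap = new AlterConfigOp(
                        new ConfigEntry("max.message.bytes", "1048576"), AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Map.of(topic, List.of(cap))).all().get();
            }
        }
    }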

Note: For the Streaming Receiver to validate the Topic, Error Topic, and Maximum Message size, which is done using the Kafka AdminClient, ACLs (Access Control Lists) must be granted for read, describe, and describeConfigs on Topics and Consumer Groups.
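
As an illustrative sketch of what those grants might look like when managed with the Java AdminClient (rather than with Kafka's command-line tools), the snippet below allows Read, Describe, and DescribeConfigs on the topic and Read and Describe on the consumer group for a hypothetical principal. The principal, broker, topic, and consumer group names are placeholders, and the exact set of ACLs required depends on your broker's authorizer configuration.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class GrantStepAclsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "mybroker1a:9092"); // placeholder broker

            // Illustrative names only - use your own principal, topic, and consumer group.
            String principal = "User:step-integration";
            ResourcePattern topic = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);
            ResourcePattern group = new ResourcePattern(ResourceType.GROUP, "myserver-MyInboundKafka", PatternType.LITERAL);

            List<AclBinding> acls = new ArrayList<>();
            // Topic: Read (consume), Describe, and DescribeConfigs (validate the maximum message size).
            for (AclOperation op : List.of(AclOperation.READ, AclOperation.DESCRIBE, AclOperation.DESCRIBE_CONFIGS)) {
                acls.add(new AclBinding(topic, new AccessControlEntry(principal, "*", op, AclPermissionType.ALLOW)));
            }
            // Consumer group: Read and Describe.
            for (AclOperation op : List.of(AclOperation.READ, AclOperation.DESCRIBE)) {
                acls.add(new AclBinding(group, new AccessControlEntry(principal, "*", op, AclPermissionType.ALLOW)));
            }

            try (Admin admin = Admin.create(props)) {
                admin.createAcls(acls).all().get();
            }
        }
    }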

The Kafka Streaming Receiver option is only available in an inbound integration endpoint (IIEP) and requires the use of the 'STEP Streaming Importer' processing engine. This means Kafka Streaming Receiver is not compatible with the 'STEP Match and Merge Importer' or other processing engines.

When using the Kafka Streaming Receiver, be aware of the following functionality differences compared to the Kafka Receiver described in the section below:

  • In the 'Kafka Streaming Receiver Configuration' section of the workbench editor and on the 'Choose Receiver' wizard step:

    • The 'Error Handling Strategy' parameter allows users to stop the IIEP when any error is encountered, ignore errors and continue processing, or send errors to the 'Error Topic' and continue processing. When 'Error Topic' is selected, the message posted to the error topic contains the original message, any headers that were supplied on the original message, and the error details as a JSON array in the STEPErrorReason header. A consumer sketch after this list shows one way to inspect these error messages.

    • When changes are made to the configuration using the wizard, STEPXML imports, or change package installations, the IIEP is stopped because the active threads controlling streaming must be stopped and restarted to apply changes. Enable or Resume the IIEP to restart processing with the new configuration. For details, refer to the Handling Failed IIEP Background Processes topic.

  • In the 'Configuration' section of the workbench editor and on the 'Configure Endpoint' step of the wizard:

    • The 'Process Engine' / 'Processing Engine' selection must be 'STEP Streaming Importer'.

    • The 'Transactional Settings' parameter is not available. A combination of the error handling options on the receiver configuration and the Kafka partition configuration on the topic replaces the 'Transactional Settings'.

      To mimic the Transactional Settings 'Strict' mode, where messages are imported in sequence, configure the Kafka Topic with only one partition and set the 'Error Handling Strategy' parameter to 'Stop'. This setup preserves the order of message processing.

      To mimic the Transactional Settings 'None' mode, which allows for parallel imports when system resources are available, configure the Kafka Topic with up to 10 partitions and set the 'Error Handling Strategy' to either 'Ignore' or 'Error Topic'. With this configuration, topics with more partitions have priority, since a streaming thread is opened for each partition. This setup allows faster processing when the message order is not important. A topic-creation sketch after this list illustrates both partition setups.

    • The 'Queue for Endpoint' parameter is only valid when using the legacy BGP execution mechanism (as described in the BGP Legacy Multiple Queues topic). The queue for a streaming receiver is automatically changed to 'StreamingQueue', although the name can be changed if desired; other imports use the 'InboundQueue' by default. Using a separate queue keeps the controlling BGP off the standard import queue and allows the streaming threads and logging activity to be managed independently.

    • The 'Maximum Number of Waiting Processes', 'Maximum Number of Failed Processes', 'Maximum Age of Failed Processes', 'Maximum Number of Succeeded Processes', 'Maximum Age of Succeeded Processes', and 'Number of Messages per Background Process' parameters are not valid.

  • In the 'Configuration' section of the workbench editor and on the 'Configure Pre-processor' step of the wizard, the valid standard options are 'No pre-processing', 'Transformation by XSLT', and 'Transformation by Import Configuration'. Additionally, custom pre-processors created with the Extension API can be used.

  • In the 'Configure Processing Engine' step of the wizard, for 'Select Format', the recommended format is STEPXML, but other valid formats are BMECat, BMECat 2005, CSV, Fixed Width, Generic JSON, Generic XML, and IDoc MATMAS 05. Binary formats, such as Excel, are not supported.

  • In the 'Configuration' section of the workbench editor and on the 'Configure Post-processor' step of the wizard, a post-processor is not valid.

  • In the 'Configuration' section of the workbench editor and on the 'Schedule Endpoint' step of the wizard, the schedule is not available, nor is there an 'Invoke' option on the right-click menu. Enable and disable the IIEP to start and stop the threads.

  • In the 'Configuration' section of the workbench editor and on the 'Error Handling & Reporting' step of the wizard, the 'Connection Error Handling' parameters are not valid since they are addressed automatically by the receiver. Refer to the Integration Endpoint Log section below for details.

  • Although the workbench editor does not include the Background Process Tab for a streaming importer, the import process can be monitored on the Integration Endpoint Log, the Statistics Tab, and the Error Logs Tab, as described in the sections below. Monitoring is also available via the KafkaStreamingReceiverStatusSensor as described in the Sensors for External Monitoring topic in the Administration Portal documentation.
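
For the 'Error Topic' strategy described in the list above, the following is a minimal sketch (not part of STEP) of how the error topic could be inspected with a plain Java consumer, printing the STEPErrorReason header alongside the original payload. The broker, consumer group, and topic names are placeholders.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.nio.charset.StandardCharsets;
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class ErrorTopicInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "mybroker1a:9092");    // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "error-topic-inspector");       // placeholder group
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("my-error-topic"));                        // placeholder error topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                    for (ConsumerRecord<String, String> record : records) {
                        // The error details are delivered as a JSON array in the STEPErrorReason header;
                        // the original message payload and headers are preserved on the record.
                        Header reason = record.headers().lastHeader("STEPErrorReason");
                        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
                        System.out.println("errors : " + (reason == null ? "<none>"
                                : new String(reason.value(), StandardCharsets.UTF_8)));
                        System.out.println("payload: " + record.value());
                    }
                }
            }
        }
    }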
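
To complement the Transactional Settings discussion in the list above, the sketch below uses the Java AdminClient to create topics with partition counts matching the two modes: one partition for ordered, 'Strict'-like processing, and ten partitions for parallel, 'None'-like processing. The broker address, topic names, and replication factor are illustrative.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Properties;

    public class CreateStreamingTopics {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "mybroker1a:9092"); // placeholder broker

            try (Admin admin = Admin.create(props)) {
                // One partition: messages are consumed in order by a single streaming thread
                // (combine with the 'Stop' Error Handling Strategy to mimic 'Strict').
                NewTopic ordered = new NewTopic("my-ordered-topic", 1, (short) 3);

                // Ten partitions: the receiver opens one streaming thread per partition, up to 10
                // (combine with 'Ignore' or 'Error Topic' to mimic 'None').
                NewTopic parallel = new NewTopic("my-parallel-topic", 10, (short) 3);

                admin.createTopics(List.of(ordered, parallel)).all().get();
            }
        }
    }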

Additional items of interest for the Kafka Streaming Receiver include:

  • When using REST, the Invoke and Statistics calls are not valid for a streaming receiver. If used, a 400 error is returned with an explanation message.

  • Objects consumed by the Kafka Streaming Receiver must be designed to support idempotent operations, where the desired outcome is achieved even if the import process is called multiple times; on rare occasions this can happen if Kafka re-balances while messages are being read. To prevent duplicate object creation, avoid the use of autogenerated IDs for objects. Also, to avoid duplicated actions and/or race conditions, move asynchronous operations included in business actions to an event processor that runs after import. A producer sketch after this list illustrates the use of explicit IDs.
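
As a sketch of the idempotency guidance above, the producer below publishes a small, simplified STEPXML-style payload and uses the object's explicit ID as the Kafka message key, so that a re-delivered message updates the same object instead of creating a duplicate, and all messages for one object land on the same partition. The broker, topic name, ID, object type, and payload shape are placeholders; align the payload with your actual import configuration.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class IdempotentMessageProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "mybroker1a:9092"); // placeholder broker
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // Explicit, stable ID supplied by the source system - not autogenerated during import -
            // so replaying the message updates the same object rather than creating a duplicate.
            String productId = "ProductID-1001";
            String payload =
                    "<STEP-ProductInformation>" +
                    "<Products>" +
                    "<Product ID=\"" + productId + "\" UserTypeID=\"ItemUserType\">" +
                    "<Name>Example product</Name>" +
                    "</Product>" +
                    "</Products>" +
                    "</STEP-ProductInformation>";

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Keying by the object ID keeps every message for one object on the same partition,
                // preserving their relative order even when the topic has multiple partitions.
                producer.send(new ProducerRecord<>("my-topic", productId, payload));
                producer.flush();
            }
        }
    }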

Prerequisites

  1. Before setting up Kafka Streaming Receiver, read the Considerations for Setting Up Kafka Streaming Receiver topic.

    Important: Prior to configuration, dropdown parameters that rely on a property are empty. Hovering over or clicking a dropdown displays the name of the property that must be configured. To display the value(s), configure the property for your system. Property configuration depends on the type of environment: for on-premises systems, add the case-sensitive text to the sharedconfig.properties file on the STEP application server; for Stibo Systems SaaS environments, log in to the Self-Service UI, select the environment, and edit the properties on the 'Configuration properties' tab.

  2. Configure the Bootstrap Server(s) dropdown parameter using the Kafka.Server configuration property. If connecting to a cluster, use a comma-separated list.

    The following example shows two Kafka server configurations where the first server is a cluster:

    Kafka.Server.1=mybroker1a:9092,mybroker1b:9092
    Kafka.Server.2=mybroker2a:9094
  3. Configure the Topic dropdown parameter using the Kafka.Topic configuration property. For example:

    Kafka.Topic.1=my-topic
  4. Optionally, if the 'Error Handling Strategy' parameter will be set to 'Error Topic', configure the Error Topic dropdown parameter using the Kafka.ErrorTopic configuration property. For example:

    Kafka.ErrorTopic.1=my-error-topic
  5. Optionally, configure the Keystore Location dropdown parameter using the Kafka.SSLKeyStoreLocation property. For example:

    Kafka.SSLKeyStoreLocation.1=[/[path]/key_store.jks]
  6. Optionally, configure the Truststore Location dropdown parameter using the Kafka.SSLTrustStoreLocation property. For example:

    Kafka.SSLTrustStoreLocation.1=[/[path]/trust_store.jks]
  7. Optionally, SASL / OAuth 2.0 can be configured for STEP with Kafka using the ExtraDriverOptions property to authenticate securely via bearer tokens. A business function allows integration with the Kafka Receiver by handling the OAuth authentication and returning a HashMap containing the Bearer Token and other details.

    Note: Depending on the configured Kafka authentication requirements, each IIEP using the Kafka Streaming Receiver may require the configuration property Kafka.StreamingReceiver.Dynamic.ExtraDriverOptions to be defined. 'Dynamic' must be replaced with a valid IIEP ID before the IIEP can connect to the Bootstrap Server(s) when it requires authentication.

    Configure the ExtraDriverOptions property as:

    • Kafka.Receiver.[IIEP_ID].ExtraDriverOptions - indicate the IIEP ID.

    • stibo.authentication.function - set the JavaScript business function ID. Refer to the Business Function Example for Generating an OAuth Bearer Token section in the Kafka Delivery Method topic.

    • security.protocol - indicate either SASL_PLAINTEXT (while testing) or SASL_SSL (in production) for the SSL transport layer.

    For example, in the property definition below, 'MyInboundKafka' is the ID of the IIEP, 'HydraAuthFunction' is the ID of the business function, and 'SASL_PLAINTEXT' is the testing value for the security protocol. The rest of the property values are configured as displayed:

    Kafka.Receiver.MyInboundKafka.ExtraDriverOptions=sasl.mechanism=OAUTHBEARER,sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;,sasl.login.callback.handler.class=com.stibo.integration.kafka.domain.impl.auth.OAuthFunctionCallbackHandler,stibo.authentication.function=HydraAuthFunction,security.protocol=SASL_PLAINTEXT    

    Copy this property definition to your system and then edit it for your environment.

Configuration

After completing the prerequisite steps, edit the receiver of the IIEP. Use the following parameters to configure the IIEP. For information on a parameter, hover over the parameter field to display help text.

  1. For the Receiver, choose Kafka Streaming Receiver.

  2. For Consumer Group ID, a read-only value is displayed that includes the name of the server and the ID of the IIEP, separated by a dash (-). This makes the consumer group unique for each IIEP on each system.

  3. For Bootstrap Server(s), select the server(s) where the Kafka broker instances used by the endpoint are running.

  4. For Topic, select the topic used by this endpoint.

  5. In Error Handling Strategy, select an option.

    Note: For all options, errors are reported in the Integration Endpoint Log and in the step.0.log file.

    • Stop - when an error occurs, the IIEP is stopped. Errors should be investigated before resuming.

    • Ignore - the IIEP continues to run as errors are reported.

    • Error Topic - the IIEP continues to run, and the original message and the error details are posted to the topic selected in the Error Topic parameter.

      • When the 'Error Topic' error handling strategy is used, for the Error Topic parameter, select the topic for logging messages and errors.

  6. If required, check the Use SSL checkbox to activate the additional parameters:

    • In Keystore Location, if SSL encryption is required, select the location of the keystore used for the SSL-encrypted connection to Kafka. Otherwise, leave this parameter blank.

    • In Keystore Password, enter the password for the keystore, if required.

    • In Keystore PrivateKey Password, enter the password of the private key in the keystore file, if needed.

    • In Truststore Location, if SSL encryption is required, select the location of the truststore used for the SSL-encrypted connection to Kafka. Otherwise, leave this parameter blank.

    • In Truststore Password, enter the password for the truststore, if required.

  7. Click the Next button to continue with the IIEP - Configure Endpoint and subsequent steps.

Background Processes Tab

Because individual background processes are not used to move messages from partitions between the Kafka topic and STEP, no Background Processes tab is displayed for streaming IIEPs.

Progress of the streaming import processes can be monitored using the Statistics tab, the Integration Endpoint Log, and the Error Logs tab in the workbench, as well as the dedicated 'message-streaming' log files and the step.*.log files in the Admin Portal, as described in the Logs topic in the Administration Portal documentation. Monitoring is also available via the KafkaStreamingReceiverStatusSensor, as described in the Sensors for External Monitoring topic in the Administration Portal documentation.

Integration Endpoint Log

The Kafka Streaming Receiver Integration Endpoint Log includes the following entries:

  • The Integration Endpoint Log displays information while the IIEP is enabled or failed. When the IIEP is stopped, the log is cleared, and the log is not displayed while the IIEP is disabled.

  • When the configuration of a running endpoint is changed, the endpoint is disabled, and all threads are stopped automatically to prepare for implementing the changes. A user must enable the IIEP manually or via a REST call to continue processing with the new configuration by starting new threads.

  • The Error Logs tab groups error details from a message as defined by the partition and offset, and it typically includes the date and time of the error while the IIEP is not disabled.

  • When the Kafka broker is first connected but no messages are retrieved, the number of open threads is based on the number of partitions up to a limit of 10.

  • The IIEP fails and a relevant error message is displayed when the Kafka broker is not running, the Topic or Error Topic does not exist, or the ExtraDriverOptions property (described in the Prerequisites section above) is required but is not configured or is configured incorrectly.

  • Only errors are reported in the Integration Endpoint Log; info and warning messages are not. For complete import activity, including INFO, WARNING, and SEVERE log entries, review the message-streaming log files in the Admin Portal. For more information, refer to the Logs topic in the Administration Portal documentation.

  • The step.0.log includes additional information, limited to the WARNING level, because reporting on the libraries used would be too verbose at the INFO level.

Statistics Tab

The Statistics tab displays the Endpoint Uptime and the number of completed imports, broken down by time interval (1 min / 1 hour / 8 hours / 24 hours) and shown separately for successful and failed imports. The tab does not include the parameters displayed for non-streaming IIEPs using multiple background processes, as they do not apply to message streaming.

Error Logs Tab

Execution report details are stored in the dedicated 'message-streaming' log files. On the Error Logs tab, exceptions are grouped by message (partition-offset) and listed individually. Other status messages appear in the Integration Endpoint Log section on the Inbound Integration Endpoint tab.