Kafka Events Connection

DataForge integrates directly with Kafka Event Topics for batch or stream data Source ingestion and Output publishing. This article explains how to set up a connection to Kafka and configure either a Source or an Output against it. For more information on setting up Kafka itself, see the Apache Kafka documentation. The documentation links below refer to Confluent, a commonly used platform for integrating Kafka Events.

Source Connection

Create a new connection using the following settings and values:

  • Connection Direction: Source
  • Connection Type: Event
  • Uses Agent: No
  • Platform: Kafka
  • Kafka Bootstrap Servers*: Enter the hosted server location of the Kafka cluster
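
For example, a hosted Confluent Cloud endpoint (hypothetical host) might look like:

      pkc-xxxxx.us-west-2.aws.confluent.cloud:9092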

Expand the Parameters section and fill in the remaining details on the page:

  • Kafka Parameters: Configure optional non-sensitive parameters for connecting to Kafka.
    • Example to configure PLAIN authentication: {"kafka.sasl.mechanism":"PLAIN","kafka.security.protocol":"SASL_SSL"}
  • Kafka Sensitive Parameters: Configure sensitive parameters for connecting to Kafka.
    • Example to configure PLAIN authentication (typically Cluster API keys used for username/password):

      {"kafka.sasl.jaas.config":"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"}

  • Schema Registry Address: Host address of the schema registry 
  • Schema Registry Parameters: Configure optional non-sensitive parameters for connecting to Kafka Schema Registry.
    • Example: {"confluent.schema.registry.basic.auth.credentials.source":"USER_INFO"}
  • Schema Registry Sensitive Parameters: Configure optional sensitive parameters for connecting to Kafka Schema Registry.
    • Example using API keys for Schema Registry:

      {"confluent.schema.registry.basic.auth.user.info":"<apikeyvalue>:<apikeysecret>"}

Source Settings

Create a new Source to ingest data from a Kafka topic using the following settings and values:

  • Processing Type: Batch or Stream
  • Connection Type: Event
  • Connection: Select a Kafka source connection from the drop-down
  • Topic Name: Enter the name of a Kafka topic available on the connection's server

The following parameters are optional if the Kafka Connection selected has a Schema Registry defined.

If no Schema Registry is defined on the connection, expand the Ingestion Parameters and enter any remaining fields for ingestion.

  • Select List
    • Value: *, value.*
    • Description: Applies the specified comma-separated select list to the ingested data, allowing columns to be expanded or hidden. Expands struct attributes to unique columns via struct.*. For Kafka events, value.* is a common use case.
  • Value Schema Type
    • Value: binary, avro, avro_from_registry, json
    • Description: Schema type for the value schema, used to associate each event with a schema during ingestion.
  • Key Schema Type
    • Value: binary, avro, avro_from_registry, json
    • Description: Schema type for the key schema, used to associate each event with a schema during ingestion.
  • Value Schema
    • Value: Dependent on Value Schema Type. Binary: not required. Avro: JSON (Avro) format. JSON: JSON (DDL) format.
    • Description: The JSON (Avro) or text (JSON DDL) schema, specified when not using a schema registry.
  • Key Schema
    • Value: Dependent on Key Schema Type. Binary: not required. Avro: JSON (Avro) format. JSON: JSON (DDL) format.
    • Description: The JSON (Avro) or text (JSON DDL) schema, specified when not using a schema registry.
  • Key Subject
    • Value: Open text
    • Description: Schema registry subject for the key schema.
  • Value Subject
    • Value: Open text
    • Description: Schema registry subject for the value schema.
  • Kafka Parameters
    • Value: JSON
    • Description: Non-sensitive Kafka parameters in JSON key-value format, saved in plaintext.
  • Kafka Sensitive Parameters
    • Value: JSON
    • Description: Sensitive Kafka parameters in JSON key-value format.
  • Starting Offsets
    • Value: earliest, deltas, or JSON format
    • Example: {"<topic_name>":{"<partition_no>":<offset>}}
    • Description: Starting offsets for the topic. Options are "earliest", "deltas", or a JSON string. If "deltas" is used as the value, the setting only applies to the first input.
  • Ending Offsets
    • Value: latest or JSON format
    • Example: {"<topic_name>":{"<partition_no>":<offset>}}
    • Description: Ending offsets for the topic. Options are "latest" or a JSON string.
  • Strip Leading Binary Bytes
    • Value: Number
    • Description: Removes the given number of leading bytes from the binary value record that point to the schema registry (for example, Confluent's wire format prepends 5 bytes: a magic byte plus a 4-byte schema ID).
  • Number of Topic Partitions
    • Value: Number
    • Description: Define this if "deltas" is specified for Starting Offsets but some partitions on the topic have no data. The delta offsets for missing partitions are filled in with "-2" to read from earliest on those partitions, preventing an ingestion error.

Example of a no-schema-registry configuration using the JSON deltas offsets format:
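
A minimal sketch, assuming a hypothetical topic orders with three partitions (an offset of -2 reads a partition from earliest):

      Value Schema Type: json
      Value Schema: id INT, name STRING
      Starting Offsets: {"orders":{"0":1250,"1":980,"2":-2}}
      Ending Offsets: latest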

Output Connection

Create a new connection using the following settings and values:

  • Connection Direction: Output
  • Connection Type: Event
  • Platform: Kafka
  • Kafka Bootstrap Servers*: Enter the hosted server location of the Kafka cluster

Expand the Parameters section and fill in the remaining details on the page:

  • Kafka Parameters: Configure optional non-sensitive parameters for connecting to Kafka.
    • Example to configure PLAIN authentication: {"kafka.sasl.mechanism":"PLAIN","kafka.security.protocol":"SASL_SSL"}
  • Kafka Sensitive Parameters: Configure sensitive parameters for connecting to Kafka.
    • Example to configure PLAIN authentication (typically Cluster API keys used for username/password):

      {"kafka.sasl.jaas.config":"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"}

  • Schema Registry Address: Host address of the schema registry 
  • Schema Registry Parameters: Configure optional non-sensitive parameters for connecting to Kafka Schema Registry.
    • Example: {"confluent.schema.registry.basic.auth.credentials.source":"USER_INFO"}
  • Schema Registry Sensitive Parameters: Configure optional sensitive parameters for connecting to Kafka Schema Registry.
    • Example using API keys for Schema Registry:

      {"confluent.schema.registry.basic.auth.user.info":"<apikeyvalue>:<apikeysecret>"}

Output Settings

Create a new Output to publish data to a Kafka topic using the following settings and values:

  • Output Type: Event
  • Connection: Select a Kafka output connection from the drop-down
  • Topic Name: Enter the name of a Kafka topic available on the connection's server

The following parameters are optional if the Kafka connection selected has a Schema Registry defined.

If no Schema Registry is defined on the connection, expand the Output Parameters and enter any remaining fields for output.

  • Value Schema Type
    • Value: string, avro, avro_from_registry, json
    • Description: Schema type for the value schema, used to associate each event with a schema during output.
  • Key Schema Type
    • Value: string, avro, avro_from_registry, json
    • Description: Schema type for the key schema, used to associate each event with a schema during output.
  • Value Schema
    • Value: Dependent on Value Schema Type. String: not required. Avro: JSON (Avro) format. JSON: JSON (DDL) format.
    • Description: The JSON (Avro) or text (JSON DDL) schema, specified when not using a schema registry.
  • Key Schema
    • Value: Dependent on Key Schema Type. String: not required. Avro: JSON (Avro) format. JSON: JSON (DDL) format.
    • Description: The JSON (Avro) or text (JSON DDL) schema, specified when not using a schema registry.
  • Key Subject
    • Value: Open text
    • Description: Schema registry subject for the key schema.
  • Value Subject
    • Value: Open text
    • Description: Schema registry subject for the value schema.
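
If a schema registry is in use, Confluent's default TopicNameStrategy derives subjects from the topic name, so for a hypothetical topic orders the subjects would typically be:

      Key Subject: orders-key
      Value Subject: orders-value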

Output Mapping

Kafka Outputs only allow sources with Full, Key, or None refresh types for Source Mappings. Kafka Outputs support only two columns, key and value. When the Output settings are first saved, these columns are automatically added to the Mapping definition.

The Key and Value columns can only be of data type String or Struct.

To convert a String into Struct:

  • Open the Source mapped to the output
  • Create a rule that builds the Struct result referencing the String attribute (see the sketch below)
  • Map the Struct rule attribute to the output mapping column
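
A minimal sketch of such a rule, assuming rule expressions accept Spark SQL functions; the attribute name json_payload and the field list are hypothetical:

      from_json(json_payload, 'id INT, name STRING')

The resulting Struct attribute can then be mapped to the value column in the Output Mapping.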

 

 
