DataForge integrates directly with Kafka event topics for batch or stream Source ingestion and Output publishing. This article explains how to set up a connection to Kafka and how to configure either a Source or an Output. For more information on setting up Kafka, see the Apache Kafka documentation. The documentation links below refer to Confluent, a commonly used platform for integrating Kafka events.
Source Connection
Create a new connection using the following settings and values:
- Connection Direction: Source
- Connection Type: Event
- Uses Agent: No
- Platform: Kafka
- Kafka Bootstrap Servers*: Enter the hosted server location of Kafka
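The bootstrap servers value is a comma-separated list of host:port pairs, for example (hostnames hypothetical):
broker1.example.com:9092,broker2.example.com:9092
A Confluent Cloud cluster typically exposes a single bootstrap endpoint of the form pkc-xxxxx.<region>.<cloud>.confluent.cloud:9092.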
Expand the Parameters section and fill in the remaining details on the page:
- Kafka Parameters: Configure optional non-sensitive parameters for connecting to Kafka.
  - Example to configure PLAIN authentication: {"kafka.sasl.mechanism":"PLAIN","kafka.security.protocol":"SASL_SSL"}
- Kafka Sensitive Parameters: Configure sensitive parameters for connecting to Kafka.
  - Example to configure PLAIN authentication (typically Cluster API keys used as username/password):
    {"kafka.sasl.jaas.config":"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"}
- Schema Registry Address: Host address of the schema registry
- Schema Registry Parameters: Configure optional non-sensitive parameters for connecting to the Kafka Schema Registry.
  - Example: {"confluent.schema.registry.basic.auth.credentials.source":"USER_INFO"}
- Schema Registry Sensitive Parameters: Configure optional sensitive parameters for connecting to the Kafka Schema Registry.
  - Example using API keys for Schema Registry:
    {"confluent.schema.registry.basic.auth.user.info":"<apikeyvalue>:<apikeysecret>"}
Source Settings
Create a new Source to ingest data from a Kafka topic using the following settings and values:
- Processing Type: Batch or Stream
- Connection Type: Event
- Connection: Select a Kafka source connection from the drop-down
- Topic Name: Enter a Kafka topic name from the Kafka Connection server
The following parameters are optional if the Kafka Connection selected has a Schema Registry defined.
If no Schema Registry is defined on the connection, expand the Ingestion Parameters and enter any remaining fields for ingestion.
| Parameter Name | Value | Description |
| --- | --- | --- |
| Select List | *, value.* | Applies the specified comma-separated select list to ingested data, allowing columns to be expanded or hidden. Expands struct attributes to unique columns via struct.*. For Kafka events, value.* is a common use case. |
| Value Schema Type | binary, avro, avro_from_registry, json | Schema type for the value schema. Used to associate the event with a schema during ingestion. |
| Key Schema Type | binary, avro, avro_from_registry, json | Schema type for the key schema. Used to associate the event with a schema during ingestion. |
| Value Schema | Depends on Value Schema Type. binary: not required; avro: JSON (Avro) format; json: JSON (DDL) format | JSON (Avro) or text (JSON DDL) schema, specified when not using a schema registry. |
| Key Schema | Depends on Key Schema Type. binary: not required; avro: JSON (Avro) format; json: JSON (DDL) format | JSON (Avro) or text (JSON DDL) schema, specified when not using a schema registry. |
| Key Subject | Open text | Schema registry subject for the key schema. |
| Value Subject | Open text | Schema registry subject for the value schema. |
| Kafka Parameters | JSON | Non-sensitive Kafka parameters in JSON key-value format, saved in plaintext. |
| Kafka Sensitive Parameters | JSON | Sensitive Kafka parameters in JSON key-value format. |
| Starting Offsets | earliest, deltas, or JSON. Example: {"<topic>":{"<partition_no>":<offset>}} | Starting offsets for the topic: "earliest", "deltas", or a JSON string. If "deltas" is used, the value applies only to the first input. |
| Ending Offsets | latest or JSON. Example: {"<topic>":{"<partition_no>":<offset>}} | Ending offsets for the topic: "latest" or a JSON string. |
| Strip Leading Binary Bytes | Number | Number of leading bytes to remove from the binary value record, e.g. the bytes that point to the schema registry (Confluent's wire format prepends 5 bytes: a magic byte plus a 4-byte schema ID). |
| Number of Topic Partitions | Number | Set this when "deltas" is specified for Starting Offsets but some partitions on the topic have no data. It fills in the delta offsets for missing partitions with "-2" to read from earliest on those partitions, preventing ingestion errors. |
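For example, when Value Schema Type is avro and no Schema Registry is defined on the connection, the Value Schema field takes a standard Avro schema as JSON. A minimal sketch (the record and field names are hypothetical):
{"type":"record","name":"value","fields":[{"name":"id","type":"int"},{"name":"event_ts","type":"string"}]}
The json (DDL) equivalent for the same two fields would be along the lines of id INT, event_ts STRING.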
Example of the Starting Offsets JSON deltas format with no schema registry:
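A sketch, assuming a hypothetical topic named orders with three partitions, where partition 2 has no prior data:
{"orders":{"0":1500,"1":1498,"2":-2}}
The -2 requests the earliest offset on that partition, as described under Number of Topic Partitions above.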
Output Connection
Create a new connection using the following settings and values:
- Connection Direction: Output
- Connection Type: Event
- Platform: Kafka
- Kafka Bootstrap Servers*: Enter the hosted server location of Kafka
Expand the Parameters section and fill in the remaining details on the page:
- Kafka Parameters: Configure optional non-sensitive parameters for connecting to Kafka.
  - Example to configure PLAIN authentication: {"kafka.sasl.mechanism":"PLAIN","kafka.security.protocol":"SASL_SSL"}
- Kafka Sensitive Parameters: Configure sensitive parameters for connecting to Kafka.
  - Example to configure PLAIN authentication (typically Cluster API keys used as username/password):
    {"kafka.sasl.jaas.config":"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"}
- Schema Registry Address: Host address of the schema registry
- Schema Registry Parameters: Configure optional non-sensitive parameters for connecting to the Kafka Schema Registry.
  - Example: {"confluent.schema.registry.basic.auth.credentials.source":"USER_INFO"}
- Schema Registry Sensitive Parameters: Configure optional sensitive parameters for connecting to the Kafka Schema Registry.
  - Example using API keys for Schema Registry:
    {"confluent.schema.registry.basic.auth.user.info":"<apikeyvalue>:<apikeysecret>"}
Output Settings
Create a new Output to publish data to a Kafka topic using the following settings and values:
- Output Type: Event
- Connection: Select a Kafka output connection from the drop-down
- Topic Name: Enter a Kafka topic name from the Kafka Connection server
The following parameters are optional if the Kafka connection selected has a Schema Registry defined.
If no Schema Registry is defined on the connection, expand the Output Parameters and enter any remaining fields for output.
| Parameter Name | Value | Description |
| --- | --- | --- |
| Value Schema Type | string, avro, avro_from_registry, json | Schema type for the value schema. Used to associate the event with a schema during output. |
| Key Schema Type | string, avro, avro_from_registry, json | Schema type for the key schema. Used to associate the event with a schema during output. |
| Value Schema | Depends on Value Schema Type. string: not required; avro: JSON (Avro) format; json: JSON (DDL) format | JSON (Avro) or text (JSON DDL) schema. |
| Key Schema | Depends on Key Schema Type. string: not required; avro: JSON (Avro) format; json: JSON (DDL) format | JSON (Avro) or text (JSON DDL) schema. |
| Key Subject | Open text | Schema registry subject for the key schema. |
| Value Subject | Open text | Schema registry subject for the value schema. |
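When the connection does define a Schema Registry, the subjects typically follow Confluent's default TopicNameStrategy, which names them <topic>-key and <topic>-value. For a hypothetical topic named orders, the Key Subject and Value Subject would be orders-key and orders-value.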
Output Mapping
Kafka Outputs only allow sources with Full, Key, or None refresh types for Source Mappings. Kafka Outputs only support two columns, key and value. When the Output settings are first saved, these columns will be automatically added to the Mapping definition.
The Key and Value columns can only be data types of either String or Struct.
To convert a String into a Struct:
- Open the Source mapped to the Output
- Create a rule that builds the Struct result referencing the String attribute (see the sketch after these steps)
- Map the Struct rule attribute to the Output mapping column
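As a sketch, assuming rule expressions accept Spark SQL functions and that the Source has hypothetical String attributes order_id, status, and payload, a rule could build a Struct directly:
struct(order_id, status)
or parse a JSON-formatted String into a Struct:
from_json(payload, 'order_id INT, status STRING')
The resulting Struct rule attribute can then be mapped to the value column of the Output.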