The Settings tab for a Source allows a user to specify key information about the Source, including input types and source data format.
Settings Tab
Base Parameters
An asterisk (*) indicates that a parameter is mandatory and must be specified.
- Name*: The name of the Source. The Name must be unique. This will be displayed on the Sources screen when browsing Sources.
- Description*: The description of the Source.
- Hub View Name: The name of the view alias for the raw hub table in Databricks
- Active*: If set to Active, the Source will run as specified.
- Group: Used as part of Templates and Tokens
- Source Template: Used as part of Templates and Tokens
- Process Config*: The Process Configuration, selected from a dropdown.
- Cleanup Config*: The Cleanup Configuration. All Sources default to the Default Cleanup config.
- Processing Type*: Batch or Stream. See Processing Type: Batch vs. Stream below for details.
- Connection Type*: Selector to help filter the Connection dropdown
- Connection*: The Connection to use for this source
Processing Type: Batch vs. Stream
Sources can be either batch or stream based, as indicated by the Processing Type source setting. Each processing type has its own set of parameters that apply.
The following connection types are supported for streaming:
- Event - Kafka
- Table - Delta
Stream sources assume a None data refresh type. For more information, view the None refresh type details in the Data Refresh Types section below.
Stream sources include two additional parameters:
- Trigger Type: For more information, visit the "Triggers" section of the Spark Structured Streaming docs.
- default_micro_batch: Spark decides when to check for new records
- interval_micro_batch: Check for new records every x seconds based on the trigger interval
- available_now_micro_batch: Continues to pull records as long as the stream is sending them, then shuts down when the stream stops sending records.
- Trigger Interval: Time in seconds to wait before pulling another set of records from the stream. If it is not specified, the system checks for the availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, the system triggers processing immediately.
Rules and relations can be used with stream sources to enrich and transform data with the following limitations:
- Rules:
- All rules run in snapshot recalculation mode
- No window functions or keep-current rules are allowed
- No unique flag is available on rules
- No aggregates are allowed
- No sub-source rules are allowed
- Relations:
- Relations between stream and batch sources must have M:1 or 1:1 cardinality, with the :1 side of the cardinality being the batch source.
DataForge will always start a new stream for stream sources if ingestion is enabled after maintenance and upgrade windows are complete, or if backend services are restarted in the event of an outage. If this is a concern, DataForge recommends using the Available Now Micro Batch trigger in the source setting parameters so the stream stops when no new records are available.
Connection Type Specific Parameters
Custom:
- Connection: Optional custom connection that can be used to upload custom parameters to be referenced in the custom Databricks notebook.
- Cluster Config*: The Cluster Configuration selected from a dropdown of available configurations.
File:
- File Mask*: Name of the file within the connection folder/path. Supports further file-path definition as an extension of the connection path. A file extension is required in the file mask for a single-file definition (e.g. .csv, .txt).
- Glob syntax is supported, e.g. myfile*.csv
- Multi-part file syntax (multi-part setting enabled):
- Multi-part file masks should end at the directory, not the files within it, e.g. /multipart_dir and not /multipart_dir/*
- If the directory is empty or the file mask points to a single file, the process will fail.
- File Type*: Serialization format of the file
- Parser*: DataForge has two supported parsers for certain File Types:
- (Recommended) Spark: Native Spark libraries or extensions used
- Core: DataForge custom Akka streams parser for delimited files with advanced error handling and malformed file debugging
Loopback:
- Virtual Output*: Name of the virtual output this source is linked to and will pull data from. Outputs will only be available for selection if they are a Virtual Output. See Output Settings for more details.
Table:
- Batch Processing
- Source Query*: Query to run against the connection database.
- Tokens are available to help filter data on ingest to only those records that were updated or inserted and are not already in DataForge. These tokens can be used in the WHERE clause of the Source Query (e.g. WHERE my_date > <latest_timestamp>):
- <extract_datetime>
- This will be replaced with the timestamp of the last successful ingestion process
- <latest_sequence>
- This will be replaced with MAX(s_sequence) from all inputs previously processed
- <latest_timestamp>
- This will be replaced with MAX(s_update_timestamp) for Keyed or MAX(s_timestamp) for Timeseries refresh types over all inputs previously processed
- <latest_watermark>
- This will be replaced with MAX(max_watermark) for Custom refresh types over all inputs previously processed
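For example, a hypothetical Source Query for a Keyed source might use the <latest_timestamp> token as follows (the table and column names are illustrative, not part of DataForge):

```sql
-- Hypothetical table and column names, shown only to illustrate token usage.
-- Before the query runs, DataForge replaces <latest_timestamp> with
-- MAX(s_update_timestamp) (Keyed) or MAX(s_timestamp) (Timeseries) across all
-- previously processed inputs, so only new or changed records are ingested.
SELECT *
FROM sales.orders
WHERE last_modified_date > <latest_timestamp>
```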
- Stream Processing
- Source Query*: Rather than a full select query, enter the database.table name to be used for the stream.
Event:
- Topic Name*: Kafka topic name to subscribe to
For more information on setting up Event sources, refer to the User Manual's Kafka Events Connections documentation.
API:
- Source Query*: Enter the SOQL statement, using Salesforce syntax, to run against the Salesforce environment listed on the attached connection.
- Example Query: SELECT * FROM opportunity (the general form is SELECT * FROM <object>)
- DataForge recommends using Select * statements and not combining individual columns with * in the source query.
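A minimal sketch following that recommendation (Opportunity is just an example object; any object available on the connected Salesforce environment works the same way):

```sql
-- Recommended: select every field from a single object.
SELECT * FROM Opportunity

-- Not recommended: combining individual columns with * in the same query,
-- e.g. SELECT Id, Name, * FROM Opportunity
```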
For more information on setting up a Salesforce connection, refer to the User Manual's Salesforce Connection documentation.
Unmanaged External:
Unmanaged external sources allow users to connect to tables in Databricks whose processing and updates DataForge does not manage or control. This allows users to create relations and rules within DataForge that point to Delta Lake and Hive tables in Databricks.
No additional parameters are necessary for this connection type. For more information on setting up an Unmanaged External source, refer to the User Manual's Unmanaged External Source documentation.
Data Refresh Types
Full:
Key:
Timestamp:
Sequence:
None:
Custom:
Choosing a Data Refresh Type:
Questions leading to Refresh Type selection:
- Is the source data less than 100 MB? If yes, use Full refresh.
- Is the source data greater than 10 GB? If yes, use Custom refresh.
- Do you need to remove records that were hard-deleted in the source? If yes, use Full or Custom refresh, depending on source size.
- Is the source data between 100 MB and 10 GB, and does it have a unique key? If yes, use Key refresh.
If none of these questions fit, start with Full refresh. To optimize performance, review the matrix below.
Initiation Type
Advanced Parameters
Select List
The Select List parameter is found in the Source Ingestion Parameters. It provides the ability to further refine the raw attributes that are stored for an input: Select List applies the specified select list to ingested data, allowing users to expand or hide columns that exist in the source connection dataset. This parameter does not apply to Agent ingestions. The default value is "*".
| Example Select List | Description |
| --- | --- |
| * | Ingest every attribute |
| *, value.* | Ingest every attribute, and create new columns for every attribute within the struct field named "value" |
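As a sketch of how the second example behaves, assume a hypothetical ingested dataset with a struct column named value containing fields id and amount; the select list *, value.* is equivalent to this Spark SQL projection:

```sql
-- Sketch of the "*, value.*" select list, assuming (hypothetically) a struct
-- column "value" with fields "id" and "amount": every original attribute is
-- kept, and value.id / value.amount become new top-level columns.
SELECT *, value.*
FROM source_data  -- "source_data" is a placeholder for the ingested dataset
```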
Schema Evolution
The Schema Evolution Parameter is found in the Source Parsing parameters. It contains a drop-down with preselected combinations of the below options. The default option is "Add, Remove, Upcast, Extend Complex, Clone".
| Option | Description | Input 1 Schema | Input 2 Schema | Input 2 Result | Hub Table Result Schema |
| --- | --- | --- | --- | --- | --- |
| Lock | Any schema change will generate an ingestion or parsing error | col_a INT | col_a INT, col_b STRING | Fail | col_a INT |
| Add | Allows adding new columns | col_a INT | col_a INT, col_b STRING | Succeed | col_a INT, col_b STRING |
| Remove | Allows missing (removed) columns in inputs; missing column values are substituted with nulls | col_a INT, col_b STRING | col_a INT | Succeed | col_a INT, col_b STRING |
| Upcast* | Allows retaining the same column alias when the column data type changes to a type that can be safely converted (upcast) to the existing type | col_a STRING | col_a INT | Succeed | col_a STRING |
| Extend* (Complex/All) | Allows converting the hub table column data type to match the data type of the same column from the latest set of refreshed data. Extend Complex extends array and struct data types; Extend All also includes standard types | col_a INT | col_a DECIMAL | Succeed | col_a DECIMAL |
| Clone | Allows creation of a new column aliased <original column>_2 when the column data type changes and is not compatible with the existing column | col_a INT | col_a DECIMAL | Succeed | col_a INT, col_a_2 DECIMAL |
*When Upcast and/or Extend are enabled, DataForge will only upcast or extend a source raw column if all downstream rules can be upcast or extended and will still be compatible with any mapped output column data types.
The following matrix represents the data type compatibility for upcasting.
In the matrix below, rows are the incoming input types and columns are the existing hub table types.

| Input Type \ Hub Type | string | decimal | timestamp | boolean | int | long | float | double | struct | array | date |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| string | identical |  |  |  |  |  |  |  |  |  |  |
| decimal | upcastable | identical |  |  |  |  |  | upcastable |  |  |  |
| timestamp | upcastable |  | identical |  |  |  |  |  |  |  |  |
| boolean | upcastable | upcastable |  | identical | upcastable | upcastable | upcastable | upcastable |  |  |  |
| int | upcastable | upcastable |  |  | identical | upcastable |  | upcastable |  |  |  |
| long | upcastable | upcastable |  |  |  | identical |  |  |  |  |  |
| float | upcastable |  |  |  |  |  | identical | upcastable |  |  |  |
| double | upcastable |  |  |  |  |  |  | identical |  |  |  |
| struct | upcastable |  |  |  |  |  |  |  | *compare struct schemas |  |  |
| array | upcastable |  |  |  |  |  |  |  |  | compare array types |  |
| date | upcastable |  | upcastable |  |  |  |  |  |  |  | identical |
*Compare Struct Schemas follows this logic:
- WHEN hub schema fields CONTAIN input schema fields AND the type of each field is UPCASTABLE THEN UPCASTABLE
- WHEN hub schema fields MATCH input schema fields AND the type of each field is IDENTICAL THEN IDENTICAL
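As a concrete reading of the matrix, an int input arriving where the existing hub column is string can be safely converted; a minimal SQL sketch of what that upcast means (the conversion itself is handled internally by DataForge):

```sql
-- Per the matrix above, int -> string is a safe upcast: the incoming value
-- can be converted to the existing hub type without losing information.
SELECT CAST(42 AS STRING) AS col_a  -- returns the string '42'
```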
Sub-Source Settings
Each sub-source contains the same tabs as the parent Source; however, many of the options are disabled and inherited from the parent Source settings. The sub-source can be renamed or given a new description in the settings tab.
For full documentation, visit Sub-Sources.