Source Settings

The Settings tab for a Source allows a user to specify key information about the Source including input types and source data format.


Settings Tab

When creating a new Source, only the Settings tab is available.
 
The Settings tab enables users to configure parameters that apply to the entire source across all Inputs and Processes associated with the Source.
 
Most of the parameters focus on where, how, and when to ingest data into the DataForge managed data lake, how to refresh data, how to track that information over time, as well as any infrastructure related configuration or tuning to help manage performance and cost of processing this dataset.
 
After the Source is created you can access the Settings tab at any time by clicking on the Settings tab in the upper left.
 

Base Parameters

An asterisk (*) indicates that a parameter is mandatory and must be specified by users.
  • Name*: The name of the Source. The name must be unique and is displayed on the Sources screen when browsing Sources.
  • Description*: The description of the Source.
  • Hub View Name: Name of the view alias for the raw hub table in Databricks.
  • Active*: If set to Active, the Source will run as specified.
  • Group: Used as part of Templates and Tokens.
  • Source Template: Used as part of Templates and Tokens.
  • Process Config*: The Process Configuration, selected from a dropdown.
  • Cleanup Config*: The Cleanup Configuration. All Sources default to the Default Cleanup config.
  • Processing Type*: Batch or Stream (see Processing Type: Batch vs. Stream below).
  • Connection Type*: Selector that filters the Connection dropdown.
  • Connection*: The Connection to use for this Source.

Processing Type: Batch vs. Stream

Sources can be either batch or stream based, as indicated by the Processing Type source setting. Each processing type has its own set of applicable parameters.

The following connection types are supported for streaming:

Stream sources assume a None data refresh type. For more information, view the None refresh type details in the Data Refresh Types section below.

Stream sources include two additional parameters:

  • Trigger Type: For more information, visit the Spark Structured Streaming docs in the "Triggers" section.
    • default_micro_batch: Spark decides when to check for new records
    • interval_micro_batch: Check for new records every x seconds based on the trigger interval
    • available_now_micro_batch: Continues to pull records as long as the stream is sending. Shuts down when stream stops sending records.
  • Trigger Interval:
    • Time in seconds to wait before pulling another set of records from the stream. If not specified, the system checks for new data as soon as the previous processing completes. If a trigger time is missed because the previous processing has not finished, the system triggers processing immediately. (See the sketch after this list for how these trigger types map to Spark options.)
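
The three trigger types correspond to standard Spark Structured Streaming trigger options. The minimal PySpark sketch below is illustrative only: it uses Spark's built-in rate source and console sink as stand-ins rather than a DataForge connection, and simply shows how each trigger type is typically expressed in Spark.

```python
# Illustrative PySpark sketch of the Spark trigger options behind the three Trigger Types.
# The rate source and console sink are stand-ins, not DataForge internals.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-types-sketch").getOrCreate()
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# default_micro_batch: no trigger specified; Spark starts the next micro-batch
# as soon as the previous one completes.
q1 = stream.writeStream.format("console").start()
q1.stop()

# interval_micro_batch: check for new records on a fixed interval
# (the Trigger Interval parameter, assumed here to be 30 seconds).
q2 = stream.writeStream.format("console").trigger(processingTime="30 seconds").start()
q2.stop()

# available_now_micro_batch: process the records currently available, then shut down.
q3 = stream.writeStream.format("console").trigger(availableNow=True).start()
q3.stop()

spark.stop()
```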

Rules and relations can be used with stream sources to enrich and transform data with the following limitations:

  • Rules:
    • All rules use snapshot recalculation mode
    • No window functions or keep-current rules are allowed
    • No unique flag is available on rules
    • No aggregates are allowed
    • No sub-source rules are allowed
  • Relations:
    • Relations between stream and batch sources need to be of M:1 or 1:1 cardinality with the :1 side of the cardinality being the batch source.

DataForge will always start a new stream for stream sources if ingestion is enabled after maintenance and upgrade windows are complete, or if backend services are restarted after an outage. If this is a concern, DataForge recommends using the Available Now Micro Batch trigger in the source setting parameters so the stream stops when no new records are available.

Connection Type Specific Parameters

Custom:

  • Connection: Optional custom connection that can be used to upload custom parameters to be referenced in the custom Databricks notebook.
  • Cluster Config*: The Cluster Configuration selected from a dropdown of available configurations.

File:

  • File Mask*: Name of the file within the connection folder/path. Supports further file path definition as an extension of the connection path. A file extension is required in the file mask when defining a single file (e.g. .csv, .txt).
    • Glob syntax is supported, e.g. myfile*.csv (see the sketch after this list).
    • Multi-part file syntax (multi-part setting enabled):
      • The file mask should end at the directory, not the files within it, e.g. /multipart_dir and not /multipart_dir/*
      • If the directory is empty or the file mask points to a single file, the process will fail.
  • File Type*: Serialization format of the file
  • Parser*: DataForge has two supported parsers for certain File Types
    • (Recommended) Spark: Native Spark libraries or extensions used
    • Core: DataForge custom Akka streams parser for delimited files with advanced error handling and malformed file debugging
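
As a quick illustration of the glob-style matching described above, the snippet below checks a few hypothetical file names against a File Mask using standard glob semantics. It is not DataForge code, and DataForge's matching may differ in minor details.

```python
# Hypothetical file names checked against a glob-style File Mask.
from fnmatch import fnmatch

file_mask = "myfile*.csv"
for name in ["myfile_2024_01.csv", "myfile_2024_02.csv", "otherfile.csv", "myfile.txt"]:
    print(f"{name}: {fnmatch(name, file_mask)}")
# myfile_2024_01.csv: True
# myfile_2024_02.csv: True
# otherfile.csv: False
# myfile.txt: False
```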

Loopback:

  • Virtual Output*: Name of the virtual output this source is linked to and will pull data from. Outputs will only be available for selection if they are a Virtual Output. See Output Settings for more details.

Table:

  • Batch Processing
    • Source Query*: Query to run against the connection database.
    • Tokens are available to help filter ingestion to only those records that have been inserted or updated since data was last brought into DataForge. These tokens can be used in the WHERE clause of the Source Query (e.g. WHERE my_date > <latest_timestamp>); see the sketch after this list.
      • <extract_datetime>
        • This will be replaced with the timestamp of the last successful ingestion process
      • <latest_sequence>
        • This will be replaced with MAX(s_sequence) from all inputs previously processed
      • <latest_timestamp>
        • This will be replaced with MAX(s_update_timestamp) for Keyed or MAX(s_timestamp) for Timeseries refresh types over all inputs previously processed
      • <latest_watermark>
        • This will be replaced with MAX(max_watermark) for Custom refresh types over all inputs previously processed
  • Stream Processing
    • Source Query*: Rather than a full select query, enter the database.table name to be used for the stream.
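
The sketch below shows how one of these tokens resolves at ingestion time. The table name, column name, and timestamp value are hypothetical, and the simple string substitution is an illustration of the concept rather than DataForge's actual implementation.

```python
# Hypothetical Source Query using the <latest_timestamp> token.
source_query = (
    "SELECT * FROM sales.orders "
    "WHERE last_modified_date > '<latest_timestamp>'"
)

# At run time the token is replaced with MAX(s_update_timestamp) (Keyed) or
# MAX(s_timestamp) (Timeseries) across all previously processed inputs.
latest_timestamp = "2024-06-01 08:30:00"  # hypothetical value

resolved_query = source_query.replace("<latest_timestamp>", latest_timestamp)
print(resolved_query)
# SELECT * FROM sales.orders WHERE last_modified_date > '2024-06-01 08:30:00'
```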

Event:

  • Topic Name*: Kafka topic name to subscribe to

For more information on setting up Event sources, refer to the User Manual's Kafka Events Connections documentation.

API:

  • Source Query*: Enter the SOQL statement, using Salesforce syntax, to run against the Salesforce environment listed on the attached connection.
    • Example Query: SELECT * FROM opportunity (the general form is SELECT * FROM <object>)
    • DataForge recommends using SELECT * statements and not combining individual columns with * in the Source Query.

For more information on setting up a Salesforce connection, refer to the User Manual's Salesforce Connection documentation.

Unmanaged External:

Unmanaged External sources allow users to connect to tables in Databricks whose processing and updates DataForge does not manage or control. This lets users create relations and rules within DataForge that point to Delta Lake and Hive tables in Databricks.

No additional parameters are necessary for this connection type.  For more information on setting up an Unmanaged External source, refer to the User Manual's Unmanaged External Source documentation.


Data Refresh Types

A Data Refresh Type specifies how DataForge should handle processing, refreshing, and storing the data. The refresh types are described below. The parameters available will change dynamically depending on the user's selection.
 

Full:

Full sources assume each batch of data contains the most recent version of all data for all history. It is a full truncate and reload style refresh.
 
Full refresh is the simplest and can process any data. DataForge recommends starting with Full refresh and switching to another refresh type as needed.
 

Key:

Sources with the Key refresh type contain a unique identifier or key tied to a logical entity.
 
Key refresh is often the most convenient logically, but has performance trade-offs at scale when compared to None, Sequence, or Timestamp. These alternatives are preferred, but not always logically possible.
 

Timestamp:

Timestamp sources identify changes in data using a column that contains the date and/or time for each record. This is most commonly used with event or IoT data that is written once and only once and has a monotonically increasing timestamp tracking field.
 
It is also often useful for performance optimization vs Keyed if the source data has a defined period where records can be updated, after which they are guaranteed to be static, such as in monthly finance and accounting datasets.
 

Sequence:

Sequence sources identify changes in data using a column that contains a monotonically increasing ID tracking field and follows a write-once pattern.
 
It is also often useful for performance optimization vs Keyed if the source data has a defined range where records can be updated, after which they are guaranteed to be static.
 

None:

None is used when all data from new Inputs can be assumed to be new.
This is useful for datasets that have an upstream CDC process and can guarantee once-and-only-once delivery of data to DataForge.
 
This is the most performant of the standard Refresh types, as CDC and the expensive portions of Refresh processes are skipped.
 

Custom:

Custom is used when users need the ability to customize the logic used to partition the dataset and define how to update the hub tables.  This may be because one of the other standard refresh types doesn't fit well or for performance optimization.
 
For more information, please refer to the User Manual's Custom Refresh documentation.

 

Choosing a Data Refresh Type:

The diagram below can be used as baseline guidance for which Refresh Type to select for different types of source datasets. Data Refresh selection is one of the most important design decisions.

Questions leading to Refresh Type selection:

  1. Is the source data less than 100 MB? If yes, use Full refresh.
  2. Is the source data greater than 10 GB? If yes, use Custom refresh.
  3. Do you need to remove hard-deleted records? If yes, use Full or Custom refresh depending on source size.
  4. Is the source data between 100 MB and 10 GB, and does it have a unique key? If yes, use Key refresh.

If none of these questions fit, start with Full refresh. For optimizing performance, review the matrix below.

 
[Diagram: Refresh type decision matrix]
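
One way to make this guidance concrete is a small helper that walks through the same questions in order. The thresholds and flags come directly from the list above; the function itself is an illustrative sketch, not part of DataForge.

```python
# Illustrative encoding of the refresh-type questions above (not DataForge code).
def choose_refresh_type(size_mb: float, has_unique_key: bool, needs_hard_deletes: bool) -> str:
    if size_mb < 100:
        return "Full"    # Q1: small sources
    if size_mb > 10_000:
        return "Custom"  # Q2: sources larger than 10 GB
    if needs_hard_deletes:
        return "Full"    # Q3: hard deletes on a source that is not larger than 10 GB
    if has_unique_key:
        return "Key"     # Q4: 100 MB - 10 GB with a unique key
    return "Full"        # default starting point when nothing else fits

print(choose_refresh_type(size_mb=500, has_unique_key=True, needs_hard_deletes=False))  # Key
```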

Initiation Type

This determines how new Inputs are generated within DataForge. There are five options for initiating new data into DataForge: Scheduled, Watcher, Manual, SDK, and REST API. The three most commonly used initiation types are Scheduled, Watcher, and Manual.
 
Most Connection Types currently support only Scheduled pulls of data from the Connection; however, File Connections include a Watcher feature that automatically begins processing any new files moved or generated in the Connection folder that match the configured File Mask. Watcher is not recommended for use with multi-part files such as Avro and Parquet.
 
With Custom Connection Type Sources that utilize the SDK, a Source can be scheduled and/or initiated from outside DataForge using the methods within the SDK and the Databricks APIs, providing maximum flexibility for custom integrations with third-party tools or in-house applications.
 

Advanced Parameters

Depending on the selections made in the required parameters section, the advanced parameters section will provide various sub-settings to help configurators tune the jobs to their needs. Descriptions for each are included in the UI. Please submit a support request if the descriptions in the UI do not adequately explain the functionality of a specific parameter.
 
Two commonly used and powerful advanced parameters are Select List and Schema Evolution.

Select List

The Select List parameter is found in the Source Ingestion Parameters. It provides the ability to further refine the raw attributes that are stored for an Input by applying the specified select list to ingested data, allowing users to expand or hide columns that exist in the source connection dataset. This parameter does not apply to Agent ingestions. The default value is "*".

| Example Select List | Description |
|---|---|
| * | Ingest every attribute. |
| *, value.* | Ingest every attribute and create new columns for every attribute within the struct field named "value". |
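
The Select List behaves much like a Spark select expression. The PySpark sketch below is illustrative only (the column names are hypothetical, and this is plain Spark rather than DataForge internals); it shows what the "*, value.*" example does: every existing column is kept and the fields of the struct column named "value" are expanded into top-level columns.

```python
# What "*, value.*" does, shown with plain PySpark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("select-list-sketch").getOrCreate()

# Hypothetical dataset with a struct column named "value".
df = spark.createDataFrame(
    [(1, (10.5, "USD"))],
    "id INT, value STRUCT<amount: DOUBLE, currency: STRING>",
)

# Keep every column and expand the struct's fields into top-level columns.
df.selectExpr("*", "value.*").show()
# Resulting columns: id, value, amount, currency
```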

Schema Evolution

The Schema Evolution parameter is found in the Source Parsing parameters. It contains a dropdown with preselected combinations of the options below. The default option is "Add, Remove, Upcast, Extend Complex, Clone".

| Option | Description | Input 1 Schema | Input 2 Schema | Input 2 Result | Hub Table Result Schema |
|---|---|---|---|---|---|
| Lock | Any schema change will generate an ingestion or parsing error. | col_a INT | col_a INT, col_b STRING | Fail | col_a INT |
| Add | Allows adding new columns. | col_a INT | col_a INT, col_b STRING | Succeed | col_a INT, col_b STRING |
| Remove | Allows missing (removed) columns in inputs. Missing column values will be substituted with nulls. | col_a INT, col_b STRING | col_a INT | Succeed | col_a INT, col_b STRING |
| Upcast* | Allows retaining the same column alias when the column data type changes to a type that can be safely converted (upcast) to the existing type. | col_a STRING | col_a INT | Succeed | col_a STRING |
| Extend* (Complex/All) | Allows converting the hub table column data type to match the data type of the same column from the latest set of refreshed data. Extend Complex extends data types that are arrays or structs; Extend All includes complex and standard types. | col_a INT | col_a DECIMAL | Succeed | col_a DECIMAL |
| Clone | Allows creation of a new column aliased <original column>_2 when the column data type changes and is not compatible with the existing column. | col_a INT | col_a DECIMAL | Succeed | col_a INT, col_a_2 DECIMAL |

*When Upcast and/or Extend are enabled, DataForge will only upcast or extend a source raw column if all downstream rules can be upcasted/extended and will still be compatible with any mapped output column data types.

The following matrix represents the data type compatibility for upcasting.

| Input Type \ Hub Type | string | decimal | timestamp | boolean | int | long | float | double | struct | array | date |
|---|---|---|---|---|---|---|---|---|---|---|---|
| string | identical | | | | | | | | | | |
| decimal | upcastable | identical | | | | | | upcastable | | | |
| timestamp | upcastable | | identical | | | | | | | | |
| boolean | upcastable | upcastable | | identical | upcastable | upcastable | upcastable | upcastable | | | |
| int | upcastable | upcastable | | | identical | upcastable | | upcastable | | | |
| long | upcastable | upcastable | | | | identical | | | | | |
| float | upcastable | | | | | | identical | upcastable | | | |
| double | upcastable | | | | | | | identical | | | |
| struct | upcastable | | | | | | | | *compare struct schemas | | |
| array | upcastable | | | | | | | | | compare array types | |
| date | upcastable | | upcastable | | | | | | | | identical |

*Compare Struct Schemas follows this logic (illustrated in the sketch after this list):

    • WHEN hub schema fields CONTAIN input schema fields AND type of each field is UPCASTABLE THEN UPCASTABLE
    • WHEN hub schema fields MATCH input schema fields AND type of each field is IDENTICAL THEN IDENTICAL
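
The sketch below illustrates that comparison rule. The set of upcastable type pairs is a subset taken from the matrix above, and the function is an illustration of the stated logic rather than DataForge's implementation; anything that is neither identical nor upcastable is treated here as incompatible.

```python
# Illustrative struct-schema comparison following the rule above.
# Schemas are modeled as {field_name: simple_type} dictionaries.
UPCASTABLE = {
    ("int", "string"), ("int", "decimal"), ("int", "long"), ("int", "double"),
    ("long", "string"), ("long", "decimal"),
    ("float", "string"), ("float", "double"),
    ("decimal", "string"), ("decimal", "double"),
}

def compare_struct_schemas(input_schema: dict, hub_schema: dict) -> str:
    # The hub schema fields must contain the input schema fields.
    if not set(input_schema) <= set(hub_schema):
        return "incompatible"
    all_identical = set(input_schema) == set(hub_schema)
    for name, input_type in input_schema.items():
        hub_type = hub_schema[name]
        if input_type == hub_type:
            continue
        if (input_type, hub_type) in UPCASTABLE:
            all_identical = False
        else:
            return "incompatible"
    return "identical" if all_identical else "upcastable"

print(compare_struct_schemas({"qty": "int"}, {"qty": "long", "note": "string"}))  # upcastable
print(compare_struct_schemas({"qty": "int"}, {"qty": "int"}))                     # identical
```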

Sub-Source Settings

Each sub-source contains the same tabs as the parent Source; however, many of the options are disabled and inherited from the parent Source settings. The sub-source can be renamed or given a new description in the settings tab.

For full documentation, visit Sub-Sources
