Custom Refresh

Overview and Benefits

DataForge provides users the flexibility to define a Custom Refresh process that determines how to process, partition, refresh, and store the data in the Data Hub table. 

Custom Refresh is a great option for more complex use cases where the standard refresh types do not fit well or advanced partitioning is needed.  A good example is a Source that frequently ingests large, changing datasets. Rather than relying on a standard refresh type to dictate how to partition the data, users can define partition columns and a delete query that control how the data is updated and maintained.

The main benefits of custom refresh include:

  • A custom way to handle data updates when standard refresh types do not work
  • Faster, more efficient processing with targeted partitions and delete queries
  • No hub history table is maintained, which leads to even faster processing

With Custom Refresh, no Change Data Capture (CDC) process runs, so users will see this step skipped when reviewing the Processing tabs.  

How does the Custom Refresh process work?

Step 1: New input data is ingested into the source

Step 2: When the Custom Refresh process runs, the Delete Query is evaluated against every row of the hub table, and records where the result is true are deleted.

Step 3: If an input append filter is specified, the input batch records are filtered to meet the append filter criteria.

Step 4: Custom Refresh completes by appending the filtered (or unfiltered) input batch data to the hub table.
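Conceptually, a single Custom Refresh run behaves like the following Databricks SQL sketch, where the hub table, input batch, and query placeholders are illustrative and DataForge performs the work internally:

-- Step 2: remove hub records matching the Delete Query
DELETE FROM <hub table> WHERE <delete query>;

-- Steps 3 and 4: filter the input batch and append it to the hub table
INSERT INTO <hub table>
SELECT * FROM <input batch> WHERE <append filter>;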

Incremental Queries with Custom Refresh

To set up incremental queries using Custom Refresh, there are two tokens available to use in the source query.  The most useful option is the Watermark feature, which filters on a specific column of data.  The Extract Datetime token can be used in the WHERE clause of the source query to filter on the latest input extract datetime.

Latest Watermark

Users can define a Watermark column in the CDC Parameters of Custom Refresh type sources. When set, table connection source queries can include a <latest_watermark> token to be replaced during ingestion with the value from MAX(Watermark Column) from the source hub table.

  • Watermark Column: the source attribute whose MAX value is substituted for the <latest_watermark> token in source queries
  • Watermark Initial Value: the initial value substituted for <latest_watermark> when no data has been ingested yet (e.g. 1900-01-01)

Example Source Query:

SELECT * FROM sales.salesorderheader WHERE modifieddate >= <latest_watermark>
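During ingestion, the <latest_watermark> token is replaced with the maximum value of the Watermark Column currently in the hub table (or with the Watermark Initial Value on the first run), so the query sent to the source system would look something like the following, where the literal value and its formatting are illustrative and depend on the watermark column's data type:

SELECT * FROM sales.salesorderheader WHERE modifieddate >= '2023-06-01 00:00:00'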

Extract Datetime

Users can use the <extract_datetime> token in source queries.  When the query is run during ingestion, this token is replaced with the UTC timestamp of the latest input that was run.  

Be sure to account for time zone differences: this value is stored in UTC, while the dates/timestamps it is compared against in source systems may be in local time.  
DataForge recommends building an overlap period into the source query by reducing the extract_datetime by 2 hours to ensure there are never data gaps caused by the difference between when data is updated and when inputs are run. 

Here is an example of how to write an incremental query in SQL Server that applies the recommended overlap to the token:

SELECT * FROM table WHERE last_modified >= DATEADD(hour, -2, '<extract_datetime>')

The exact date and time zone conversion syntax will vary depending on the flavor of SQL used in the source system.
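For example, if last_modified is stored in US Eastern local time, a SQL Server query could convert the UTC token to local time before applying the overlap and comparing. The time zone name here is an assumption for illustration:

SELECT * FROM table
WHERE last_modified >= DATEADD(hour, -2,
    CAST(CAST('<extract_datetime>' AS datetime2) AT TIME ZONE 'UTC' AT TIME ZONE 'Eastern Standard Time' AS datetime2))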

If no conversion is needed, the parameter can be referenced as is with single quotes around it.

SELECT * FROM table WHERE last_modified >= '<extract_datetime>'

Delete Query

Custom Refresh allows users to define a custom delete query that runs while the hub table is updated. Each time refresh runs, records matching the Delete Query are deleted from the hub table before new records are appended.  The delete query is written in the form of a WHERE clause as part of a DELETE FROM <hub table> WHERE ... statement. 

[Input] refers to the input batch of data that is currently being processed

[This] refers to the current hub table that is being refreshed
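Putting these together, a delete query such as [This].salesorderid IN (SELECT [Input].salesorderid FROM [Input]) is effectively executed as something like the statement below, where the physical table names are placeholders:

DELETE FROM hub_table                                           -- [This]: the current hub table
WHERE salesorderid IN (SELECT salesorderid FROM input_batch)    -- [Input]: the current input batch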

NOTE: Any time [Input] is referenced, it needs to be wrapped using IN(sub-query) [recommended] or EXISTS() [use with caution - performance impact].  In the patterns below, EXISTS() is much more expensive than IN(sub-query) or a scalar comparison like [This].attribute >= (SELECT MIN(attribute) FROM [Input]) because EXISTS() is evaluated for every row of the hub table.

Delete Query Behavior

  • TRUE: acts like a Full refresh; all records are deleted and reloaded.
  • FALSE: acts like a None refresh; nothing is deleted and all input records are inserted.
  • [This].salesorderid IN (SELECT [Input].salesorderid FROM [Input]): deletes records from the hub table whose key column matches a key column value in the input batch records.
  • [This].modifieddate >= (SELECT MIN(modifieddate) FROM [Input]): deletes records from the hub table with a modifieddate greater than or equal to the minimum modifieddate in the latest input batch.
  • EXISTS(SELECT 1 FROM [Input] WHERE [This].salesorderdetailid = [Input].salesorderdetailid): deletes records from the hub table that match the input batch on salesorderdetailid.
  • ([This].salesorderdetailid >= (SELECT MIN(salesorderdetailid) FROM [Input])) OR ([This].modifieddate >= (SELECT MIN(modifieddate) FROM [Input])): deletes records from the hub table with a salesorderdetailid greater than or equal to the minimum salesorderdetailid in the input batch, or a modifieddate greater than or equal to the minimum modifieddate in the input batch.

(Optional) Append Filter

During the Custom Refresh process, input batch records are appended to the hub table after the Delete Query is complete. If an append filter is specified, the input batch records are filtered before they are appended to the hub table.

Example filter:

delete_status <> 'D'

Partition Column Guidance

Databricks recommends setting specific Partition Columns only when the table is over a terabyte of data. Databricks also recommends all partitions contain at least a gigabyte of data. Tables with fewer, larger partitions tend to outperform tables with many smaller partitions.  
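To check a hub table's current size and partition columns before deciding, DESCRIBE DETAIL can be run against the hub table in a Databricks notebook; the sizeInBytes and partitionColumns fields in the result show whether partitioning is worthwhile. Replace the source ID as needed:

DESCRIBE DETAIL `hive_metastore`.`dataforge`.`hub_<source_id>`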

For more guidance, refer to the Databricks documentation on Partition Columns.

Examples of Custom Refresh Setup

Below are three examples of when Custom Refresh would be a good option to use. These illustrate example configurations and are not the limit of Custom Refresh capabilities.

Example 1

Using Custom Refresh with an Append Filter to handle hard deletes from a source database based on a delete_status identifier column. The Delete Query deletes any records from the hub table that have a key column ID matching the same ID in the input batch records. The input batch records are filtered to only include records where delete_status <> 'D' and the resulting records are appended to the hub table.
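Based on this description, the Custom Refresh settings for this example could look like the following, where the key column name salesorderid is an assumption:

Delete Query:

[This].salesorderid IN (SELECT [Input].salesorderid FROM [Input])

Append Filter:

delete_status <> 'D'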


Example 2

Using Custom Refresh with a Watermark Column and Partition Columns to optimize very large tables with targeted incremental updates.

In this example, the source data is a large, slowly changing transactional table that has a consistent pattern for updates over time with occasional corrections to previous data.  Each new data pull will primarily consist of changes to data in a limited number of months of the year.  For that reason, custom refresh is a good option to use along with setting partition columns for Month and Year. 

The Watermark column allows DataForge to pull incremental data based on the source query defined.
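For instance, the source query for this example could reuse the watermark token pattern shown earlier, with the table and column names here being assumptions:

SELECT * FROM sales.salesorderdetail WHERE modifieddate >= <latest_watermark>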

The Partition Columns allow DataForge to only update the partitions that match the latest dataset rather than potentially a large number of partitions, saving processing time and compute spend.

In this case, the source data does not already have raw attributes for Month and Year, so enrichments should be added to the Source before the partition columns are set.

First, pull a limited amount of data to ingest the raw attributes into the source.  Below is an example of how the source settings could look for the initial pull.  Not all the data is needed for this first step, so add an optional LIMIT or TOP n to the source query to speed up the data pull.
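For example, the initial source query could look like the following, where the table name and row limit are illustrative (use TOP n instead of LIMIT for SQL Server sources):

SELECT * FROM sales.salesorderdetail LIMIT 1000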

Sample data source settings for first data pull

Sample data pull to get Raw Schema for enrichment creation

After data is ingested, add enrichments for Month and Year based on a date or timestamp attribute from the source data.

Enrichments to use for partitions after pulling sample data
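For example, assuming the source data includes a modifieddate timestamp attribute, the Year and Month enrichment expressions could be as simple as the following Spark SQL functions (the attribute name is an assumption):

YEAR([This].modifieddate)

MONTH([This].modifieddate)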

Delete the sample Source Data that was already ingested. This data was only brought in to capture the raw attribute metadata needed to create the enrichments for partitioning.

Delete existing source data

Update the source settings to change the Delete Query and set the Partition Columns of Year and Month.

The Delete Query represents what will be deleted from the hub table.  In this example query, any records in the hub table with a modifieddate greater than or equal to the minimum modifieddate from the new input data pull are deleted, so there is no overlap.
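In this example, the Delete Query could be written as:

[This].modifieddate >= (SELECT MIN(modifieddate) FROM [Input])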

Save the updated Source Settings and begin processing the data.  If a different refresh type was used to ingest the sample data, a warning will appear to Save and Reset CDC.  Since the sample data has already been deleted, this warning can be disregarded after saving.

Begin processing the data.

 

Example 3

Using Custom Refresh to combine multiple sets of refresh logic in the delete query

In this example, the ingested data should be refreshed with either a Sequence-like process or a Timestamp-like process, depending on the data.  This shows how a user might combine multiple refresh criteria with an OR condition in the delete query.

Delete Query:

([This].salesorderdetailid >= (SELECT MIN(salesorderdetailid) FROM [Input])) 
OR
([This].modifieddate >= (SELECT MIN(modifieddate) FROM [Input]))

This will allow DataForge to delete existing data based on salesorderdetailid if there are IDs in the hub table that overlap with the latest input data.  Similarly, existing records will be deleted from the hub table if the latest input contains records with a modifieddate earlier than or equal to records already processed, which handles corrections to previously processed data. 

 

Set up Change Data Feed for audit trail of record level changes

Databricks provides a Change Data Feed utility for Delta tables that allows users to track changes to specific records over time.  Record level changes can only be tracked as of the date that Change Data Feed is enabled on the table.

Find the source ID and set up Change Data Feed on the hub table in a Databricks notebook with the following command, replacing the source ID in the query:

ALTER TABLE `hive_metastore`.`dataforge`.`hub_<source_id>` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

After enabling this table property, users can query specific record changes in the hub table by using the Change Data Feed commands like so:

SELECT * FROM table_changes('`hive_metastore`.`dataforge`.`hub_17625`', 6) where salesorderid = 43960 order by `_commit_timestamp` desc

This provides a result set like the following, where we can track the commit version in which each record was updated, including the original attributes and values:

In this example, the Change Data Feed property was enabled on the table in version 6, so that is the first version from which the change data can be queried.  Use DESCRIBE HISTORY to identify the version at which the Change Data Feed property was enabled on your table, like so:

DESCRIBE HISTORY `hive_metastore`.`dataforge`.`hub_17625`

 
