SDK Overview and Development Approach
The DataForge SDK allows users to define their own Scala code in a Databricks notebook and attach that notebook to DataForge for automatic processing. The SDK can be used for Custom Ingestion, Custom Parsing, and Custom Post Output processes. Notebooks used for Custom Ingestion and Custom Parsing must return a Spark DataFrame. To set up a custom process, users need a notebook in Databricks, a custom cluster configuration in DataForge, and a Source or Output in DataForge, depending on the process type.
Details of the DataForge SDK can be found in the Scala docs: https://docs.dataforgedev.com/com/dataforgelabs/sdk/index.html
Below is the recommended approach to developing a new custom notebook for processing.
- If no cluster has been previously set up, create an all-purpose cluster in Databricks and attach the DataForge SDK library.
- Create a new Databricks notebook, attach the Databricks cluster, and write the custom code. Test that the notebook returns the result as expected before moving on.
- Create a cluster configuration in DataForge and specify the Databricks notebook path along with any optional parameter/config changes.
- Create a Source or Output in DataForge and attach the Custom Cluster Configuration.
- Update the Databricks notebook with optional SDK parameters and add the SDK session logic, wrapping it around the existing custom code.
- Test the notebook with a new Source data pull (Custom Ingest/Parse) or a new Output (Custom Post Output) to ensure the notebook runs correctly in conjunction with DataForge processes.
Creating an All-Purpose Cluster in Databricks
Follow the steps in the Setting up a Cluster to run Custom Notebooks guide if a cluster has not already been created in Databricks.
Create a Databricks Notebook
In Databricks, select the New + button and choose the Notebook option. Give the newly created notebook a name and set the default code language to Scala. Connect the All-Purpose cluster that was created or choose an existing one. For more information on how to manage notebooks, visit the Databricks documentation.
Below is a sample of custom ingestion notebook code that sets up an ingestion session (required for DataForge SDK) and then returns a Spark DataFrame of the numbers 1 through 5. Note that the IngestionSession and session.ingest lines are commented out until the notebook is ready to be run with a custom ingestion Source later on.
The first two lines of code are standard package imports. The first import is required in every custom notebook to interact with the DataForge SDK. The second import is required to utilize Spark DataFrames for Custom Ingest and Parse processes and is optional for Post Output, depending on the code written.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame
// val session = new IngestionSession("<DataForgeSourceName>","<DataForgeProjectName>")
def ingestDf(): DataFrame = {
val values: List[Int] = List(1,2,3,4,5)
val df: DataFrame = values.toDF()
return df
}
display(ingestDf)
// session.ingest(ingestDf)
Create a Custom Cluster Configuration in DataForge
Follow the instructions in the Setting up Custom Processes for Automatic Processing guide to create a new Custom Cluster Configuration. This cluster configuration will be attached in the next step.
Attach the Custom Cluster Configuration to a DataForge Source or Output
In DataForge, open the Source (for Ingest/Parse) or Output (for Post Output) and attach the custom cluster configuration. Save the settings changes when complete.
Update Databricks Notebook with optional parameters and SDK Session
Now that the source and cluster config are set up, the portions of the Databricks notebook that interact with the DataForge SDK can be uncommented. Optionally, custom parameters and custom connection parameters can be used in the notebook to generalize the notebook code and make it reusable and adjustable across sources or outputs. For more information, visit the Custom Parameters documentation and the Custom Connections documentation.
WARNING: Never save a sensitive value or session.connectionParameters into a val in a notebook. Doing so allows the sensitive parameters to be printed or exposed in a stack trace.
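As an illustration of the warning above, the sketch below reads a non-sensitive custom parameter into a val but uses connection parameters inline, without binding them to vals. It assumes customParameters and connectionParameters are exposed as JSON values queryable with the same \ pattern used elsewhere in this guide; the JDBC options and key names (environment, jdbc_url, user, password) are hypothetical examples, not part of the SDK.
// Assumes `session` is the DataForge session created in the notebook and `spark` is the notebook's SparkSession.
// Non-sensitive custom parameters can be read into vals:
val environment: String = (session.customParameters \ "environment").as[String]

// Sensitive connection parameters are read inline where they are used, never stored in a val:
val sourceDf = spark.read
    .format("jdbc")
    .option("url", (session.connectionParameters \ "jdbc_url").as[String])
    .option("user", (session.connectionParameters \ "user").as[String])
    .option("password", (session.connectionParameters \ "password").as[String])
    .option("dbtable", "example_schema.example_table")
    .load()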
Below is an example of the updated notebook code.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame

val session = new IngestionSession()

def ingestDf(): DataFrame = {
    val values: List[Int] = List(1,2,3,4,5)
    val df: DataFrame = values.toDF()
    session.log("My first custom ingestion!")
    return df
}

// display(ingestDf)
session.ingest(ingestDf)
SDK Session - Line 4
The new Ingestion Session line is now uncommented. This line initiates a new session within DataForge, and the syntax changes depending on the custom process type. Note that because the notebook will now be launched from DataForge via the custom cluster configuration attached to the Source, the source name no longer needs to be passed to IngestionSession().
Adding optional Log messages to the process logs in DataForge - Line 9
Developers can add optional messages to the DataForge process logs when the ingestion runs by adding lines of session.log("<message>").
Executing the Ingestion - Line 14
The session.ingest() line is now uncommented. When the notebook runs, this completes the session created on line 4. Note that the ingestDf function can be named anything, as long as it is properly defined and passed to session.ingest().
Test the process from the DataForge Source or Output
In DataForge, open the custom Source or Output and run a new data pull (Ingest/Parse) or reset output (Post Output). As an example for custom ingestion, select the option to Pull Now to start a new data ingestion.
When the new input is created, it will start by launching the Custom Ingest Cluster Config to run the notebook. After the process finishes, the status will change to Success (green check mark) or Failure (red exclamation point).
If the custom process fails, the best way to troubleshoot is to click the status icon to open the process log (or navigate to the Process tab), expand the process header row, and click the job run hyperlink to open the Databricks job run page. The Databricks page shows the notebook code along with the error stack trace for troubleshooting.
Custom Ingestion
Writing the Custom Ingestion code in a Databricks notebook
Below is example code running a custom ingestion process.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame

val session = new IngestionSession("<DataForgeSourceName>","<DataForgeProjectName>")

def ingestDf(): DataFrame = {
    val values: List[Int] = List(1,2,3,4,5)
    val df: DataFrame = values.toDF()
    return df
}

session.ingest(ingestDf)
Imports - Line 1-2
The first two lines are standard package import statements. They are needed to utilize all of the DataForge SDK functionality. The first line imports the DataForge SDK. The second line imports the Apache Spark DataFrame package to allow users to write code to return a DataFrame.
Ingestion Session - Line 4
This line initiates a new ingestion within a DataForge Source. If the notebook is started directly from Databricks, the DataForge Source name is required so the SDK knows where to send the data. If the notebook is started from DataForge, the "<DataForgeSourceName>" and "<DataForgeProjectName>" arguments can be removed.
Creating the DataFrame Function - Line 6-10
This section of code is the crucial part of the ingestion where the custom user code will go. This code defines a function that gathers data and returns a DataFrame. In the example, the code will create and return a DataFrame with one column and rows for numbers 1 through 5.
Executing the Ingestion and closing the session - Line 12
The last line is used to receive the DataFrame and complete the custom ingestion. It pulls the data as specified in the ingestDf function, normalizes it, and sends it to the DataForge Data Lake.
Attaching the Custom Cluster Configuration to a Source
In DataForge, open the Sources page from the main menu and use the New + button to create a new Source or open an existing custom source.
Select the following settings to set up the source to run the custom ingestion:
- Connection Type: Select Custom
- Custom Ingest Cluster Config: Select the custom cluster configuration
Save the source settings when finished filling out the rest of the options. Below is an example of what this looks like.
Advanced Patterns
Latest tracking fields are available for each session. These fields provide information about which data has been pulled into DataForge in the past (a short incremental-ingestion sketch follows this list). They include:
- Latest Timestamp
  - For a Timestamp or Key Source, this is the latest (maximum) timestamp seen in the Date Column of the Source.
  - Use it to filter down your data pulls and avoid sending repeat data to DataForge.
  - Access it in your Databricks notebook with session.latestTrackingFields.sTimestamp
- Latest Sequence
  - For a Sequence Source, this is the largest number seen in the Sequence Column of the Source.
  - Use it to filter down your data pulls and avoid sending repeat data to DataForge.
  - Access it in your Databricks notebook with session.latestTrackingFields.sSequence
- Extract Datetime
  - For all Sources, this is the timestamp of the most recent data pull into the Source.
  - Use it to filter down your data pulls and avoid sending repeat data to DataForge.
  - Access it in your notebook with session.latestTrackingFields.extractDatetime
- Input Id
  - For all Sources, this is the most recent input ID of the Source.
  - Access it in your notebook with session.latestTrackingFields.inputId
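Below is a minimal incremental-ingestion sketch using the Latest Timestamp field. It assumes session.latestTrackingFields.sTimestamp returns a timestamp value that Spark can compare against the source's own timestamp column; the table and column names (sales.orders, updated_at) are placeholders, not part of the SDK.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val session = new IngestionSession()

// Incremental pull: only ingest rows newer than the latest timestamp DataForge has already seen.
def ingestDf(): DataFrame = {
    val latestTs = session.latestTrackingFields.sTimestamp
    session.log(s"Pulling records with updated_at > ${latestTs}")
    spark.table("sales.orders").filter(col("updated_at") > lit(latestTs))
}

session.ingest(ingestDf)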
Custom Parse
Writing the Custom Parse code in a Databricks notebook
Below is example code running a custom parse process. Custom Parse is only available on File Connection Type Sources.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val session = new ParsingSession()

def parseDf(): DataFrame = {
    val inputId = (session.processParameters \ "input_ids").as[Vector[Int]].head
    val rawFileName = spark.sql(s"select source_file_name from meta.input where input_id = ${inputId}").select("source_file_name").collect().map(_.getString(0)).mkString("")
    val fileName: String = session.filePath

    //access custom parameters if they're saved in source settings
    //val customParameters = (session.customParameters \ "param1").as[JsValue]
    val df: DataFrame = spark.read.csv(fileName).withColumn("source_file_name", lit(rawFileName))
    return df
}

session.run(parseDf)
Imports - Line 1-3
The first three lines are standard package import statements. The first line imports the DataForge SDK. The second line imports the Apache Spark DataFrame package to allow users to write code to return a DataFrame. The third line imports the Spark SQL functions so we can use the lit() function later in the code.
Parsing Session - Line 5
This line creates a new DataForge parsing session. If the notebook is started directly from Databricks, the DataForge Input ID is required so the SDK knows where to get the ingested data and which input to send the parsed data to. If the notebook is started from DataForge, the <inputId> portion can be removed, as shown in the example.
Creating the DataFrame function - Line 7-16
Lines 7-16 are the crucial part of the parse where the custom user code goes. These lines define a function that returns a Spark DataFrame. In the example, the code gets the rawFileName that was originally ingested along with the fileName of the processed raw file, reads the ingested data with spark.read.csv, adds a column that writes the rawFileName to each record, and returns the result as a DataFrame. Note that the read may need to change to spark.read.parquet or another reader depending on the type of file that was ingested/selected in the source settings.
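For example, if the ingested file were Parquet instead of CSV, only the read line inside parseDf would change (a sketch; the rest of the function stays the same):
// Parquet variant of the read shown above
val df: DataFrame = spark.read.parquet(fileName).withColumn("source_file_name", lit(rawFileName))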
To run your own logic, replace the code within parseDf with your custom code. Custom parameters can be configured in the "Parsing" parameters on the Source settings page in DataForge. The custom parameters should be saved as JSON containing at least one key-value pair, like {"param1":"example"}. If a JSON array is used, it also needs to be saved under a key, such as {"array-param":[{"param1":"example"},{"param2":"example"}]}.
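As a sketch of how those saved parameters could be read inside parseDf, assuming session.customParameters exposes the JSON and that play-json readers are available in the notebook (the key names mirror the examples above):
import play.api.libs.json._  // may already be available via the SDK/notebook environment

val param1: String = (session.customParameters \ "param1").as[String]                       // {"param1":"example"}
val arrayParam: Seq[JsValue] = (session.customParameters \ "array-param").as[Seq[JsValue]]  // {"array-param":[ ... ]}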
Executing the Parse - Line 18
This line runs the custom parse. It parses the data as specified in the parseDf function, normalizes it, and sends it to the DataForge Data Lake.
Attaching the Custom Cluster Configuration to a Source
In DataForge, open the Sources page from the main menu and use the New + button to create a new file Source or open an existing file source.
Select the following settings to set up the source to run the custom parse:
- Connection Type: Select File
- Parser: Select Custom
- Custom Parse Cluster Config: Select the custom cluster configuration
Save the source settings when finished filling out the rest of the options. Below is an example of what this looks like.
Custom Post Output
Writing the Custom Post Output code in a Databricks notebook
Below is example code running a custom post output process. Custom Post Output notebooks can be attached to any type of output.
import com.dataforgelabs.sdk._

val session = new PostOutputSession("<DataForgeOutputName>", "<DataForgeChannelName>", "<DataForgeProjectName>")

def postOutput(): Unit = {
    session.log("Hello World!")
    println("Hello World!")
}

session.run(postOutput)
Import - Line 1
The first line is a standard package import statement. It is needed to utilize all of the DataForge SDK functionality.
Post Output Session - Line 3
This line creates a new DataForge Post Output session. If the notebook is started directly from Databricks, the DataForge Output and Channel names are required so the SDK knows where to grab the data. If the notebook is started from DataForge, the "<DataForgeOutputName>", "<DataForgeChannelName>" portion can be removed.
Creating the postOutput function - Line 5-8
Lines 5-8 are the crucial part of the post output where the custom user code goes. These lines define a function of Unit type. In the example, the code writes a log message to the DataForge process and then prints Hello World. To run your own logic, replace the code within the postOutput function with your custom code.
Executing the postOutput function - Line 10
The last line runs the custom post output. It pulls the data, or runs the code, as specified in the postOutput function and completes the post output process.
Attaching the Custom Cluster Configuration to an Output
In DataForge, open the Outputs page from the main menu and use the New + button to create a new output or open an existing output.
Select the following settings to set up the output to run the custom post output:
- Post Output Commands: Select Custom Notebook
- Custom Post Output Cluster Config: Select the custom cluster configuration
Save the output settings when finished filling out the rest of the options. Below is an example of what this looks like.
Advanced Patterns
Advanced Loopback - Custom Post Output triggering a Custom Ingest
At times, users may need to run both a custom post output notebook and trigger a new ingestion on another source. This is achieved by creating a custom ingestion session within a custom post output session. Below is an example of this code.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame

//Create Post output session
val postOutputSession = new PostOutputSession("<DataForgeOutputName>","<DataForgeChannelName>", "<DataForgeProjectName>")

//Process driven parameters
val projectName = (postOutputSession.processParameters \ "project_name").as[String]
val viewDatabase = (postOutputSession.processParameters \ "view_database").as[String]
val virtualView = (postOutputSession.processParameters \ "view_name").as[String]

//User defined parameter on Output custom parameter setting
val destinationSourceName = (postOutputSession.customParameters \ "output" \ "destinationSourceName").as[String]

//Post output logic
def postOutput(): Unit = {
    spark.sql(s"CREATE OR REPLACE VIEW customPostOutput AS SELECT * FROM ${viewDatabase}.${virtualView}")

    //Ingestion Session to loopback source
    val ingestSession = new IngestionSession(destinationSourceName, projectName)

    //Ingestion Logic
    def ingestionCode(): DataFrame = {
        spark.sql("SELECT * FROM customPostOutput")
    }

    //Run ingest
    ingestSession.ingest(ingestionCode)
}

//Run post output code
postOutputSession.run(postOutput)
Post Output Session - Line 5
This line creates a new DataForge Post Output session. If the notebook is started directly from Databricks, the DataForge Output and Channel names are required so the SDK knows where to grab the data. If the notebook is started from DataForge, the "<DataForgeOutputName>", "<DataForgeChannelName>" portion can be removed.
Reading Output Process Parameters - Lines 8-10
There are a number of available process parameters that can be used through the SDK. These lines read the project name along with the view database and view name of the virtual output, for use in the notebook logic. The pattern for process parameters is (session.processParameters \ "param").as[String].
Reading User Defined Output Custom Parameter - Line 13
This line reads a custom JSON parameter that is saved in the parameters of the Output settings. In this example, we are using the destinationSourceName to determine which source to start a new ingestion on when the post output notebook runs.
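Based on the lookup path used in the code (customParameters \ "output" \ "destinationSourceName"), the custom parameter saved on the Output would presumably be shaped like {"output":{"destinationSourceName":"<SourceName>"}}; adjust the keys to match your own Output settings.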
Creating the postOutput function - Lines 16-29
These lines define a function of Unit type and are where the custom code for the output should go. In the example, the post output code creates or replaces a view in Databricks named customPostOutput with the latest data from the virtual output. Inside the postOutput function, an ingestion session is also started and completed, triggering a custom ingestion on the destinationSourceName Source.
Ingestion Session - Line 20
This line initiates a new ingestion within a DataForge Source. In the example, we are using the parameter that defines the destinationSourceName to tell DataForge which Source to create the new ingestion in.
Creating the DataFrame to Ingest - Lines 23-25
These lines define a function that retrieves data from the customPostOutput view and returns the result as a DataFrame.
Executing the Ingestion - Line 28
This line ingests the DataFrame, normalizes the data, and completes the ingestion process.
Executing the Post Output - Line 32
This line triggers the Post Output session to run, completing the post output code along with the custom ingestion code within it.