This article provides a step-by-step walkthrough of how to create a custom ingestion that uses Databricks and DataForge to ingest an Excel file.
Follow along with the steps below and modify as needed for your own purposes.
1. Upload file to storage (S3 in this example)
2. Create a cluster in Databricks
Follow the instructions in this article to create a cluster and install the DataForge SDK.
Be sure to set the instance profile.
3. Install DataForge SDK and Excel library on cluster in Databricks
In the newly created cluster, open the Libraries tab and install both the Excel library (Maven) and the DataForge SDK (JAR). Refer back to this article for installing the DataForge SDK.
The Excel library uses Maven as the Library Source with coordinates "com.crealytics:spark-excel_2.12:3.5.0_0.20.3" (use the coordinate that matches your cluster's Scala version; most Databricks Runtimes use Scala 2.12). For more information on versions, see https://mvnrepository.com/artifact/com.crealytics/spark-excel
Excel Library Installation
Once you hit Install for each library, you will know it is successful when the Status shows as "Installed".
4. Create notebook and test display output in Databricks
Create a new notebook by using the left-hand navigation panel to open a workspace and selecting Create -> Notebook, as in the screenshot below.
Give the notebook a name ("Excel Ingestion" in this example) with a default language of Scala. Select the cluster you created in step 2, then select Create.
In the new notebook, write your Scala code to import the required libraries, define a DataFrame using the Excel library and its options, and display the DataFrame result. The screenshot below shows an example, and a code sketch of the same cell follows the line-by-line notes.
Lines 1-4: Import the DataForge SDK, Spark SQL framework, and Excel library.
Lines 6-12: Create a DataFrame of the Excel data by using Spark to read the file. Line 12 is where you enter the file path to your Excel file.
Line 14: Displays the DataFrame to test that your code works.
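Since the screenshot itself is not reproduced here, the following is a minimal sketch of what that cell might look like. The bucket path, sheet address, and option values are placeholders to adjust for your own file; the DataForge SDK import shown in the screenshot is omitted here and is added in step 7.

```scala
import org.apache.spark.sql.DataFrame

// Read the Excel file with the spark-excel library installed in step 3.
val df: DataFrame = spark.read
  .format("com.crealytics.spark.excel")
  .option("dataAddress", "'Sheet1'!A1")   // where the data set starts
  .option("header", "true")               // first row holds column names
  .option("inferSchema", "true")          // infer column types instead of all Strings
  .load("s3://your-bucket/path/to/file.xlsx")   // placeholder path to your file

// Display the result to verify the read works.
display(df)
```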
The screenshot above includes a fairly simple example of an Excel file and how to read it. Below are additional options you can use to read your Excel file as needed; refer to the spark-excel GitHub page for more information and examples, and see the combined example after the table.
| Option | Required or Optional | Description and Example |
| --- | --- | --- |
| dataAddress | Optional | Identifies the start of the data set. `.option("dataAddress", "'Sheet1'!A1:B3")` |
| header | Required | Identifies whether the data has a header row. True or false. `.option("header", "true")` |
| treatEmptyValuesAsNulls | Optional | Default: True. If true, treats empty cells as NULL values. `.option("treatEmptyValuesAsNulls", "false")` |
| setErrorCellsToFallbackValues | Optional | Default: False. If true, any ERROR cell values (#N/A) are converted to the zero value of the column's data type. `.option("setErrorCellsToFallbackValues", "false")` |
| usePlainNumberFormat | Optional | Default: False. If true, formats the cells without rounding and scientific notation. `.option("usePlainNumberFormat", "true")` |
| inferSchema | Optional | Default: False. If false, all columns are Strings. `.option("inferSchema", "true")` |
| addColorColumns | Optional | Default: False. `.option("addColorColumns", "false")` |
| timestampFormat | Optional | Default: yyyy-mm-dd hh:mm:ss[.fffffffff]. `.option("timestampFormat", "MM-dd-yyyy HH:mm:ss")` |
| maxRowsInMemory | Optional | Default: None. If set, uses a streaming reader, which can help with big files (fails if used with xls format files). `.option("maxRowsInMemory", 20)` |
| maxByteArraySize | Optional | Default: None. See the POI API documentation for more information. `.option("maxByteArraySize", 2147483647)` |
| tempFileThreshold | Optional | Default: None. Number of bytes at which a zip entry is regarded as too large to hold in memory, so the data is put in a temp file instead. `.option("tempFileThreshold", 10000000)` |
| excerptSize | Optional | Default: 10. If set and the schema is inferred, the number of rows to infer the schema from. `.option("excerptSize", 10)` |
| workbookPassword | Optional | Default: None. Requires unlimited-strength JCE for older JVMs. `.option("workbookPassword", "pass")` |
| schema | Optional | Default: Either the inferred schema, or all columns are Strings. `.schema(myCustomSchema)` |
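For example, a read that combines several of these options with an explicit schema might look like the sketch below. The column names, sheet range, and file path are illustrative only.

```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType, TimestampType}

// Hypothetical schema for a three-column sheet -- replace with your own columns.
val myCustomSchema = StructType(Seq(
  StructField("OrderId", StringType, nullable = false),
  StructField("Amount", DoubleType, nullable = true),
  StructField("OrderDate", TimestampType, nullable = true)
))

val df = spark.read
  .format("com.crealytics.spark.excel")
  .option("dataAddress", "'Orders'!A1:C100")    // limit the read to a specific range
  .option("header", "true")
  .option("usePlainNumberFormat", "true")        // keep numbers unrounded, no scientific notation
  .option("maxRowsInMemory", 20)                 // streaming reader for larger xlsx files
  .schema(myCustomSchema)                        // skip inference and use the explicit schema
  .load("s3://your-bucket/path/to/orders.xlsx")  // placeholder path
```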
It is easiest to test the logic and script directly in Databricks first by displaying the output. Tweak your code as needed until you get the result you want to pass into DataForge.
You are now ready to switch over to DataForge and configure a new cluster configuration and source to utilize your Databricks notebook.
5. Create a new cluster configuration in DataForge
Within your DataForge environment, navigate through the menu to System Configuration -> Cluster Configuration and select New + in the top right corner.
Give your new cluster configuration a name (Excel Ingestion Cluster in this case) and description.
Select Custom Notebook as the Job Task Type, which gives you the ability to enter the notebook path and change the parameters available below.
Paste in the full path to your custom notebook. An easy way to find this is to return to your Databricks notebook and hover over the notebook name to see the file path (example in the screenshot below).
Copy the whole path and paste it into the Notebook Path field on the DataForge screen.
Open the Job Configuration parameters and, in the Libraries parameter, enter the following JSON (the Maven coordinate should match the library installed in step 3):
[{"maven":{"coordinates":"com.crealytics:spark-excel_2.12:3.5.0_0.20.3"}}]
Save your new cluster configuration and move on to creating a new source.
Custom notebook cluster configuration
6. Create a new source and attach custom cluster
Navigate through the DataForge menu to Sources and click New + in the top right. Give the new source a name and description.
Select Custom as the Connection Type and then use the drop-down on Custom Ingest Cluster Config to choose the cluster configuration you just set up in step 5.
Change your Data Refresh Type and set schedules as needed. You will also see additional parameters below that can be used, but for this example we will leave all of the defaults.
Save your source and navigate back to your notebook to make a few adjustments and specify the name of your source in the notebook.
7. Update notebook to include spark session and session ingestion
Once you have a functioning notebook in Databricks showing the data you want, you will add a few lines of code to tell DataForge to start an ingestion session and which source to send the data to. You will also replace the display(df) call from step 4 with return(df). Below is an example of the updated notebook; a code sketch follows the line-by-line notes.
Line 6 (addition): Be sure to add the name of the DataForge source you created in step 6 inside the quotes, as with "Example Excel File" here. This line creates a new DataForge ingestion session. When it runs, a new input record and a new process record are created in DataForge to track the ingestion process, and a heartbeat begins that continuously communicates with DataForge to confirm the job has not crashed.
Line 8 (addition): Uses a def to define readData, the DataFrame function the ingestion session will call.
Line 16 (substitution): Replaces display(df) with return(df) so the result is returned as the value of readData().
Line 19 (addition): This line runs the custom ingest. It pulls the data as specified in the readData() function, normalizes it, and sends it to the DataForge Datalake.
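As a sketch, the updated notebook might look like the following. The SDK package, class, and method names used here (com.dataforgelabs.sdk, IngestionSession, ingest) are assumptions based on the DataForge SDK documentation and should be confirmed against the SDK reference for your version; the file path is a placeholder.

```scala
// Sketch only -- the SDK package/class/method names are assumptions; confirm
// them against the DataForge SDK documentation for your version.
import com.dataforgelabs.sdk._
import org.apache.spark.sql.DataFrame

// Start a DataForge ingestion session; the name must match the source created in step 6.
val session = new IngestionSession("Example Excel File")

// The DataFrame-returning function the ingestion session will call.
def readData(): DataFrame = {
  val df = spark.read
    .format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("s3://your-bucket/path/to/file.xlsx")   // placeholder path
  return df
}

// Run the custom ingest: pull readData(), normalize it, and send it to DataForge.
session.ingest(readData)
```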
8. Navigate back to DataForge Source and run Pull Data Now in Inputs tab
Select the three vertical dots on the right side of the header row and use the Pull Data Now option to create an input and begin ingestion. This kicks off the process to bring the Excel data into DataForge.
If you encounter any errors, please refer to the rest of the DataForge documentation to resolve the errors and pull in data.
Potential Errors
Depending on the file being ingested, there may be a potential issue where the input does not run successfully in DataForge and throws the error:
Custom Ingestion failed with error: java.io.IOException: Zip bomb detected! The file would exceed the max. ratio of compressed file size to the size of the expanded data. This may indicate that the file is used to inflate memory usage and thus could pose a security risk. You can adjust this limit via ZipSecureFile.setMinInflateRatio() if you need to work with files which exceed this limit. Uncompressed size: 40815631, Raw/compressed size: 408142, ratio: 0.010000 Limits: MIN_INFLATE_RATIO: 0.010000, Entry: xl/pivotCache/pivotCacheRecords2.xml
There is a minimum ratio limit set to block potential attacks from compressed files. The ratio is calculated as compressed size / uncompressed size, and your file's ratio must exceed the minimum, which is set to 0.01 as shown in the error message. In the error above, 408142 / 40815631 ≈ 0.01, falling right at the limit and triggering the error.
If your file is safe to ingest and you need to work around this, you can put the following code in your Databricks notebook before you open the xlsx file (for example, add it at line 7 of the step 7 screenshot), as sketched below. This sets the minimum ratio to 0, which allows you to ignore the error and ingest any file.
shadeio.poi.openxml4j.util.ZipSecureFile.setMinInflateRatio(0)
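In the notebook this might sit directly before the Excel read, for example (the path is a placeholder):

```scala
// Relax POI's zip-bomb ratio check -- only do this for files you trust.
shadeio.poi.openxml4j.util.ZipSecureFile.setMinInflateRatio(0)

val df = spark.read
  .format("com.crealytics.spark.excel")
  .option("header", "true")
  .load("s3://your-bucket/path/to/file.xlsx")   // placeholder path
```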