I. Introduction

Once the configurable processing flow is in place in Azure Databricks, creating a new processing flow in the system only requires adding a JSON configuration file.

Recently I got a chance to work on a Big Data project. The system is required to transform data from different data sources, each with its own processing flow. Creating and updating a flow should be simple, and transformation features should be added or updated without impacting existing flows.

Azure Databricks is the Big Data analytics service we decided to work with, and JSON is the format we chose for the configuration files. We needed a way to connect these two points into a flexible and maintainable system, and we came up with a configuration-driven processing mechanism.

In this article, I will describe the mechanism, together with sample code.

II. Databricks Overview

Azure Databricks is Databricks deployed on Azure, providing an auto-scaling, collaborative and interactive workspace. Azure Databricks supports multiple languages such as Java, Python and Scala (we mainly use Scala and Python in our project).

For more information, please refer to the official Azure Databricks documentation.

III. Dynamic processing flow with Databricks

One way to design the Databricks workload is to use one notebook for each data flow, but it has some disadvantages:

  • If data flows are added frequently, the development team needs to create a new notebook from scratch each time.
  • It takes a lot of time to update a flow that no longer meets the business need.
  • The same functions may be implemented differently by different developers, which makes maintenance hard.

For that reason, instead of creating a separate notebook for each processing flow, we will create the mechanism described below:

The mechanism works as follows:

  • Input: We create one configuration file for each processing flow. When the corresponding data file is processed, the processing steps are taken from the configuration file.
  • Coordinate notebook: This notebook is in charge of downloading the input file, reading the configuration file and calling the feature notebooks according to the configuration to process the data file.
  • Feature notebooks: These notebooks provide the different processing capabilities and can be extended as the business requires in the future.

Now we will implement a sample of this mechanism with two feature notebooks: format phone number and remove sensitive column. The implementation is done on Azure with Azure Databricks and Azure Blob Storage (file storage).

First, we create common notebooks to download files from and write files to Azure Blob Storage.

Download file notebook (scala):
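A minimal sketch of this notebook (shown here in Python with the azure-storage-blob SDK rather than the Scala used in the project; the secret scope, container and path names are placeholders, not the real project values):

```python
from azure.storage.blob import BlobServiceClient

# `dbutils` and `spark` are provided automatically inside a Databricks notebook.
# Parameters passed in by the coordinate notebook via dbutils.notebook.run().
dbutils.widgets.text("container", "input")
dbutils.widgets.text("blob_name", "user.csv")

container = dbutils.widgets.get("container")
blob_name = dbutils.widgets.get("blob_name")
local_path = "/dbfs/tmp/" + blob_name  # DBFS FUSE path, visible to every notebook

# The storage connection string is kept in a (placeholder) secret scope.
connection_string = dbutils.secrets.get(scope="demo", key="storage-connection-string")
service = BlobServiceClient.from_connection_string(connection_string)
blob = service.get_blob_client(container=container, blob=blob_name)

# Download the blob and save it to DBFS.
with open(local_path, "wb") as f:
    f.write(blob.download_blob().readall())

# Return the local path so the calling notebook knows where the file landed.
dbutils.notebook.exit(local_path)
```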

Write file notebook (python):
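A matching sketch for the write notebook, again with placeholder names:

```python
from azure.storage.blob import BlobServiceClient

# Parameters passed in by the coordinate notebook.
dbutils.widgets.text("container", "output")
dbutils.widgets.text("blob_name", "user.csv")
dbutils.widgets.text("local_path", "/dbfs/tmp/user.csv")

connection_string = dbutils.secrets.get(scope="demo", key="storage-connection-string")
service = BlobServiceClient.from_connection_string(connection_string)
blob = service.get_blob_client(container=dbutils.widgets.get("container"),
                               blob=dbutils.widgets.get("blob_name"))

# Upload the local result file to the target container, replacing any earlier run.
with open(dbutils.widgets.get("local_path"), "rb") as f:
    blob.upload_blob(f, overwrite=True)

dbutils.notebook.exit("done")
```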

Next, we create a notebook to convert phone numbers to a standard format and remove rows with corrupted phone number values.
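A minimal PySpark sketch of this feature notebook. The formatting rule (strip the separators, drop the leading trunk digit and prepend “+(84)”) mirrors the sample output shown later in the article; the widget names and paths are illustrative.

```python
from pyspark.sql import functions as F

dbutils.widgets.text("input_path", "/dbfs/tmp/user.csv")
dbutils.widgets.text("output_path", "/dbfs/tmp/user-phone.csv")
dbutils.widgets.text("phoneCol", "Phone_No")

phone_col = dbutils.widgets.get("phoneCol")
input_path = dbutils.widgets.get("input_path")
output_path = dbutils.widgets.get("output_path")

# The FUSE path /dbfs/... and the dbfs:/... URI point at the same file.
df = spark.read.option("header", True).csv(input_path.replace("/dbfs/", "dbfs:/"))

# Drop rows whose phone number does not match the expected ddd-ddd-dddd pattern
# (the corrupted values).
df = df.filter(F.col(phone_col).rlike(r"^[0-9]{3}-[0-9]{3}-[0-9]{4}$"))

# Strip the separators, drop the leading trunk digit and prepend the country code.
digits = F.regexp_replace(F.col(phone_col), "-", "")
df = df.withColumn(phone_col, F.concat(F.lit("+(84)"), F.substring(digits, 2, 9)))

# Write a single CSV file back to DBFS (fine for this small demo file).
df.toPandas().to_csv(output_path, index=False)
dbutils.notebook.exit(output_path)
```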

Next, we create a notebook to remove a sensitive column.
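And a similar sketch for the remove sensitive column notebook (widget names and paths are again illustrative):

```python
dbutils.widgets.text("input_path", "/dbfs/tmp/user.csv")
dbutils.widgets.text("output_path", "/dbfs/tmp/user-clean.csv")
dbutils.widgets.text("column", "Password")

input_path = dbutils.widgets.get("input_path")
output_path = dbutils.widgets.get("output_path")

df = spark.read.option("header", True).csv(input_path.replace("/dbfs/", "dbfs:/"))

# Dropping a column that does not exist is a no-op, so a misconfigured step does not fail.
df = df.drop(dbutils.widgets.get("column"))

df.toPandas().to_csv(output_path, index=False)
dbutils.notebook.exit(output_path)
```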

Then, we create the coordinate notebook.
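A sketch of the coordinate notebook: it downloads the data and configuration files, walks through the configured actions, calls the matching feature notebook for each step (feeding it the previous step’s output) and finally uploads the result. The notebook paths, container names and the 3600-second timeout are assumptions for this sketch.

```python
import json

# Container and file names are hard-coded here for the demo (see the note below).
data_path = dbutils.notebook.run("./download-file", 3600,
                                 {"container": "input", "blob_name": "user.csv"})
config_path = dbutils.notebook.run("./download-file", 3600,
                                   {"container": "configuration",
                                    "blob_name": "user-configuration.json"})

# The configuration file is a JSON array of {action, param} steps.
with open(config_path) as f:
    steps = json.load(f)

# Map each configured action to the feature notebook that implements it.
feature_notebooks = {
    "remove-column": "./remove-sensitive-column",
    "format-phone-number": "./format-phone-number",
}

# Run the steps in order, each one reading the previous step's output.
current_path = data_path
for i, step in enumerate(steps):
    params = {"input_path": current_path, "output_path": "/dbfs/tmp/step-%d.csv" % i}
    params.update(step.get("param", {}))
    current_path = dbutils.notebook.run(feature_notebooks[step["action"]], 3600, params)

# Upload the final result to the "output" container.
dbutils.notebook.run("./write-file", 3600,
                     {"container": "output", "blob_name": "user.csv",
                      "local_path": current_path})
```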

Lastly, we test our flow by uploading the data file and the configuration file to Azure Blob Storage.

The sample data file:

User_ID,Phone_No,User_Name,Password
24306,303-555-0011,achigeol,8[gxXv*QDt9sTQX
65824,225-556-1923,hermathe,Q7#CDYrr?hdxnth6
14506,219-557-3874,stashero,Vq8#upVE7qj9_M+n
71463,215-558-9821,inghthlo,WXzshf8rU^ts8CUN
36808,262-559212-212,adeldona,[email protected]%Dws5
69170,319-660-9832,wdyalbow,r3^T8++f9MhVJe5h
17255,229-661-2134,introsgo,eXH8ENa8J!c*d^P4
56940,216-662-8732,burienti,BSZC_vxPgTm^q4J%
52720,210-663-8724,itereart,A$F3Rtnc4b%Rtk

The sample configuration file:

[{"action":"remove-column","param":{"column":"Password"}},{"action":"format-phone-number","param":{"phoneCol":"Phone_No"}}]

The testing process is described below:

  • Upload the data file with the name “user.csv” into the “input” container
  • Upload the configuration file with the name “user-configuration.json” into the “configuration” container
  • Create an “output” container

The names above are hard-coded in the coordinate notebook for demo purposes. In reality, you should get these values from parameters or configuration.

After the files are prepared, run the coordinate notebook. An output file named “user.csv” should be written to the “output” container with the expected content:

User_ID,User_Name,Phone_No
65824,hermathe,+(84)255561923
24306,achigeol,+(84)035550011
56940,burienti,+(84)166628732
71463,inghthlo,+(84)155589821
17255,introsgo,+(84)296612134
14506,stashero,+(84)195573874
69170,wdyalbow,+(84)196609832
52720,itereart,+(84)106638724

The phone number column is formatted, the row with the corrupted phone number is removed, and the Password field is dropped from the content.

IV. Experience

1. Configurable and reusable

Advantages of the above mechanism:

  • Only one configuration file is needed for a new processing flow. With a suitable UI, even business users can create or update flows themselves.
  • A new processing feature can be added to the system without affecting others. The development cost of one feature is lower than implementing a whole new processing flow.
  • Features can be reused across different processing flows.

2. Trigger Databricks with Azure Logic App

If you would like to create a full integration flow, you can use Azure Logic Apps to call Databricks whenever a file is uploaded to the target blob container. The steps are as below (a sample REST call is sketched after the list):

  • Create a Databricks job for the coordinate notebook; the job can then be triggered through the Azure Databricks REST API
  • Create a Logic App that calls the Azure Databricks REST API to trigger the job; the Logic App itself is triggered when a file is uploaded to the blob container
  • Configure the blob container to call the Logic App when a file is uploaded
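As an illustration of the call the Logic App (or any HTTP client) makes, a run-now request against the Databricks Jobs REST API looks roughly like the sketch below; the workspace URL, token and job ID are placeholders.

```python
import requests

# Placeholders: your workspace URL, a personal access token and the ID of the job
# that wraps the coordinate notebook.
workspace_url = "https://<databricks-instance>.azuredatabricks.net"
token = "<personal-access-token>"

response = requests.post(
    workspace_url + "/api/2.1/jobs/run-now",
    headers={"Authorization": "Bearer " + token},
    json={
        "job_id": 123,
        # Optionally pass the uploaded file name through to the notebook widgets.
        "notebook_params": {"blob_name": "user.csv"},
    },
)
response.raise_for_status()
print(response.json())  # contains the run_id of the triggered run
```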

3. Improve performance

In my experience with Azure Databricks, running a notebook from another notebook may be slower than processing in the same notebook, because each notebook needs time to start up before it runs. For that reason, if a processing flow has to run many features, you may consider these points to improve performance:

  • Instead of separating notebooks by feature, you can separate them by business flow, or use one notebook for each flow, with all required features implemented in the same notebook. This method takes away some of the “reusable” capability but improves overall performance.
  • Build the features into libraries and import them into the coordinate notebook, so they can be called without running other notebooks. Each time a feature is updated or added, the libraries (and possibly the coordinate notebook) must be updated, but if the business does not change frequently this is a worthwhile way to improve performance (see the sketch below).
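For example, if the formatting logic were packaged into a (hypothetical) library installed on the cluster, the coordinate notebook could call it directly instead of starting another notebook:

```python
# Hypothetical library call replacing a dbutils.notebook.run() round trip;
# dataflow_features and format_phone_numbers are illustrative names, not a real package.
from dataflow_features.phone import format_phone_numbers

df = spark.read.option("header", True).csv("dbfs:/tmp/user.csv")
df = format_phone_numbers(df, phone_col="Phone_No")  # runs inside the same notebook
```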

V. Conclusion

This article introduces a mechanism to “generalize” data processing flows. Although applying it in practice comes with difficulties, I hope this article can help you in your data processing work and Databricks implementation.

Tran Gia Quoc Hung – Solution & Technology Unit, FPT Software
