I. Introduction
Once a configurable processing flow is set up in Azure Databricks, all we need to do to create a new processing flow in the system is add a JSON configuration file.
Recently I got the chance to work on a Big Data project. The system had to transform data from different data sources, each with its own processing flow. Creating and updating a flow should be simple, and transformation features should be able to be created and updated without affecting one another.
Azure Databricks is the Big Data analytics service we decided to work with, and JSON is the format we chose for the configuration files. We needed a way to connect these two points to provide a flexible and maintainable system, and we came up with a configuration-driven processing mechanism.
In this article, I will describe the mechanism, together with sample code.
II. Databricks Overview
Azure Databricks is Databricks deployed on Azure; it provides an autoscaling, collaborative, and interactive workspace. Azure Databricks supports multiple languages such as Java, Python, and Scala (we mainly use Scala and Python in our project).
For more information, see the following links:
- Databricks website: https://databricks.com/
- Azure Databricks overview: https://azure.microsoft.com/en-us/services/databricks/
III. Dynamic processing flow with Databricks
One way to design the Databricks workspace is to use one notebook for each data flow, but this has some disadvantages:
- If data flows are added frequently, the development team needs to create each new notebook from scratch.
- It takes a lot of time to update a flow that no longer meets business needs.
- The same functions may be implemented differently by different developers, which makes maintenance hard.
For that reason, instead of creating a separate notebook for each processing flow, we build a mechanism with the following components:
- Input: We create one configuration file for each processing flow. When the corresponding data file is processed, the processing steps are taken from that configuration file.
- Coordinate notebook: This notebook is in charge of downloading the input file, reading the configuration file, and calling feature notebooks according to the configuration to process the data file (a simplified sketch of this dispatch logic follows this list; the full notebook appears later).
- Feature notebooks: These notebooks provide the different processing capabilities and can be extended as business needs grow.
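Before walking through the full implementation, here is a minimal, simplified sketch (an outline only, not the actual coordinate notebook shown later) of how the coordinator can read the JSON configuration and dispatch each configured action to a feature notebook of the same name. The folder and file paths are placeholders:

```scala
// Simplified dispatch sketch: each "action" in the configuration names a feature
// notebook, and each "param" object carries that notebook's parameters.
import org.apache.spark.sql.functions._

val featureNotebookFolder = "<path_to_feature_notebook_folder>"   // placeholder

// Read the configuration file and flatten the "param" struct into columns
val configDf = spark.read.json("/tmp/validation")                 // placeholder path
val actionsDf = configDf.select($"action", $"param.*")

// Run one feature notebook per configured action, passing its parameters
actionsDf.collect().foreach { row =>
  val params = actionsDf.columns.zipWithIndex
    .collect { case (name, i) if name != "action" && row(i) != null => name -> row(i).toString }
    .toMap
  dbutils.notebook.run(featureNotebookFolder + row(0).toString, 60, params)
}
```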
Now we will implement a sample of this mechanism with two feature notebooks: format phone number and remove sensitive column. The implementation runs on Azure with Azure Databricks and Azure Blob Storage (file storage).
First, we create common notebooks to download files from and write files to Azure Blob Storage.
Download file notebook (Scala):
```scala
val storageAccountName = "<your_storage_account_name>"
val storageAccountKey = "<your_storage_account_key>"

// Get information from parameters
val containerName = dbutils.widgets.get("containerName")
val fileName = dbutils.widgets.get("fileName")
val outputFile = dbutils.widgets.get("outputFile")
val fileType = dbutils.widgets.get("fileType")

// Configure the connection string
// In reality, you should use Key Vault instead of using the key directly
spark.conf.set("fs.azure.account.key." + storageAccountName + ".blob.core.windows.net", storageAccountKey)

// Remove the output file if it already exists
dbutils.fs.rm(outputFile, true)

if (fileType == "csv") {
  // Download csv file and stage it as parquet
  val dataDf = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("wasbs://" + containerName + "@" + storageAccountName + ".blob.core.windows.net/" + fileName)
  dataDf.write.parquet(outputFile)
} else if (fileType == "json") {
  // Download json file
  val dataDf = spark.read.json("wasbs://" + containerName + "@" + storageAccountName + ".blob.core.windows.net/" + fileName)
  dataDf.write.json(outputFile)
} else {
  // Download text file and return its content directly
  val dataDf = spark.read.textFile("wasbs://" + containerName + "@" + storageAccountName + ".blob.core.windows.net/" + fileName)
  val result = dataDf.collect().mkString("\n")
  dbutils.notebook.exit(result)
}
```
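For reference, a caller invokes this notebook by passing the widget values through dbutils.notebook.run, exactly as the coordinate notebook does later in this article. The notebook path and temp file name below are illustrative only:

```scala
// Example invocation from another notebook; path and output name are illustrative.
dbutils.notebook.run("<path_to_download_file_notebook>", 60,
  Map("containerName" -> "input",
      "fileName"      -> "user.csv",
      "outputFile"    -> "/tmp/dataTemp123",
      "fileType"      -> "csv"))
```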
Write file notebook (Python):
```python
storage_name = "<your_storage_account_name>"
storage_key = "<your_storage_account_key>"

# Get information from parameters
dataPath = dbutils.widgets.get("dataPath")
output_container_name = dbutils.widgets.get("output_container_name")
outputPath = dbutils.widgets.get("outputPath")

# Configure the blob storage account access key globally
# In reality, you should use Key Vault instead of using the key directly
spark.conf.set("fs.azure.account.key.%s.blob.core.windows.net" % storage_name, storage_key)

df = spark.read.parquet(dataPath)

output_container_path = "wasbs://%s@%s.blob.core.windows.net/" % (output_container_name, storage_name)
output_blob_folder = "%stmpFolder" % output_container_path

# Write the dataframe as a single csv file to blob storage
(df
 .coalesce(1)
 .write
 .mode("overwrite")
 .option("header", "true")
 .format("com.databricks.spark.csv")
 .save(output_blob_folder))

# Spark writes a folder of part files; move the single part file to the final output path
files = dbutils.fs.ls(output_blob_folder)
output_file = [x for x in files if x.name.startswith("part-")]
dbutils.fs.mv(output_file[0].path, "%s" % (output_container_path + outputPath))
```
Next, we create a feature notebook (Scala) to convert phone numbers to a standard format and to flag corrupted phone number values (the coordinate notebook later excludes those records from the output).
```scala
import scala.util.matching.Regex
import org.apache.spark.sql.functions._

// Define check function: 1 = correct pattern, 0 = not a phone number, 100 = can be corrected
def isPhoneNumberFormat() = udf((data: String) => {
  val pattern: Regex = "^[+]*[(]84[)]\\d{8,10}$".r
  if (pattern.findFirstMatchIn(data).mkString.length() > 0) {
    1 // correct pattern
  } else {
    val data2 = data.replaceAll("[+()-]|\\s", "") // remove special characters
    if (data2.length() > 11 | data2.length() < 10) {
      0 // not a phone number format
    } else {
      100 // can be corrected
    }
  }
})

// Define correction function
def correctPhoneNumber() = udf((data: String, check: Int) => {
  if (check == 1 | check == 0) {
    data.toString // no need to correct
  } else {
    if (check == 100) {
      // replace the first digit with +(84)
      data.replaceAll("[+()-]|\\s", "").replaceFirst("[0-9]", "+(84)").toString
    } else {
      data
    }
  }
})

// Read parameters
val dataPath = dbutils.widgets.get("dataPath")
val colName = dbutils.widgets.get("phoneCol")
val keyCol = dbutils.widgets.get("keyCol")
val tmpColumn = dbutils.widgets.get("tmpColumn")

var df = spark.read.parquet(dataPath)

// Execute the check, returning a data frame with 3 columns: key column, corrected column, check column
df = df.withColumn(tmpColumn, isPhoneNumberFormat()(df(colName)))
df = df.withColumn(colName, correctPhoneNumber()(df(colName), df(tmpColumn)))
       .select(keyCol, colName, tmpColumn)
df.show()

// Write to a temp file and return its path as the result
val outputPath = dataPath + System.currentTimeMillis().toString
df.write.parquet(outputPath)
dbutils.notebook.exit(outputPath)
```
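To see how these rules behave, here is a small standalone Scala check of the same classification and correction logic, run on values taken from the sample data used later in this article. The helper names classify and correct are just for this illustration:

```scala
import scala.util.matching.Regex

val pattern: Regex = "^[+]*[(]84[)]\\d{8,10}$".r

def classify(data: String): Int =
  if (pattern.findFirstMatchIn(data).isDefined) 1            // already in +(84) format
  else {
    val digits = data.replaceAll("[+()-]|\\s", "")
    if (digits.length > 11 || digits.length < 10) 0          // cannot be a phone number
    else 100                                                 // can be corrected
  }

def correct(data: String): String =
  data.replaceAll("[+()-]|\\s", "").replaceFirst("[0-9]", "+(84)")

println(classify("303-555-0011"))    // 100 -> correctable
println(correct("303-555-0011"))     // +(84)035550011
println(classify("262-559212-212"))  // 0   -> rejected (12 digits after cleanup)
println(classify("+(84)255561923"))  // 1   -> already formatted
```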
Next, we create the feature notebook (Scala) that removes sensitive columns.
```scala
import org.apache.spark.sql.functions._

// Read parameters
val dataPath = dbutils.widgets.get("dataPath")
val keyCol = dbutils.widgets.get("keyCol")
val colNameStr = dbutils.widgets.get("column")

var df = spark.read.parquet(dataPath)

// Columns to remove are separated by "//" in the configuration value
val colNameLst = colNameStr.split("//")

// Return a dataframe that contains only the columns to remove (and no key column),
// which signals the coordinate notebook to drop these columns from the data
df = df.select(keyCol)
for (w <- 0 to colNameLst.length - 1) {
  df = df.withColumn(colNameLst(w), lit(1))
}
df = df.drop(keyCol)

// Write to a temp file and return its path as the result
val outputPath = dataPath + (current_timestamp()).expr.eval().toString
df.write.parquet(outputPath)
dbutils.notebook.exit(outputPath)
```
Then, we create the coordinate notebook (Scala).
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
import org.apache.spark.sql.Column

// Configure common and feature notebook paths
val downloadFileNotebook = "<path_to_download_file_notebook>"
val writeFileNotebook = "<path_to_write_file_notebook>"
val contraintFeaturesFolder = "<path_to_feature_notebook_folder>"

val tmpFolder = "/tmp/"

// These values are hard coded here, but you can update to get them from parameters
val dataFile = "user.csv"
val dataFileContainer = "input"
val validationFile = "user-configuration.json"
val validationFileContainer = "configuration"
val keyCol = "User_ID"

val hashString = (current_timestamp()).expr.eval().toString

// Download data file
val outputFileData = tmpFolder + "dataTemp" + hashString
dbutils.notebook.run(downloadFileNotebook, 60, Map("containerName" -> dataFileContainer, "fileName" -> dataFile, "outputFile" -> outputFileData, "fileType" -> "csv"))

// Download validation (configuration) file
val outputFileValidation = tmpFolder + "validation" + hashString
dbutils.notebook.run(downloadFileNotebook, 60, Map("containerName" -> validationFileContainer, "fileName" -> validationFile, "outputFile" -> outputFileValidation, "fileType" -> "json"))

// Read validation file
val constraintDF = spark.read.json(outputFileValidation)

// Read data file
var dataDf = spark.read.parquet(outputFileData)
val dataOrginDf = spark.read.parquet(outputFileData)

// Read constraint check
val conDf = constraintDF.select($"action", $"param.*")

val tmpColumn = "tmpCheck"

// Schema for the empty failzone dataframe
val schema = StructType(
  StructField(keyCol, StringType, false) :: Nil)

// var failDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
var constOutputLst = Array[String]()

// Execute constraint checks
conDf.collect().foreach(row => {
  var paramMap = scala.collection.mutable.Map[String, String]()
  paramMap("dataPath") = outputFileData
  paramMap("keyCol") = keyCol
  paramMap("tmpColumn") = tmpColumn
  // Create param map
  for (w <- 1 to conDf.columns.size - 1) {
    if (row(w) != null) {
      paramMap(conDf.columns(w)) = row(w).toString
    }
  }
  // Execute check
  val outputPath = dbutils.notebook.run(contraintFeaturesFolder + row(0).toString, 60, paramMap)
  // Add output file path to list for processing later
  constOutputLst :+= outputPath
})

var failDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

// Process constraint output files
constOutputLst.foreach(file => {
  val outDF = spark.read.parquet(file)
  // If the returned dataframe does not have the key column => remove columns
  if (outDF.columns.filter(_ == keyCol).length > 0) {
    // Get all columns except the key column and the tmpCheck column
    outDF.columns.filter(_ != keyCol).filter(_ != tmpColumn).foreach(colName => {
      // Update or add new column from the output file
      val tmp = outDF.select(keyCol, colName)
      dataDf = dataDf.drop(colName).join(tmp, dataDf(keyCol) === tmp(keyCol), "left").drop(tmp(keyCol))
    })
    // Add fail data to the fail dataframe
    failDF = failDF.union(outDF.filter(col(tmpColumn) === 0).select(keyCol))
  } else {
    // Remove columns
    outDF.columns.filter(_ != tmpColumn).foreach(colName => {
      dataDf = dataDf.drop(colName)
    })
  }
})

// Save to correctZone
val correctZoneDF = dataDf.join(failDF, dataDf(keyCol) === failDF(keyCol), "left_anti")
if (correctZoneDF.collect().size > 0) {
  val dataPath = outputFileData + "output" + System.currentTimeMillis().toString
  val output_container_name = "output"
  val outputPath = dataFile
  correctZoneDF.write.parquet(dataPath)
  dbutils.notebook.run(writeFileNotebook, 60, Map("dataPath" -> dataPath, "output_container_name" -> output_container_name, "outputPath" -> outputPath))
}

// Remove temp files if they exist
dbutils.fs.rm(outputFileValidation, true)
dbutils.fs.rm(outputFileData, true)

// Return result check
dbutils.notebook.exit("1")
```
Lastly, we test our flow by uploading a data file and a configuration file to Azure Blob Storage.
The sample data file:
```
User_ID,Phone_No,User_Name,Password
24306,303-555-0011,achigeol,8[gxXv*QDt9sTQX
65824,225-556-1923,hermathe,Q7#CDYrr?hdxnth6
14506,219-557-3874,stashero,Vq8#upVE7qj9_M+n
71463,215-558-9821,inghthlo,WXzshf8rU^ts8CUN
36808,262-559212-212,adeldona,[email protected]%Dws5
69170,319-660-9832,wdyalbow,r3^T8++f9MhVJe5h
17255,229-661-2134,introsgo,eXH8ENa8J!c*d^P4
56940,216-662-8732,burienti,BSZC_vxPgTm^q4J%
52720,210-663-8724,itereart,A$F3Rtnc4b%Rtk
```
The sample configuration file:
```json
[{"action":"remove-column","param":{"column":"Password"}},{"action":"format-phone-number","param":{"phoneCol":"Phone_No"}}]
```
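For reference, when the coordinate notebook flattens this configuration with constraintDF.select($"action", $"param.*"), the result should look roughly like the following (column names are the union of all param fields, with null where an action does not use a field):

```scala
// Rough illustration (not captured output) of the flattened configuration:
// spark.read.json(<downloaded configuration file>).select($"action", $"param.*").show()
//
// +-------------------+--------+--------+
// |             action|  column|phoneCol|
// +-------------------+--------+--------+
// |      remove-column|Password|    null|
// |format-phone-number|    null|Phone_No|
// +-------------------+--------+--------+
```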
The testing process is described below:
- Upload the data file with the name "user.csv" into the "input" container
- Upload the configuration file with the name "user-configuration.json" into the "configuration" container
- Create an "output" container
These names are hard coded in the coordinate notebook for demo purposes. In reality, you should get these values from parameters or configuration.
After the files are prepared, run the coordinate notebook. An output file named "user.csv" should be written to the "output" container with the expected content:
```
User_ID,User_Name,Phone_No
65824,hermathe,+(84)255561923
24306,achigeol,+(84)035550011
56940,burienti,+(84)166628732
71463,inghthlo,+(84)155589821
17255,introsgo,+(84)296612134
14506,stashero,+(84)195573874
69170,wdyalbow,+(84)196609832
52720,itereart,+(84)106638724
```
The phone number column is formatted, the Password column is removed, and the record with a corrupted phone number (User_ID 36808) is excluded from the output.
IV. Experience
1. Configurable and reusable
Advantages of the above mechanism:
- Only one configuration file is needed for a new processing flow. With a UI/UX layer on top, even business users can create or update flows themselves.
- A new processing feature can be added to the system without affecting the others. The development cost of one feature is low compared to implementing a whole new processing flow.
- Features are reusable across different processing flows.
2. Trigger Databricks with Azure Logic App
If you would like to create a full integration flow, you can use Azure Logic Apps to call Databricks whenever a file is uploaded to the targeted blob container. The steps are as follows (a sketch of the REST call is shown after the list):
- Create a Databricks job for the coordinate notebook; the job can be triggered to run the notebook through the Azure Databricks REST API
- Create a Logic App that calls the Azure Databricks REST API to trigger the job; the Logic App is triggered when a file is uploaded to the blob container
- Configure the blob container to trigger the Logic App when a file is uploaded
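As a hedged sketch (an assumption, not code from the original project), the HTTP call the Logic App makes is a POST to the Databricks Jobs API "run-now" endpoint. The snippet below shows the equivalent call in Scala, with the workspace URL, job id, and personal access token as placeholders (in practice the token should come from Azure Key Vault):

```scala
// Minimal sketch of triggering a Databricks job via the Jobs REST API (run-now).
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

val workspaceUrl = "https://<your_databricks_instance>.azuredatabricks.net" // placeholder
val token        = "<personal_access_token>"                                // placeholder
val jobId        = 123                                                      // placeholder: job wrapping the coordinate notebook

val body = s"""{"job_id": $jobId}"""

val conn = new URL(workspaceUrl + "/api/2.0/jobs/run-now")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Authorization", "Bearer " + token)
conn.setRequestProperty("Content-Type", "application/json")
conn.setDoOutput(true)
conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))

println("run-now returned HTTP " + conn.getResponseCode)
```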
3. Improve performance
In my experience with Azure Databricks, running a notebook from another notebook may be slower than doing the processing in the same notebook, because a notebook needs time to start up before it runs. For that reason, if a processing flow has to run many features, you may consider the following options to improve performance:
- Instead of separating notebooks by feature, separate them by business flow, or use one notebook per flow and implement all required features in that notebook. This approach gives up the reusability but improves overall performance.
- Build the features into libraries and import them into the coordinate notebook so they can be called without running other notebooks (see the sketch after this list). Every time a feature is updated or added, the libraries must be updated (which may also mean updating the coordinate notebook), but if the business does not change frequently, this is a reasonable way to improve performance.
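As a hedged illustration of the library approach, the sketch below assumes the format-phone-number feature has been repackaged into a hypothetical library (the package com.example.features and object PhoneNumberFeature are invented names, not part of the original project), which the coordinate notebook then calls directly instead of dbutils.notebook.run:

```scala
package com.example.features   // assumed package name, not from the original project

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object PhoneNumberFeature {
  // Same rules as the format-phone-number notebook, exposed as a plain function.
  private val checkUdf = udf((data: String) => {
    val pattern = "^[+]*[(]84[)]\\d{8,10}$".r
    if (pattern.findFirstMatchIn(data).isDefined) 1
    else {
      val digits = data.replaceAll("[+()-]|\\s", "")
      if (digits.length > 11 || digits.length < 10) 0 else 100
    }
  })

  private val correctUdf = udf((data: String, check: Int) =>
    if (check == 100) data.replaceAll("[+()-]|\\s", "").replaceFirst("[0-9]", "+(84)") else data)

  def format(df: DataFrame, phoneCol: String, tmpCol: String): DataFrame =
    df.withColumn(tmpCol, checkUdf(col(phoneCol)))
      .withColumn(phoneCol, correctUdf(col(phoneCol), col(tmpCol)))
}

// In the coordinate notebook, a direct call then replaces dbutils.notebook.run, e.g.:
// dataDf = com.example.features.PhoneNumberFeature.format(dataDf, "Phone_No", "tmpCheck")
```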
V. Conclusion
This article introduces a mechanism to "generalize" data processing flows. Although applying it in practice comes with difficulties, I hope this article can help you in your data processing work and Databricks implementation.
Tran Gia Quoc Hung – Solution & Technology Unit, FPT Software