Thursday, March 30, 2017

Example Apache Spark ETL Pipeline Integrating a SaaS

Augmenting a Simple Street Address Table with a Geolocation SaaS (Returning JSON) on an AWS based Apache Spark 2.1 ETL Pipeline via a (Free) Databricks Community Account

Version: 2017.03.29

This post as a .pdf

When I first started out on this project and long before I had any intention of writing this blog post, I had a simple goal which I had assumed would be the simplest and most practical use case for using Apache Spark:

  • Create an ETL pipeline which starts with a very simple table
  • Perform some fairly basic ETL actions
    • Including composing calls to a SaaS (which returns JSON)
    • Extracting values
    • Rejoining them to emit a new table
  • Emit a final table as a .tsv (Tab Separated Value) file
    • A join of the original table, the SaaS extracted values, and the raw SaaS JSON response (from which the values were extracted)

Man, I couldn’t have been more wrong if I had tried.

Before you read further, this isn’t about the super sexy side of Spark with machine learning, AI, wowzer graphic charts, etc. This covers the everyday practical boring drudge work most of us must slog through to get simple things done on our daily projects. Continue reading if you are really looking for pragmatic solutions to the simpler challenges of just getting things to work in Spark with a relatively simple SaaS. If you are looking for more than that, you likely need to be going to your local Data Science Meetup. There, you can enjoy all sorts of Big Data and Data Science porn.

So, if you remain interested in us low level code and data sloggers...

After 30+ hours of Googling my fool head off, falling down numerous rabbit holes, reading through dozens of very sparsely written blog posts (many of which seem to copy the same code from each other...and every single one started by reading JSON from a file, not from a table column value...which I would have thought was by far the primary use case), poring over the marginal Spark SQL documentation (and I am being _very_ diplomatic here), arriving at dead ends while exploring many links whose content no longer worked, and then just patiently experimenting with Spark SQL (and Scala) in the Databricks Notebook, I finally was able to piece together the example solution below. I sincerely hope it saves you the time it most certainly didn’t save me.

Finding the answers needed to produce this solution was further complicated by the fact that Apache Spark has been, and continues to be, releasing new versions fairly quickly (several a year). So, it became apparent as I was researching that I had to look at each blog post’s publishing date to figure out which version of Spark the post was implicitly referencing, so I could then infer to what degree the content remained technically relevant. IOW, while the solution I provide below might look simple, even trivial, it was anything but.

The good news is that I now have a fully working solution which is mostly composed of Spark SQL. The bad news is that there are several points where I am required to dip under the covers into Scala to directly manipulate the Spark API (Apache Spark is written in Scala, which is why Scala was used instead of Java, Python or R). I have worked to minimize the number and scope of these Scala dips and to keep as much as possible in Spark SQL.

Table of Contents:

Obtaining a Databricks Community Edition Account

Creating a Cluster

An Overview of the ETL Pipeline Project

Implementing the ETL Pipeline Project

1. Creating and Populating the “geolocation_example” Table

2. Creating the “GeolocationEtlPipeline” Spark SQL Notebook

3. Displaying the Contents of the “geolocation_example” Table

4. Creating the Scala UDFs (User Defined Functions) to Support the Microservice SaaS URLs

5. Creating the “saas_request_parameters_urlencoded” Table

6. Creating the “saas_request” Table

7. Creating the “saas_response” Table with Call to SaaS

8. Creating the “saas_response_json” Table

9. Creating the “saas_response_json_extraction” Table

10. Creating the “saas_response_final” Table

11. Creating the “saas_response_final” TSV File

Summarizing

Obtaining a Databricks Community Edition Account

Assuming you plan to play around with this yourself, the first thing you will need to do is set up a Databricks Community Edition account. The process involves filling out an online form. This account enables interactively working with Apache Spark itself via a (SQL) Notebook (described below).

Disclaimer: I am not in any way related to or compensated by Databricks. My recommendation that you get a Databricks Community account is driven entirely by my desire for you to be able to directly follow along with the solution as I present it, and nothing else.

Creating a Cluster

In order to do anything useful with a Databricks account, including using a Notebook, we must first create a “cluster”.


Figure CC1

Once you have logged into the Databricks account, a vertical menu will appear on the left side of the web page with the “databricks” icon appearing and selected at the top (Figure CC1).


Figure CC2

  • In the vertical menu to the left, select the “Clusters” icon (third from the bottom). The “Clusters” page will appear (Figure CC2).
  • Figure CC3

  • Select the “+ Create Cluster” button just to the right of the “Active Clusters” label. The “Create cluster” page will appear (Figure CC3).
    • Enter “Test1” into the “Cluster Name” text box.
    • Select “Spark 2.1 (Auto-Updating, Scala 2.11)” from the “Apache Spark Version” dropdown box.
    • Click the “Create Cluster” button appearing to the right of both the “New Cluster” label and the “Cancel” button towards the top of the page. This will take you back to the Clusters page.

    Figure CC4

  • A new entry in the “Active Clusters” table named “Test1” will appear with a small rotating circle to the left of it (Figure CC4). This indicates the cluster is initializing.
    • You can confirm this by seeing the value “Pending” in the State column (4th column).

ATTENTION: Because this is a HUGE and very generous value being offered by Databricks at cost to themselves and no cost to you, I would sincerely appreciate and respectfully request you acknowledge and honor their generosity by ensuring you minimize the costs to them as best as you can.

The primary way you can help minimize costs for Databricks is to explicitly terminate (i.e. shut down) your cluster when you are finished with a session. IOW, while Databricks will automatically terminate your cluster after 2 hours of inactivity, they continue to be charged by AWS the entire time the cluster remains up and sitting idle. So, please do the respectful and responsible thing and terminate the cluster you are using when you finish a working session.

An Overview of the ETL Pipeline Project

Now that a cluster exists with which to perform all of our ETL operations, onward to the construction of said ETL pipeline.

The ETL pipeline will start with a .tsv file which is loaded into Databricks as a table. Then, via a Databricks Spark SQL Notebook, a series of new tables will be generated as the information flows through the pipeline and is modified to enable the calls to the SaaS. The SaaS responses will then be collected and specific data points extracted from the JSON. Next, the original table, the extracted data points, and the SaaS’s JSON response are joined to produce the final table. Finally, the final table is emitted as a .tsv file which can be downloaded from a specific URL.
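For orientation, below is a minimal sketch (using only the table names created in the steps that follow) which can be run at the very end of the Notebook to print a row count per pipeline stage as a quick end-to-end sanity check:

%scala

// Sketch only: these table/view names mirror the stages created in the steps below.
// Running this after completing all of the steps prints a row count per stage.
val pipelineTables = Seq(
    "geolocation_example"
  , "saas_request_parameters_urlencoded"
  , "saas_request"
  , "saas_response"
  , "saas_response_json"
  , "saas_response_json_extraction"
  , "saas_response_final"
)

pipelineTables.foreach { tableName =>
  println(s"$tableName: ${spark.table(tableName).count()} rows")
}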

Implementing the ETL Pipeline Project

Now that a cluster exists with which to perform all of our ETL operations, we must construct the different parts of the ETL pipeline.

1. Creating and Populating the “geolocation_example” Table

Figure IEPP1.1

  • On the vertical menu to the left, select the “Tables” icon. This will activate a pop-right menu with the topmost item being the “+ Create Table” button (Figure IEPP1.1).
  • Figure IEPP1.2

  • Click the “+ Create Table” button. This will display the “Create Table” page (Figure IEPP1.2). Within this page...
    • The “Data Source” dropdown should show “File” (if not, set it to “File”).
    • Click in the file drop target area “Drop file or click here to upload” to bring up a file dialog box.
    • Navigate to and select the “geolocation_example.tsv” text file.
    • Figure IEPP1.3

    • The file upload progress will be shown and then a message will appear looking something like “Uploaded to DBFS → /FileStore/tables/mppv0m221489612343295/geolocation_example.tsv” (Figure IEPP1.3). The randomly generated directory name will vary from upload to upload.

      Figure IEPP1.4

    • Click the “Preview Table” button which has appeared (Figure IEPP1.4).
    • Enter “geolocation_example” into the “Table name” text box.
    • The “File type” should show as “CSV” (if not, set it to “CSV”).
    • The “Column Delimiter” should show as “\t” for the tab character (if not, set it to “\t”).
    • If the “First row is header” checkbox is not selected, select it (which will cause a pause while the dataset is rescanned and redisplayed).
    • Figure IEPP1.5

    • Click the “Create Table” button. A new page titled “Table: geolocation_example” should appear (Figure IEPP1.5). This page should summarize the “geolocation_example.tsv” we just finished uploading.

2. Creating the “GeolocationEtlPipeline” Spark SQL Notebook

Figure IEPP2.1

  • On the vertical menu to the left, select the “Home” icon. This will activate a pop-right menu with the topmost item being the “Workspace” dropdown menu (Figure IEPP2.1).
  • Figure IEPP2.2

  • Click the “Workspace” menu’s arrow and select “Create -> Notebook” (Figure IEPP2.2). This will cause the “Create Notebook” popup modal dialog to appear.
    • Enter “GeolocationEtlPipeline” into the “Name” text box.
    • The “Language” dropdown should show as “SQL” (if not, set it to “SQL”)
    • The “Cluster” dropdown should show as “Test1 (...” (if not, set it to “Test1 (...”)

    Figure IEPP2.4

  • Click the “Create” button (in the lower right) to close the dialog. This will create the new Notebook, and then change to the “GeolocationEtlPipeline (SQL)” Notebook page (Figure IEPP2.4).

3. Displaying the Contents of the “geolocation_example” Table

Paste the following Spark SQL into the empty cell (it should be the first, and only, cell if step 2 above was just completed successfully). Assuming the cursor is still in the cell following your paste, execute the query by pressing Shift+Enter:

SELECT *
  FROM geolocation_example

Figure IEPP3.1

A table should appear below the SQL query showing a few rows from the “geolocation_example” table (Figure IEPP3.1). Additionally, a new empty cell should have appeared below this cell.

Technically, this SQL query step is optional; it is simply a useful way to validate the results of the operation. Such steps will be marked “OPTIONAL STEP:” when they appear across later steps.

Paste the following Scala code into the new empty cell. And instead of pressing Shift+Enter, use the mouse to click the small “play” icon. The “play” icon is the leftmost icon in the group of four icons found in the upper right corner of this cell:

%scala

val df1 =
  spark.sql("SELECT * FROM geolocation_example")

df1.printSchema()

Figure IEPP3.2

An ASCII tree representation of the “geolocation_example” table’s schema should appear below the Scala cell (Figure IEPP3.2).

Technically, this Scala code step is optional; it is simply a useful way to validate the results of the operation. Such steps will be marked “OPTIONAL STEP:” when they appear across later steps.

Figure IEPP3.3

Because we clicked the “play” icon to execute the cell (as opposed to pressing Shift+Enter), a new empty cell did not appear. So, we must now create a new empty cell. To do this, either use the “+” button at the center bottom edge of this cell (Figure IEPP3.3 - you must hover your mouse pointer over the area for the button to appear).

Figure IEPP3.4

Or click the cell’s “drop down menu” icon (the second from the left in the group of four icons found in the upper right corner of this cell) and then select the item “Add Cell Below”.

4. Creating the Scala UDFs (User Defined Functions) to Support the Microservice SaaS URLs

Figure IEPP4.1

Paste the following Scala code into the empty cell and execute (Figure IEPP4.1):

%scala

// URL-encode a single query-string value.
val functionUrlEncodeValue: (String => String) = (url: String) => {
  java.net.URLEncoder.encode(url, "UTF-8")
}

// Issue an HTTP GET against the URL and return the response body as a String.
val functionUrlGet: (String => String) = (url: String) => {
  val httpClient = org.apache.http.impl.client.HttpClientBuilder.create().build()
  val httpGet = new org.apache.http.client.methods.HttpGet(url)
  val httpResponse = httpClient.execute(httpGet)
  val basicResponseHandler = new org.apache.http.impl.client.BasicResponseHandler()
  basicResponseHandler.handleResponse(httpResponse).toString
}

val sparkSession: org.apache.spark.sql.SparkSession =
  spark

// Register both functions as UDFs so they can be called from Spark SQL cells.
val unitFunctionUrlEncodeValue: Unit =
  sparkSession.udf.register("functionUrlEncodeValue", functionUrlEncodeValue)

val unitFunctionUrlGet: Unit =
  sparkSession.udf.register("functionUrlGet", functionUrlGet)

These Scala UDFs will be called later from within the Spark SQL cells that follow.
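As a quick sanity check (the street address literal below is purely a hypothetical example value), the UDFs registered above can now be invoked from Spark SQL; for instance:

%scala

// Hypothetical input value; verifies the UDF registered above is callable from Spark SQL.
display(spark.sql("SELECT functionUrlEncodeValue('1600 Pennsylvania Ave NW') AS encodedValue"))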

ATTENTION: The Scala UDF code above was hard earned and is REALLY VALUABLE! IOW, this was one of the things I spent a number of hours hunting down and tweaking until it finally “just worked”. It wasn’t obvious and it wasn’t a trivial “google it” and find the solution (although, I hope this ends up being so for those who now follow me).

Advance to the next empty cell following this one (create one if it didn’t automatically appear).

5. Creating the “saas_request_parameters_urlencoded” Table

Figure IEPP4.2

Paste the following Spark SQL into the next empty cell and execute (Figure IEPP4.2):

DROP TABLE IF EXISTS saas_request_parameters_urlencoded;

CREATE TABLE saas_request_parameters_urlencoded AS
  SELECT accountId
       , 'https://www.bamsaas.com/api' AS baseUrl
       , concat(
             '?transaction.endPoint.name=byStreetAddress'
           , '&transaction.endPoint.version=20170320'
           , '&transaction.endPoint.kind=Payload'
           , '&transaction.endPoint.format.output=JSON'
         ) AS serviceEndpoint
       , concat(
             '&transaction.clientCredentials.accessKey='
           , functionUrlEncodeValue("test.drive@assetlocationintelligence.com")
         ) AS accessKey
       , concat(
             '&transaction.clientCredentials.secretKey='
           , functionUrlEncodeValue("not.a.real.email.address")
         ) AS secretKey
       , concat(
             NVL2(
                 accountId
               , concat('&request.locations.0.trackingLabel=', functionUrlEncodeValue(accountId))
               , ''
             )
           , NVL2(
                 addressLine1
               , concat('&request.locations.0.componentized.street=', functionUrlEncodeValue(addressLine1))
               , ''
             )
           , NVL2(
                 addressLine2
               , concat('&request.locations.0.componentized.streetSecondary=', functionUrlEncodeValue(addressLine2))
               , ''
             )
           , NVL2(
                 city
               , concat('&request.locations.0.componentized.city=', functionUrlEncodeValue(city))
               , ''
             )
           , NVL2(
                 state
               , concat('&request.locations.0.componentized.stateProvinceRegionCode2=', functionUrlEncodeValue(state))
               , ''
             )
           , NVL2(
                 zip
               , concat('&request.locations.0.componentized.postalCode=', functionUrlEncodeValue(zip))
               , ''
             )
           , '&request.locations.0.componentized.country=USA'
         ) AS geolocationParameters
    FROM geolocation_example
   ORDER BY accountId

Note: The SaaS credentials (accessKey and secretKey values) which are hardcoded within the above SQL are limited to 500 lookups within 24 hours. So, if you are using this Notebook and start receiving “lookup maximum count exceeded” errors (within the JSON itself), you can wait for up to 24 hours for the counter to reset to 0. Or, you can immediately request your own credentials (free 500 lookups a day) either via the SaaS Registration web page or by sending an email to contact@qalocate.com specifying that you want to register an account using the email address from which you sent the registration request. The process is currently manual, so expect it to take up to 24 hours before you receive an email indicating the account has been created.
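To make the NVL2 pattern in the query above concrete, here is a small illustration (the city literal is hypothetical, and it assumes the UDFs from step 4 are already registered in this Notebook session): NVL2 emits the URL-encoded parameter only when its first argument is non-NULL, and an empty string otherwise.

%scala

// Illustration only: a non-NULL value yields an encoded parameter, a NULL value yields ''.
display(spark.sql("""
  SELECT NVL2('Saint Louis'
            , concat('&request.locations.0.componentized.city=', functionUrlEncodeValue('Saint Louis'))
            , ''
         ) AS cityParameter
       , NVL2(NULL, 'never returned', '') AS cityParameterWhenNull
"""))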

OPTIONAL STEP: Paste the following Spark SQL into the next empty cell and execute:

SELECT *
  FROM saas_request_parameters_urlencoded

If you executed the optional step above, a table should appear below the SQL query showing a few rows from the “saas_request_parameters_urlencoded” table.

OPTIONAL STEP: Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT * FROM saas_request_parameters_urlencoded")

df1.printSchema()

If you executed the optional step above, an ASCII tree representation of the “saas_request_parameters_urlencoded” table’s schema should appear below the Scala query and look something like this:

root
 |-- accountId: string (nullable = true)
 |-- baseUrl: string (nullable = true)
 |-- serviceEndpoint: string (nullable = true)
 |-- accessKey: string (nullable = true)
 |-- secretKey: string (nullable = true)
 |-- geolocationParameters: string (nullable = true)

Advance to the next empty cell following this one.

6. Creating the “saas_request” Table

Paste the following Spark SQL into the next empty cell and execute:

DROP TABLE IF EXISTS saas_request;

CREATE TABLE saas_request AS
  SELECT accountId
       , concat(
             baseUrl
           , serviceEndpoint
           , accessKey
           , secretKey
           , geolocationParameters
         ) AS getUrl
    FROM saas_request_parameters_urlencoded
   ORDER BY accountId

OPTIONAL STEP: Paste the following Spark SQL into the next empty cell and execute:

SELECT *
  FROM saas_request

If you executed the optional step above, a table should appear below the SQL query showing a few rows from the “saas_request” table.

OPTIONAL STEP: Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT * FROM saas_request")

df1.printSchema()

If you executed the optional step above, an ASCII tree representation of the “saas_request” table’s schema should appear below the Scala query and look something like this:

root
 |-- accountId: string (nullable = true)
 |-- getUrl: string (nullable = true)

Advance to the next empty cell following this one.

7. Creating the “saas_response” Table with Call to SaaS

Paste the following Spark SQL into the next empty cell and execute:

DROP TABLE IF EXISTS saas_response;

CREATE TABLE saas_response AS
  SELECT accountId
       , functionUrlGet(getUrl) AS jsonString
    FROM saas_request
   ORDER BY accountId
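Since this CREATE TABLE issues one live HTTP request per row in “saas_request”, it can be worth smoke-testing the UDF against a single row first. A minimal sketch (note that it consumes one lookup against whichever SaaS credentials are embedded in the request URLs):

%scala

// Sketch only: issue a single live request before fanning out across every row.
display(spark.sql("SELECT accountId, functionUrlGet(getUrl) AS jsonString FROM saas_request LIMIT 1"))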

Figure IEPP7.1

OPTIONAL STEP: Paste the following Spark SQL into the next empty cell and execute (Figure IEPP7.1):

SELECT *
  FROM saas_response

If you executed the optional step above, a table should appear below the SQL query showing a few rows from the “saas_response” table.

OPTIONAL STEP: Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT * FROM saas_response")

df1.printSchema()

If you executed the optional step above, an ASCII tree representation of the “saas_response” table’s schema should appear below the Scala query and look something like this:

root
 |-- accountId: string (nullable = true)
 |-- jsonString: string (nullable = true)

Advance to the next empty cell following this one.

8. Creating the “saas_response_json” Table

Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT jsonString FROM saas_response")

// Infer a schema from the JSON strings and make the result queryable as a temp view.
val df2 =
  spark.read.json(df1.as[String].rdd)

df2.createOrReplaceTempView("saas_response_json")

The above code transforms the JSON Strings into a Spark DataFrame. This enables using Spark SQL to query the contents of the JSON directly.
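If you want to see the same pattern in isolation, here is a minimal sketch with a single hard-coded JSON string (the field names in it are made up; only the read-from-RDD pattern mirrors the cell above):

%scala

import spark.implicits._

// Made-up one-row example: infer a schema from a JSON string, just like the cell above.
val sampleJson =
  Seq("""{"transaction":{"id":"t-1"},"response":{"status":"OK"}}""").toDS

val sampleDf =
  spark.read.json(sampleJson.rdd)

sampleDf.printSchema()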

ATTENTION: The Scala code above was hard earned and is REALLY VALUABLE! Specifically, the “val df2 =...” implementation. IOW, this was one of the things I spent a number of hours hunting down and tweaking until it finally “just worked”. It wasn’t obvious and it wasn’t a trivial “google it” and find the solution (although, I hope this ends up being so for those who now follow me).

OPTIONAL STEP: Paste the following Spark SQL into the next empty cell and execute:

SELECT *
  FROM saas_response_json

If you executed the optional step above, a table should appear below the SQL query showing a few rows from the “saas_response_json” table. The columns will be the three names from the root JSON object: “request”, “response” and “transaction” (appearing in alphabetical order even though the raw JSON will show them in the order “transaction”, “request” and “response”).

OPTIONAL STEP: Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT * FROM saas_response_json")

df1.printSchema()

If you executed the optional step above, an ASCII tree representation of the “saas_response_json” table’s schema should appear below the Scala cell (not shown). It will be very long, showing the JSON schema outline down to the leaf nodes regardless of how deep the nesting might go.

9. Creating the “saas_response_json_extraction” Table

Paste the following Spark SQL into the next empty cell and execute:

DROP TABLE IF EXISTS saas_response_json_extraction;

CREATE TABLE saas_response_json_extraction AS
  SELECT response.locations[0].trackingLabel AS trackingLabel
       , response.locations[0].status.code AS status
       , response.locations[0].inputDelta.grade.letter AS letterGrade
       , response.locations[0].inputDelta.grade.number AS numberGrade
       , NVL2(
             response.locations[0].detail.aliLocatorValue.implicit
           , response.locations[0].detail.aliLocatorValue.implicit
           , response.locations[0].detail.aliLocatorValue.explicit
         ) AS aliLocator
    FROM saas_response_json
   ORDER BY trackingLabel

Note: The above Spark SQL query has just extracted data out of a column value containing JSON into a set of “flattened” column values. This ability to address nested JSON directly is part of what makes Spark SQL so powerful and extraordinarily valuable.
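As a side note, the same dot/array notation can also be used to narrow the very long schema from step 8 down to just the subtree the extraction reads from; a minimal sketch:

%scala

// Print only the schema of the subtree the extraction query above reads from.
spark.sql("SELECT response.locations[0] AS location FROM saas_response_json").printSchema()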

OPTIONAL STEP: Paste the following Spark SQL into the next empty cell and execute:

SELECT *
  FROM saas_response_json_extraction

If you executed the optional step above, a table should appear below the SQL query showing a few rows from the “saas_response_json_extraction” table.

OPTIONAL STEP: Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT * FROM saas_response_json_extraction")

df1.printSchema()

If you executed the optional step above, an ASCII tree representation of the “saas_response_json_extraction” table’s schema should appear below the Scala query and look something like this:

root
 |-- trackingLabel: string (nullable = true)
 |-- status: string (nullable = true)
 |-- letterGrade: string (nullable = true)
 |-- numberGrade: long (nullable = true)
 |-- aliLocator: string (nullable = true)

Advance to the next empty cell following this one.

10. Creating the “saas_response_final” Table

Paste the following Spark SQL into the next empty cell and execute:

DROP TABLE IF EXISTS saas_response_final;

CREATE TABLE saas_response_final AS
  SELECT ge.accountId
       , ge.addressLine1
       , ge.addressLine2
       , ge.city
       , ge.state
       , ge.zip
       , ge.description
       , srje.status
       , srje.letterGrade
       , srje.numberGrade
       , srje.aliLocator
       , sr.jsonString
    FROM geolocation_example AS ge
       , saas_response AS sr
       , saas_response_json_extraction AS srje
   WHERE (ge.accountId == srje.trackingLabel)
     AND (ge.accountId == sr.accountId)
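The query above uses the older comma-separated join syntax. If you prefer explicit ANSI joins, an equivalent formulation (a sketch producing the same result for these tables) looks like this:

%scala

// Equivalent ANSI-join formulation of the saas_response_final query above.
display(spark.sql("""
  SELECT ge.accountId, ge.addressLine1, ge.addressLine2, ge.city, ge.state, ge.zip, ge.description
       , srje.status, srje.letterGrade, srje.numberGrade, srje.aliLocator
       , sr.jsonString
    FROM geolocation_example AS ge
    JOIN saas_response_json_extraction AS srje
      ON ge.accountId = srje.trackingLabel
    JOIN saas_response AS sr
      ON ge.accountId = sr.accountId
"""))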

Figure IEPP10.1

OPTIONAL STEP: Paste the following Spark SQL into the next empty cell and execute (Figure IEPP10.1):

SELECT *
  FROM saas_response_final

If you executed the optional step above, a table should appear below the SQL query showing a few rows from the “saas_response_final” table.

OPTIONAL STEP: Paste the following Scala code into the next empty cell and execute:

%scala

val df1 =
  spark.sql("SELECT * FROM saas_response_final")

df1.printSchema()

If you executed the optional step above, an ASCII tree representation of the “saas_response_final” table’s schema should appear below the Scala query and look something like this:

root
 |-- accountId: string (nullable = true)
 |-- addressLine1: string (nullable = true)
 |-- addressLine2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- description: string (nullable = true)
 |-- status: string (nullable = true)
 |-- letterGrade: string (nullable = true)
 |-- numberGrade: long (nullable = true)
 |-- aliLocator: string (nullable = true)
 |-- jsonString: string (nullable = true)

Advance to the next empty cell following this one.

11. Creating the “saas_response_final” TSV File

ATTENTION: The code below was hard earned and is REALLY VALUABLE! Specifically, emitting the JSON into a .tsv without all the double quotes within the JSON being escaped by a backslash (that is a stupid amount of backslashes). Figuring out how to extract the .tsv results as a file from Databricks down to a local file was also challenging (especially the very last bit where it emits the HTTPS URL as a value in a table). IOW, this was one of the things I spent a number of hours hunting down and tweaking until it finally “just worked”. It wasn’t obvious and it wasn’t a trivial “google it” and find the solution (although, I hope this ends up being so for those who now follow me).

Paste the following Scala code into the next empty cell and execute:

%scala

import org.apache.hadoop.fs._

case class Content(asDownloadUrl: String, asDbfsPath: String)

def tableWriteFileTsv(
    pathRootFileStore: String
  , subPathDirectory: String
  , tableName: String
): org.apache.spark.sql.Dataset[Content] = {
  //returns a tuple:
  //  (
  //       pathDirectory containing one or more file parts
  //     , pathFile to the single merged tsv file
  //     , url from which to download the final single file
  //   )
  def tableWriteTsvParts: (String, String, String) = {
    val pathDirectory =
      s"/$pathRootFileStore/$subPathDirectory"
    val pathDirectoryTable =
      s"$pathDirectory/$tableName"
    val pathDirectoryFileTsv =
      s"$pathDirectoryTable.tsv"
    val unitDirectoryRemove =
      dbutils.fs.rm(pathDirectoryTable, true)
    val unitFileRemove =
      dbutils.fs.rm(pathDirectoryFileTsv, true)
    val dataFrameTable =
      spark.sql(s"SELECT * FROM $tableName")
    val unitWriteTsv =
      dataFrameTable
        .write
        .option("header", "true")
        .option("delimiter", "\t")
        .option("quote", "\u0000")
        .csv(pathDirectoryTable)
    (
        pathDirectoryTable
      , pathDirectoryFileTsv
      , s"https://community.cloud.databricks.com/files/$subPathDirectory/$tableName.tsv"
    )
  }

  def fileSystemMerge(sourcePathDirectory: String, destinationPathFile: String): Unit = {
    val configuration =
      new org.apache.hadoop.conf.Configuration()
    val fileSystem =
      FileSystem.get(configuration)
    FileUtil.copyMerge(
        fileSystem
      , new Path(sourcePathDirectory)
      , fileSystem
      , new Path(destinationPathFile)
      , false
      , configuration
      , null
    )
  }

  val (sourcePathDirectoryFileParts, destinationPathFileMerged, urlDownloadTsv) =
    tableWriteTsvParts
  val unitMerge =
    fileSystemMerge(sourcePathDirectoryFileParts, destinationPathFileMerged)
  List(
    Content(
        urlDownloadTsv
      , destinationPathFileMerged
    )
  ).toDS
}

val fileStoreRoot =
  "FileStore"
val directoryTsv =
  "tsv"
val tableName =
  "saas_response_final"
val dataFrameUrlDownloadTsv =
  tableWriteFileTsv(fileStoreRoot, directoryTsv, tableName)

display(dataFrameUrlDownloadTsv)

The above code emits the “saas_response_final” table as a set of (tab) delimited files into a folder, then merges those various files into a single file, and finally generates a Dataset which is displayed at the conclusion of the operation. The value in the first row and column can be used in a browser to download the file locally. (Google’s Chrome will offer to do so directly: highlight the full URL value, right-click it, and select the second item in the pop-up menu, which starts with “Go to https://…”; the file will then be downloaded into your default local download folder on your PC or device.)
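Before downloading, you can also peek at the beginning of the merged file directly from the Notebook (a sketch; the DBFS path is the one constructed by the code above):

%scala

// Show the first ~500 bytes of the merged file, enough to confirm the header row.
println(dbutils.fs.head("/FileStore/tsv/saas_response_final.tsv", 500))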

If you just read through the steps and didn’t follow along on your own Databricks community account, you can peek at what the final results would look like in this “saas_response_final.tsv” file which was a direct result of following all the steps above (and which also includes the extension renaming to “.tsv” from “.csv”).

Summarizing

Just to repeat, because it is so important:

ATTENTION: Because this is a HUGE and very generous value being offered by Databricks at cost to themselves and no cost to you, I would sincerely appreciate and respectfully request you acknowledge and honor their generosity by ensuring you minimize the costs to them as best as you can.

The primary way you can help minimize costs for Databricks is to explicitly terminate (i.e. shut down) your cluster when you are finished with a session. IOW, while Databricks will automatically terminate your cluster after 2 hours of inactivity, they continue to be charged by AWS the entire time the cluster remains up and sitting idle. So, please do the respectful and responsible thing and terminate the cluster you are using when you finish a working session.

Now that you have worked through all of the steps above, you should be able to see just how easy it is to get Apache Spark SQL (with bits of help from Scala) to attach to an external SaaS for trivial extraction and augmentation of existing internal data sets. In addition to the benefits of Databricks’ incredibly scalable Apache Spark implementation on AWS, you also get all of the benefits of a cloud based solution instead of having to endure the costs of attempting to stand up an Apache Spark instance yourself and/or begging your CFO (or CIO) for funds to get started and then going through all the work of getting the IT department’s data warehouse team to support your experimenting and exploring possible solution pathways. Enjoy all the power that is Apache Spark on AWS!

Disclaimer: I am not in any way related to or compensated by Databricks. My recommendation that you get a Databricks Community account is driven entirely by my desire for you to be able to directly follow along with the solution as I present it, and nothing else.