Simple Spark Application to run on EMR

Mohamed Fayaz · May 23, 2019 · 3 min read

Apache Spark Application

In this post, we will look at how to submit a Spark job to AWS EMR. For this, we have an application that converts any given file to Avro or Parquet format using Apache Spark.

Project Folder Structure

  |-- project/
  |   |-- build.properties
  |   |-- plugins.sbt
  |-- target/
  |-- src/
  |   |-- main
  |   |-- test
  |-- build.sbt

In the structure above, the project folder contains all the build configuration, including the build properties and the sbt plugin setup. The target folder contains the build artifacts, including the assembly JAR. The src folder holds all the source code: the business logic to convert file types, and the unit test suites written with the ScalaTest framework.
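Since Step 4 later in this post relies on sbt assembly, the project folder typically wires in the sbt-assembly plugin and build.sbt declares the Spark dependencies. A minimal sketch of what those files might contain (the version numbers here are assumptions, not taken from the repository; Spark is marked "provided" because EMR supplies it at runtime):

```scala
// project/plugins.sbt -- pulls in the assembly plugin used to build the fat JAR
// (plugin version is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
```

```scala
// build.sbt -- Spark SQL plus the Databricks Avro connector
// (library and Scala versions are assumptions)
name := "converter"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided",
  "com.databricks"   %% "spark-avro" % "4.0.0"
)
```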

Scala Functions

Spark session initialisation

To run any Spark application, we first need to initialise the Spark session. This is done in the src/main/scala/au/fayaz/Utils.scala file:

def initiateSpark(): SparkSession = {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("File Converter")
    .getOrCreate()
  spark
}

The initiateSpark function will return a session to perform our DataFrame operations. In this example, we will be converting the file to Parquet or Avro based on the parameter.
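The session can then be used to read the input file into a DataFrame before it is written back out. A minimal sketch, assuming the input is a CSV with a header row (the path and the header option are placeholders/assumptions, not taken from the repository):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = initiateSpark()

// Read the input file into a DataFrame; the header option is an
// assumption about the sample CSV used later in this post.
val input: DataFrame = spark.read
  .option("header", "true")
  .csv("s3://yourbucket/inputFiles/sample.csv")
```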

def writeOutput(dataframe: DataFrame,
                outputPath: String, format: String): Unit = {
  if (format == "parquet") {
    dataframe
      .write
      .option("compression", "snappy")
      .mode("overwrite")
      .parquet(outputPath)
  } else {
    dataframe
      .write
      .format("com.databricks.spark.avro")
      .mode("overwrite")
      .save(outputPath)
  }
}

The writeOutput function takes three parameters: dataframe, outputPath and format. Based on the format, it performs a simple write operation to save the file in the chosen format (with Snappy compression in the Parquet case).
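Note that writeOutput treats any format other than "parquet" as Avro, so a typo in the format argument would silently produce Avro output. One way to fail fast is to validate the argument before calling writeOutput; a plain-Scala sketch (the FormatValidator object and validateFormat name are my own, not part of the repository):

```scala
object FormatValidator {
  private val supported = Set("parquet", "avro")

  // Normalise the user-supplied format and reject anything unsupported,
  // so a typo like "paruqet" fails up front instead of silently writing Avro.
  def validateFormat(raw: String): String = {
    val fmt = raw.trim.toLowerCase
    require(supported.contains(fmt),
      s"Unsupported format: '$raw' (expected one of: ${supported.mkString(", ")})")
    fmt
  }
}
```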

To Use this Code

To make use of this code, simply follow the steps below to run it in your EMR environment.

Prerequisites

  • Git installed on your local machine
  • Scala and sbt installed on your local machine
  • An EMR cluster ready for use

Once you have these prerequisites, all you need to do is clone this repository and run the steps below:

Step 1: git clone https://github.com/fayaz92/file-converter-spark-scala.git.

Step 2: Open your IDE and import the project; IntelliJ is my choice in this case.

Step 3: From the command line, type sbt test, which will run the unit tests to ensure your code is working as expected.

Step 4: From the command line, type sbt assembly, which builds the artifact as a .jar file.

Step 5: To run this code on EMR, copy the .jar file to S3.
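The upload can be done with the AWS CLI. A sketch, assuming the default output path that sbt assembly produces and the same placeholder bucket name used in the spark-submit command below (adjust both to your environment):

```shell
# Upload the assembly JAR produced by sbt assembly to S3.
# The local target path and the bucket name are assumptions -- adjust as needed.
aws s3 cp target/scala-2.11/converter-assembly-0.1.jar s3://yourbucket/converter-assembly-0.1.jar
```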

Step 6: Once the JAR file is in S3, you are ready to run the code. SSH into the EMR cluster and run the command below.

spark-submit --class au.fayaz.Converter \
  s3://yourbucket/converter-assembly-0.1.jar \
  s3://yourbucket/inputFiles/sample.csv \
  s3://yourbucket/outputFiles/20190524/ \
  parquet

🎉 Now you should get a converted Parquet file in the output location!

Thank you for reading and click here to download this application.
