Big Data: Amazon EMR, Apache Spark, and Apache Zeppelin – Part 1 of 2

Amazon EMR (Elastic MapReduce) provides a platform to provision and manage Amazon EC2-based data processing clusters.

Amazon EMR clusters are installed with different supported projects in the Apache Hadoop and Apache Spark ecosystems. You can either choose to install from a predefined list of software, or pick and choose the ones that make the most sense for your project.

In this article, the first in a two-part series, we will learn to set up Apache Spark and Apache Zeppelin on Amazon EMR using AWS CLI (Command Line Interface). We will also run Spark’s interactive shells to test if they work properly.

What is Apache Spark?

Amazon EMR
Apache Spark is the first non-Hadoop-based engine that is supported on EMR. Known to be more efficient than Hadoop, Spark can run complex computations in memory. It also supports different types of workloads including batch processing and near real-time streaming.

What is Apache Zeppelin?

Amazon EMR
Apache Zeppelin is a web-based notebook for data analysis, visualisation and reporting. Zeppelin lets you perform data analysis interactively and view the outcome of your analysis visually. It supports the Scala functional programming language with Spark by default. If you have used Jupyter Notebook (previously known as IPython Notebook) or Databricks Cloud before, you will find Zeppelin familiar.

Our assumptions

  • We will assume that the AWS CLI tools have been installed.
  • We will also assume that an IAM (Identity and Access Management) user has been created with AmazonElasticMapReduceFullAccess managed policy attached to it, and that CLI has been configured to use its access key ID and secret access key. This policy gives CLI full access to EMR.
  • Make sure that CLI is configured to use the us-east-1 (N. Virginia) region by default as the dataset that we will use in our next article, is hosted on Amazon S3 in that region.
  • And finally, we will assume that a key pair has been created so that we can SSH into the master node, if necessary.

Creating an EMR cluster

We can easily set up an EMR cluster by using the aws emr create-cluster command.

$ aws emr create-cluster --name "Eugene's Spark Cluster" --release-label \
   emr-4.3.0 --applications Name=Spark Name=Zeppelin-Sandbox \
   --ec2-attributes KeyName=cloudacademy-keypair --instance-type m3.xlarge \
   --instance-count 3 --use-default-roles
{
    "ClusterId": "j-ABCDEFGHIJKLM"
}

We will use the latest EMR release 4.3.0. We will install both Spark 1.6.0 and Zeppelin-Sandbox 0.5.5. Using --ec2-attributes KeyName= lets us specify the key pair we want to use to SSH into the master node.
Let’s use one master node and two core nodes of m3.xlarge EC2 instance types. Our data analysis work will be distributed to these core nodes.
There are many other options available and I suggest you take a look at some of the other solutions using aws emr create-cluster help.

Waiting for the cluster to start

After issuing the aws emr create-cluster command, it will return to you the cluster ID. This cluster ID will be used in all our subsequent aws emr commands.
You can view the details of the cluster using the aws emr describe-cluster command.

$ aws emr describe-cluster --cluster-id j-ABCDEFGHIJKLM
{
    "Cluster": {
[...]
        "Name": "Eugene's Spark Cluster",
        "ServiceRole": "EMR_DefaultRole",
        "Tags": [],
        "TerminationProtected": false,
        "ReleaseLabel": "emr-4.3.0",
[...]
        "InstanceGroups": [
            {
[...]
                "Name": "CORE",
                "InstanceGroupType": "CORE",
                "EbsBlockDevices": [],
                "Id": "ig-ABCDEFGHIJKLX",
                "Configurations": [],
                "InstanceType": "m3.xlarge",
                "Market": "ON_DEMAND",
[...]
                "Name": "MASTER",
                "InstanceGroupType": "MASTER",
                "EbsBlockDevices": [],
                "Id": "ig-ABCDEFGHIJKLY",
                "Configurations": [],
                "InstanceType": "m3.xlarge",
                "Market": "ON_DEMAND",
[...]
        "Applications": [
            {
                "Version": "1.6.0",
                "Name": "Spark"
            },
            {
                "Version": "0.5.5",
                "Name": "Zeppelin-Sandbox"
            }
        ],
[...]
}

We are more interested in the state of the cluster and its nodes. It will take some time for the cluster to be provisioned.

$ while :; do aws emr describe-cluster --cluster-id j-ABCDEFGHIJKLM | grep \"State\"\:; sleep 5s; done
            "State": "STARTING",
                    "State": "PROVISIONING",
                    "State": "PROVISIONING",
            "State": "STARTING",
                    "State": "PROVISIONING",
                    "State": "PROVISIONING",
[...]

When the provisioning is completed, the Spark cluster should be WAITING for steps to run, and the master and core nodes should indicate that they are RUNNING.

$ aws emr describe-cluster --cluster-id j-ABCDEFGHIJKLM | grep \"State\"\:
            "State": "WAITING",
                    "State": "RUNNING",
                    "State": "RUNNING",

SSH to the master node

Now we can connect to the master node from remote. Instead of running ssh directly, we can issue the aws emr ssh command. It will automatically retrieve the master node’s hostname.

$ aws emr ssh --cluster-id j-ABCDEFGHIJKLM \
   --key-pair-file ./cloudacademy-keypair.pem
ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 \
   -i ./cloudacademy-keypair.pem hadoop@ec2-[redacted].compute-1.amazonaws.com
Last login: Tue Feb 23 14:05:11 2016
       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|
https://aws.amazon.com/amazon-linux-ami/2015.09-release-notes/
31 package(s) needed for security, out of 43 available
Run "sudo yum update" to apply all updates.
EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR
[hadoop@ip-[redacted] ~]$

Spark’s Scala shell

We will not cover the Spark programming model in this article but we will learn just enough to start an interpreter on the command-line and to make sure it work.
Spark supports Scala, Python and R. We can choose to write them as standalone Spark applications, or within an interactive interpreter.
For Scala, we can use the spark-shell interpreter.

[hadoop@ip-[redacted] ~]$ spark-shell
16/02/23 14:11:44 INFO SecurityManager: Changing view acls to: hadoop
16/02/23 14:11:44 INFO SecurityManager: Changing modify acls to: hadoop
16/02/23 14:11:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
16/02/23 14:11:44 INFO HttpServer: Starting HTTP Server
16/02/23 14:11:44 INFO Utils: Successfully started service 'HTTP class server' on port 34274.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.7.0_95)
[...]
16/02/23 14:12:13 INFO SparkILoop: Created spark context..
Spark context available as sc.
[...]
16/02/23 14:12:31 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

To make sure that everything works, issuing both sc and sqlContext should return to you the addresses to the respective objects.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@201229dd
scala> sqlContext
res1: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@42dbc279

Spark’s Python shell

For fellow Pythonistas, we can use pyspark instead. The Spark APIs for all the supported languages will be similar.

[hadoop@ip-[redacted] ~]$ pyspark
Python 2.7.10 (default, Dec 8 2015, 18:25:23)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
[...]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Python version 2.7.10 (default, Dec 8 2015 18:25:23)
SparkContext available as sc, HiveContext available as sqlContext.
>>> sc
<pyspark.context.SparkContext object at 0x7f2b08e60bd0>
>>> sqlContext
<pyspark.sql.context.HiveContext object at 0x7f2b09677dd0>
>>>

Spark’s R shell

And for R developers, you can use sparkR.

[hadoop@ip-[redacted] ~]$ sparkR
R version 3.2.2 (2015-08-14) -- "Fire Safety"
Copyright (C) 2015 The R Foundation for Statistical Computing
Platform: x86_64-redhat-linux-gnu (64-bit)
[...]
Launching java with spark-submit command /usr/lib/spark/bin/spark-submit "sparkr-shell" /tmp/RtmprThwAD/backend_port27937c957383
16/02/23 14:14:30 INFO SparkContext: Running Spark version 1.6.0
[...]
 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.0
    /_/
 Spark context is available as sc, SQL context is available as sqlContext
> sc
Java ref type org.apache.spark.api.java.JavaSparkContext id 0
> sqlContext
Java ref type org.apache.spark.sql.SQLContext id 1
>

Terminating the EMR cluster

Always remember to terminate your EMR cluster after you have completed your work!

$ aws emr terminate-clusters --cluster-id j-ABCDEFGHIJKLM
$ aws emr describe-cluster --cluster-id j-ABCDEFGHIJKLM | grep \"State\"\:
            "State": "TERMINATING",
                    "State": "TERMINATING",
                    "State": "TERMINATING",
            "State": "TERMINATED",
                    "State": "TERMINATED",
                    "State": "TERMINATED",

What’s next?

We have learned to install Spark and Zeppelin on EMR. I also showed you some of the options for using different interactive shells for Scala, Python, and R. These development shells are a quick way to test if your setup is working properly. Anyone who is new to Spark, or would like to experiment with small snippet of code can use these shells to test code interactively. If you have programmed in either one of these three languages before, it is very likely that you would have used an interactive shell before. The experience should be the same.
Of course, this is not the only way to develop for the Spark. In our next article, we will learn to use Zeppelin to develop code interactively on the web browser. We will look at a simple data analysis example using Scala. I welcome your comments and questions, and will do my best to integrate them into the next article if you post in time. Chandan Patra published a related post back in November, Amazon EMR: five ways to improve the way you use Hadoop that you will find useful and interesting.

 

Cloud Academy