1. Objective
This tutorial takes you through a list of Apache Spark shell commands for performing common operations in Apache Spark. You can perform various transformations and actions such as creating RDDs, filtering, partitioning, caching, count, collect, etc. We will also discuss integrating Spark with Hadoop: Spark will read data from HDFS and write it back to HDFS. This is an Apache Spark beginner's guide with a step-by-step list of basic Spark commands/operations to interact with the Spark shell.
Before starting, you should understand what Spark is, and you must have Spark installed; to install Apache Spark you can follow this tutorial.
2. Spark Commands
Start the Spark Shell:
Spark ships with an interactive shell (a Scala prompt), since Spark is developed in Scala. Using the interactive shell we will run different commands (transformations / actions) to process the data.
$ bin/spark-shell
2.1 Create a new RDD
Read a file from the local file system and create an RDD:
scala> val data = sc.textFile("data.txt")
Note: sc is the SparkContext object.
Note: You need to create a file data.txt in the SPARK_HOME directory.
2.2 Create a new RDD
scala> val no = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val noData = sc.parallelize(no)
These are two methods to create an RDD. The first method is used when the data is already available in an external system such as the local file system, HDFS, HBase, Cassandra, or S3; the RDD is created by calling the textFile method of SparkContext with a path / URI as the argument. The second approach is used with existing in-memory collections.
2.3 Number of Items in the RDD
Count the number of items available in the RDD. To count the items we need to call an action:
scala> data.count()
2.4 Filter Operation
Filter the RDD and create a new RDD of the items that contain the word “DataFlair”. To filter, we need to call the filter transformation, which returns a new RDD with the subset of items:
scala> val DFData = data.filter(line => line.contains("DataFlair"))
2.5 Transformation and Action together
For complex requirements, we can chain multiple operations together, such as the filter transformation and the count action:
scala> data.filter(line => line.contains("DataFlair")).count()
2.6 Read the first item from the RDD
scala> data.first()
2.7 Read the first 5 items from the RDD
scala> data.take(5)
2.8 RDD partitions
An RDD is made up of multiple partitions. To count the number of partitions:
scala> data.partitions.length
Note: The minimum number of partitions in an RDD is 2 (by default). When we create an RDD from an HDFS file, the number of partitions equals the number of HDFS blocks.
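You can also ask Spark for a minimum number of partitions when creating the RDD. A minimal sketch, assuming the same data.txt file (the value 4 is only an illustrative choice):
scala> val data4 = sc.textFile("data.txt", 4)   // second argument is the minPartitions hint
scala> data4.partitions.length                  // typically at least 4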
2.9 Cache the file
Caching is an optimization technique. Once we cache the RDD in memory, all future computations will work on the in-memory data, which saves disk seeks and improves performance:
scala> data.cache()
The RDD will not be cached just by running the above operation; if you visit the web UI at http://localhost:4040/storage, it will be blank. RDDs are not cached eagerly when we call cache(); rather, an RDD is cached when we run an action that actually needs to read the data from disk.
Let’s run some actions:
scala> data.count()
scala> data.collect()
Now that we have run some actions on the data file, the data had to be read from disk to perform those operations. During this process Spark caches the file, so all future operations will get the data from memory (no disk interaction needed). From now on, any transformation or action will be executed in-memory and will be much faster.
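To confirm from the shell that caching is in effect (besides checking the web UI), you can inspect the RDD's storage level and lineage; a minimal sketch:
scala> data.getStorageLevel   // storage level set by cache(), i.e. MEMORY_ONLY
scala> data.toDebugString     // lineage of the RDD; cached partitions show up here once materialized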
2.10 Read Data from HDFS file
To read data from an HDFS file we can specify the complete HDFS URI, like hdfs://IP:PORT/PATH:
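For example, a minimal sketch assuming a NameNode at localhost:9000 and an input file at /inp (both hypothetical; substitute your own host, port, and path). The resulting RDD is named hFile, which the next section uses:
scala> val hFile = sc.textFile("hdfs://localhost:9000/inp")   // hypothetical HDFS URI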
2.11 WordCount Operation
One of the most popular MapReduce operations is word count: count all the words available in the file.
scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
Read the result on the console:
scala> wc.take(5)
It will display the first 5 results.
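If you want the most frequent words rather than an arbitrary five, one possible variation (not part of the original steps) is to swap each pair and sort by count:
scala> wc.map { case (word, count) => (count, word) }.sortByKey(ascending = false).take(5)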
2.12 Write the data to HDFS file
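A minimal sketch for saving the word-count result, assuming the same hypothetical NameNode at localhost:9000 and an output directory /out that does not already exist (saveAsTextFile fails if it does):
scala> wc.saveAsTextFile("hdfs://localhost:9000/out")   // hypothetical HDFS output path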