Apache Spark:
language used: Scala
to start Spark:
spark-shell for Scala
pyspark for Python
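The spark-xml import used below has to be on the classpath before the shell starts; one way to launch it (the package coordinate/version shown here is an assumption, match it to your Spark and Scala build):
spark-shell --packages com.databricks:spark-xml_2.11:0.4.1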
include modules:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
import com.databricks.spark.xml._
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
df.describe().show()   // describe() itself returns a DataFrame of summary statistics
df.select("_id","author","description").show()
HDFS base path (Cloudera QuickStart VM): hdfs://quickstart.cloudera:8020/user/cloudera/
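The df used above can be built by reading an XML file through spark-xml; a minimal sketch, assuming a books.xml file with <book> records under the HDFS path above (file name and rowTag are assumptions):
// read XML into a DataFrame; every <book> element becomes one row
val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load("hdfs://quickstart.cloudera:8020/user/cloudera/books.xml")
df.printSchema()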
MAP and flatMap example:
mountain@mountain:~/sbook$ cat words.txt
line1 word1
line2 word2 word1
line3 word3 word4
line4 word1
scala> val lines = sc.textFile("words.txt");
...
scala> lines.map(_.split(" ")).take(3)
res4: Array[Array[String]] = Array(Array(line1, word1), Array(line2, word2, word1), Array(line3,
word3, word4))
flatMap() flattens the multiple lists into one single list:
scala> lines.flatMap(_.split(" ")).take(3)
res5: Array[String] = Array(line1, word1, line2)
REDUCE:
reduce() aggregates all elements of an RDD pairwise with the given function:
val rdd1 = sc.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
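reduce folds the six numbers pairwise with +, so the shell prints the sum for this list:
sum: Int = 172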
reduceByKey (word count):
val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data_RDD = sc.parallelize(words)
val mapped_RDD = data_RDD.map(w => (w,1))
mapped_RDD.take(10)
val reduced_RDD = mapped_RDD.reduceByKey(_+_)
reduced_RDD.take(10)
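There are eight distinct keys in the words above, so take(10) returns them all; the counts (partition order may vary) look like:
res: Array[(String, Int)] = Array((two,2), (one,1), (six,2), (four,1), (five,1), (eight,1), (nine,1), (ten,1))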
FILTER:
val data_RDD = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/temperature_2014.csv")
data_RDD.take(100)
val FL_mapped_RDD = data_RDD.flatMap(lines => lines.split(","))
FL_mapped_RDD.take(20)
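The heading says FILTER but only map/flatMap appear above; a minimal filter sketch on the flattened fields, dropping empty strings (the variable name non_empty_RDD is mine):
// keep only the non-empty fields left after the comma split
val non_empty_RDD = FL_mapped_RDD.filter(field => field.nonEmpty)
non_empty_RDD.take(20)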
RDD to DATAFRAME:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql._
// this is used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._
// load the data into a new RDD
val ebayText = sc.textFile("/home/jovyan/work/datasets/spark-ebook/ebay.csv")
// Return the first element in this RDD
ebayText.first()
//define the schema using a case class
//class name starts with capital letter
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String,
  bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)
// create an RDD of Auction objects
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0), p(1).toFloat, p(2).toFloat,
  p(3), p(4).toInt, p(5).toFloat, p(6).toFloat, p(7), p(8).toInt))
// Return the first element in this RDD
ebay.first()
// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()
// How many bids per item?
auction.groupBy("auctionid", "item").count.show
auction.select("auctionid").distinct.count()   // how many distinct auctions?
// Get the auctions with closing price > 100
val highprice = auction.filter("price > 100")
highprice.show()
// register the DataFrame as a temp table
auction.registerTempTable("RDD_table")
// How many bids per auction?
val results = sqlContext.sql("SELECT auctionid, item, count(bid) FROM RDD_table GROUP BY auctionid, item")
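The query returns a DataFrame; to display it (show() prints the first 20 rows by default):
results.show()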
references:
https://mapr.com/ebooks/spark/05-processing-tabular-data-with-spark-sql.html
https://www.supergloo.com/fieldnotes/spark-sql-csv-examples-python/
http://sparktutorials.net/Opening+CSV+Files+in+Apache+Spark+-+The+Spark+Data+Sources+API+and+Spark-CSV