Apache Spark Essential Training
Understanding Spark
- [Narrator] First let's take a look and understand what Apache Spark really is. In short,
Apache Spark is a fast and general engine for large-scale data processing. It really was
designed more specifically for data science, but has evolved to support even more use
cases, including real-time stream event processing. There are four main reasons to
use Spark: speed, ease of use, generality (we'll get into what that really means), and
the fact that it runs everywhere; it's platform agnostic. You can run Spark on top of
Hadoop and all kinds of other platforms, so we'll take a look at that here in a
second. Spark has an advanced DAG engine that can achieve up to a hundred times
faster processing than traditional MapReduce jobs on Hadoop. This is largely due to
how Spark distributes the execution across its cluster and performs many of its
operations in memory. This chart here, for example, shows the run times of logistic
regression, a common task we need to perform in data science, both on Hadoop using
MapReduce and on Spark. Besides running up to 100x faster in memory, Spark still
boasts a 10x improvement when the operations are performed on disk. Spark supports
Java, Scala, Python, and R natively, as well as ANSI-standard SQL. It offers over 80 high-level
operators, making it fast and easy to build applications, even including parallelization and
streaming. These apps can run interactively from the command shell or from popular notebook interfaces like
Jupyter Notebooks and Apache Zeppelin. Depending on your
configuration, many popular analytics platforms like Tableau can connect directly using
Spark SQL. Here's a simple example showing the Python API, called PySpark, on how to
do a simple word count application. As you can see, we're able to use the map function
with lambdas and also use reduce functions to come up with the actual word count
here. It's as simple as that: just two little lines of code here to actually generate that
word count. When we talk about generality, we're talking about all the different
things that Spark can do. Now using the Notebook style interface, Spark offers the
ability to combine all of its components to make a comprehensive data product. Spark
implements SQL, streaming, machine learning, and graph processing using open-source
libraries built into the platform. This is really handy when you're working on an app and
need some other library functionality from a different component of the
platform. Besides running in nearly any environment, Spark can access data in the
Hadoop Distributed File System (known as HDFS), Cassandra, HBase (another popular
NoSQL database that offers transactional consistency), Hive, or any other Hadoop data
source. In Spark 2.0 you can also connect directly to traditional relational
databases using DataFrames in Python and Scala. The idea here is that in order for Spark
to be successful, it has to play well with others, which is the model I've been preaching
for a while now. Simply put, if a framework or tool doesn't work with the other popular
systems out there, it will fail. Just do a quick Google search for vendor lock-in to get a
better idea of what I'm talking about.
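The word count shown on the slide is not reproduced in this transcript, so here is a minimal sketch of the same map-and-reduce idea in plain Python, which runs without a cluster. The sample sentence and the merge helper are assumptions made for illustration. In PySpark the same shape is usually written with the RDD methods flatMap, map, and reduceByKey.

```python
from functools import reduce

# Hypothetical sample text standing in for a file read from the cluster.
text = "the quick brown fox jumps over the lazy dog the end"

def merge(counts, pair):
    """Reduce step: fold one (word, 1) pair into the running totals."""
    word, n = pair
    counts[word] = counts.get(word, 0) + n
    return counts

# Map step: turn every word into a (word, 1) pair using a lambda.
pairs = map(lambda w: (w, 1), text.split())

# Reduce step: combine the pairs into one dictionary of counts per word.
word_counts = reduce(merge, pairs, {})
```

The map step only reshapes each record, and the reduce step does the combining. That split is what lets Spark spread the work across many machines.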
Origins of Spark
- [Instructor] Let's take a look now at the origins of Spark. To go back to the origins of
our modern data ecosystems, which started with Hadoop, we have to start in 2003. In
2003 some developers started working on this new idea of creating an open distributed
computing platform. These guys started a project called Nutch. Later, in 2006, those
same developers were hired by Yahoo and released this project as an open source
project called Hadoop. Around the same time, Google published MapReduce, its programming
model for processing massive data sets, which Hadoop implemented in Java. As Hadoop grew in popularity to store massive
volumes of data, another new startup with massive volumes of data, Facebook, wanted
to provide their data scientists and analysts an easier way to work with the data in
Hadoop. So, they created Hive. At this point we have two ways of interacting with
Hadoop data. First, there's MapReduce, which is Java based, batch oriented, and pretty
slow. Then we have Hive, which is basically a SQL abstraction on top of MapReduce. So,
while it's easier to write the queries, the same issues of being batch oriented and
slow still persisted. This is where Spark was born. At this point lots of tech
companies were adopting Hadoop and other big data solutions, yet there weren't really
any great interfaces for data scientists to use, so they created their own project which
wasn't dependent upon Hadoop. So, in 2009 a few folks at UC Berkeley started a new
project to provide easier access to big data for data scientists. This is the actual
inception of Spark. A year later the team open sourced Spark using the BSD license and
the world was introduced. Three years after that, in 2013, the team
donated their code to Apache, but it wasn't until 2014 that Spark became an official
top-level Apache project, which is a big deal. This means that Spark is now going to
have a huge community of data professionals supporting and evolving the platform.
- [Instructor] In order to really understand Spark we need to take a look at all of its
components. In this section, we're going to take a closer look at each one of these major
components, so going forward we'll have a better understanding of how all these pieces
fit together. First, we have Spark Core, and this is the main component on which all
the others are based. This is the foundation of the house, if we are thinking of a
framework as a house. Now Spark Core includes task distribution, scheduling, and the
input/output operations. So when we execute something in Spark, whether it be SQL,
machine learning, a graph, or any of the other components, we essentially are using that
one plus Spark Core to actually do and execute the functions that we're calling. After
Spark Core, we have Spark SQL, and this is the way that many data professionals do
their work. This component supports the ANSI standard SQL language, which
is really good, because millions of people know SQL already, so this really opens up the
possibilities with Spark for data science. You need to first create or at least define table
structures in Spark before being able to use SQL, but once you have them, the engine
processes your queries reliably. Another great advantage of having strong SQL support is that
it enables tools like Tableau to easily integrate with Spark. One of the things to
understand about Spark SQL is that it uses DataFrames. DataFrames are a familiar term
in data science. In R we have data frames themselves, so they map closely to
what a Spark DataFrame is. In Python, if that's your language of choice, the equivalent would be
a pandas DataFrame. It's a very similar concept, although a Spark DataFrame is distributed across the
cluster rather than held on one machine. And in SQL, if you're more familiar with that, a DataFrame can be
thought of as a table. Now Spark Streaming is the component that enables our jobs to
process new data as it comes in. To be specific, Spark Streaming runs micro-batches,
which operate just like normal batch operations, just much more frequently on
smaller datasets. This provides a streaming result to our user, although it isn't true
stream processing, like some of the other ways that people accomplish the same
feat. This works using a Lambda architecture, in case you're not familiar with that. And I
only mention it because you'll hear people talk about it a lot, but basically it's a
method of taking historical data to start and then as new data comes in you start
aggregating that number, that count, the metric, or whatever it is that you're looking to
actually show to your user. For example, think about Twitter followers or likes on posts
on Facebook. When you first post it, Twitter or Facebook will actually query to figure out
how many of those things have already occurred, and then as new people or as new
events occur, they will actually just update that aggregation. It's not going to go requery
the underlying data, because that just takes far too long and you want it to be
real time. So that Lambda architecture is the traditional way in which most streaming
occurs. The MLlib component of Spark is what enables our machine learning algorithms
to run. Compared to Apache Mahout, which runs on disk instead of memory, benchmark
tests show about a 9 times improvement, or 9x. This component includes many of the
common machine learning functions you'll need and, since it's open source, you can most
likely find anything more specific that isn't available out of the box from the
community. Now GraphX is a way to do graph processing in Spark. I'm not talking about
making charts here, but running graph database jobs where you do things like identify
relationships between entities in your data and then query that using those
relationships. GraphX is often referred to as the in-memory version of Apache Giraph, so
if you're more interested in this component you could easily do a lookup on Apache
Giraph and dig into the details there. One thing to note is that this component is built
on RDDs, so it's not a true graph database implementation, and shouldn't be used
in place of one. SparkR is a newer component of the Spark framework (it first shipped with
Spark 1.4) and basically provides an interface for connecting to your Spark cluster from the R
statistical package. This package provides distributed DataFrames, which are
comparable to data frames in R, just using Spark's method of actually distributing them
on the cluster. This is also how you can integrate RStudio with Spark.
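The micro-batch idea described for Spark Streaming, where you start from a historical count and then fold each small batch into a running aggregate instead of requerying everything, can be sketched in plain Python. This is only an illustration of the pattern, not Spark Streaming itself. The event names and the helper functions initial_count and apply_micro_batch are hypothetical.

```python
from collections import Counter

def initial_count(historical_events):
    """One-time query over the data that already exists."""
    return Counter(historical_events)

def apply_micro_batch(running, batch):
    """Fold one small batch of new events into the running totals."""
    running.update(batch)
    return running

# Hypothetical history and a few micro-batches arriving over time.
historical = ["like", "like", "follow"]
micro_batches = [["like"], [], ["follow", "like"]]

running_totals = initial_count(historical)
for batch in micro_batches:
    # Only the new events are touched; the history is never rescanned.
    apply_micro_batch(running_totals, batch)

# For comparison only: a full requery over everything gives the same answer.
full_requery = Counter(historical + [e for b in micro_batches for e in b])
```

Both approaches end with the same totals, but the incremental one does work proportional to the new events only, which is why it can feel real time.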
- [Instructor] Let's take a look here at where Spark actually shines. The first specific area where
Spark shines is data integration, or ETL, which stands for extract, transform, and
load. This is the process of moving data from one location to another and
standardizing it so it's easy to use by analysts and data scientists. Next is machine learning,
which is really pushing computing forward with its ability to find hidden patterns in vast
amounts of data. Business intelligence and analytics, in which I would include data science
as well, is still the biggest area compared to everything else here, and thankfully Spark
does it very well. Real-time processing is a new area many businesses are exploring, and
some are advancing with things like fraud protection and real-time user feedback. And
last are recommendation engines, where you can help users find more value in your
product or service by bringing relevance to their experience. When it comes to using
Spark, a recent survey done by Databricks shows that 74% of respondents use two or
more components and 64% use three or more. I bring this up because, the way Spark
works, the different components all play well together, so when you're looking at
Spark, you should think of it as a collection of tools and not just one thing you may be
interested in at the moment. Long term, this platform could help with many of your
needs today and going forward. As far as which languages people are using, Scala and
Python top the list, with SQL a close third. This isn't surprising because a lot of big data
professionals are already familiar with either Scala or Python on Hadoop. And of
course, with SQL being the most universal data language, I'm guessing that over time we'll
see a bit of a shift and SQL might become the most popular language being used with
Spark. Also from the Databricks survey, I wanted to share this and give you some
ideas of what people are actually building with Spark. Again, business intelligence and
analytics top the list, showing yet again that no matter how the tools or systems
evolve, the basic idea of knowing what's going on with your business drives platform
adoption and technology buying decisions. Some other interesting ones on here that
I've mentioned are log processing, user-facing services and that fraud detection and
analysis.
Overview of Databricks
- [Instructor] Now, let's take a look at Databricks. Databricks is a managed platform for
running Apache Spark. That means that you don't have to learn complex cluster
management concepts, nor perform tedious maintenance tasks to take advantage of
Spark. Databricks also provides a host of features to help users become more productive
with Spark. It's a point-and-click platform for those who prefer user interfaces, like data
scientists or data analysts; however, the UI is accompanied by a sophisticated API for those
who want to automate aspects of their data workloads with automated jobs. Now,
Databricks also includes certain features that we're not going to look at in this course. I
will mention them, but we're going to use the Community Edition, the free version of
it, so that way you can follow along without having to worry about any real set up or
anything. Just log in and go. So, you have essentially two different things to consider
here. Databricks is an implementation of Spark to help reduce complexity of setup and
operation. We're going to look at the Community Edition, and I'll show you those
features here in a second. But there's also the full platform, which you could actually run
your jobs on for your business. Now, Databricks was created by the people that created
Spark at UC Berkeley, so you can guarantee that they are going to have all the
features and everything that you're going to want to use. Apache Spark though, is 100%
open source. So, you don't actually need Databricks to run it, but for the sake of
learning how to use it, I think Databricks is just a much better option. Taking a look at
the different Databricks editions, we have the Community Edition, which gives us a mini
six gigabyte cluster, interactive notebooks and dashboards, and a public environment
that we can share with others. The full platform, in contrast, gives you unlimited
clusters, notebooks, dashboards, production jobs, and APIs, plus an interactive guide to
Spark and Databricks. You can deploy it to your Amazon Web Services private
cluster. You can integrate BI tools, and it also comes with a 14-day free trial. Now, in
terms of Databricks terminology, there are some things we need to lay out here. First,
workspaces. A workspace is essentially a collection of all of your files. Think of it like a
folder structure. A notebook is like an actual piece of paper, if you will. It's a digital version
of that, where you can have all different types of languages being executed. So, I can
have Scala, Python, Markdown, and SQL all on the same page running and actually
giving you the narrative of the analysis I'm doing. Libraries are things that we can
actually use to extend the functionality of our Databricks platform. Think NumPy for
Python. We can pull that in and use those functions alongside the other ones that come
out of the box with Spark. Tables are just like they are in databases. They are SQL
tables that we can create, and you can then run your Spark SQL jobs against
them. Clusters are what we need to execute our Spark jobs, and we get a mini six
gigabyte cluster for free. Jobs are the way we can actually schedule our data
workloads. And, that's not available in the Community Edition, but it is available in the
full Edition. So, we won't actually be digging into that in this course, but just know
this is a way that you can automate a lot of those daily or nightly tasks that you would
normally execute manually. Now, let's take a look. And, let's actually dive in. Log into the
Databricks Community Edition, and then browse some sample files. So, here at
community.cloud.databricks.com, we have our login to the Community Edition. Now, if
you don't have an account, you'll want to click sign up and go through that process, and
then come back and log in here. Once you do so, you're welcomed by Databricks with
Introduction to Apache Spark on Databricks, which is a great tutorial that I think you
should go through. There's also the quick start on DataFrames, and there's this new one here, GSW
Passing Analysis. So, I want to just take a look here. I'm going to click into this
Introduction to Apache Spark on Databricks, and just walk you through. It has a lot of
the info that we're going to cover in this course, but if you want to, you can jump right
in and run some of these jobs and really just get going. So, something to think
about. On the left is our workspace, where we're going to actually create our code. This
is where we can upload libraries. We can also share things. We have our recently used
file list. Tables, and we're going to create a lot of these here in this course. Clusters,
which we don't have any of right now. Then jobs, which I mentioned is not a Community
Edition feature, but that's where you'd find 'em. And then of course, you have a search
capability. So, that is a quick overview of the Community Edition of the Databricks
platform.
- [Instructor] In this section, we're going to get comfortable with the environment we'll
be using for the rest of the course: notebooks. Spark notebooks are a collection of
cells containing code and Markdown. These cells are used to perform all of our tasks and
our analysis. In addition to Markdown, cells in Spark notebooks can run Scala, Python,
R, and SQL, and we'll be using a number of these, mostly Python and SQL. Now let's
dive in. What we're going to do is we're going to upload our exercise files to our
Databricks Community Edition which we've already signed up for. Then we're going to
explore a sample notebook. And then lastly, we'll create a cluster. And here in my
Databricks Community Edition, what I'm going to do is go to Workspace, I'm going to
right click and choose Import. From here, I'm going to click on where it says Drop File
Here, look for the exercise files from this course and find the Spark Essentials.dbc then
click Open. Once I get my green check mark, I'll click Import. And when I have a new
folder here, I have all the code for this entire course. Now you don't need this to actually
follow along. You can just create your Databricks Community account and then type in
and write the code just like we're going to do here, but if you do have the exercise
files, this is by far the easiest way to get going because now you have everything
loaded and it's all in the browser from here. So let's take a look at 1.6 Introduction to
Notebooks. As I mentioned, we can use different types of code. In this first one, what
you see is Markdown content. Now this is a cell. So if I double click inside of here, you
can see the code actually behind this, so I have %md, which is a command. It's
known as a magic function. It tells the notebook that this cell, whatever I type after that, is going
to be rendered using md, which is the magic function for Markdown. After that, you can
see I have different Markdown code. If you're unfamiliar, it's a simple way of writing
HTML via shorthand. So you can take a look and then once you hit Enter on that, you'll
see that we have our Markdown content rendered instead of the code being
displayed itself. Let's take a look at our second cell here. Now this is a simple way that
we can create a list of ten thousand integers. We're using xrange instead of range (this is
Python 2 code) because it's just more memory efficient to do it this way. Now when I
want to execute this, I can hold Shift and hit Enter or I can hit the Play button over here
on the right. Now we have the step where we need to actually create a cluster, and I did
it this way because this is a unique function in Databricks where it will automatically do this for
us. Now this would be using Spark 1.6.2. For most things in this course, we're going to be
using Spark 2.0, but for this case it's fine. Let's just go ahead and click Launch and Run
and then later, I'll show you how to manually create a new cluster using Spark 2.0. Okay
and now that our cluster was created and our code was executed, you can see that it ran
in 0.11 seconds and up top I can click down on the cluster list and you can see exactly
what's going on there. It looks like I had an issue with the library. We'll not worry about
that, but the cluster is up and running and our code is executing. Now if we take a look
to see the length of our data list, I'll run this cell and I should get
10,000. And now we've just basically done the simplest Python functions. Now if we
wanted to actually use Spark here instead of just basic Python, we need to actually
use a Spark context. In Databricks one is already available as sc, and we call its
parallelize function. Now what this does is it takes our data, our list of 10,000 integers, and it
spreads them across eight different partitions. So once I run this code here, you'll see
essentially it just runs it. It took 0.7 seconds, really fast, but it did some Spark functions. It
actually created an RDD. And if I want to prove that, down here I can actually run
the collect statement, which will return everything in my RDD. All right so as I
mentioned, we created a cluster automatically with the prompt that Databricks gave
us, but let's go create a new one under Clusters on the left. First I need to close this one
out by clicking Terminate under Actions, and once that's complete, I will click Create
Cluster. I'm just going to call this 2. For the Spark version, I'm going to choose 2.0.1 with
Scala 2.1, which doesn't matter 'cause we won't be using Scala. I'll leave everything else
alone and click Create Cluster. And for the rest of this course, what we'll do is use this
cluster here and we'll see that in the dropdown in our notebook interface. Okay and
now that that's completed, we are ready to go.
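The notebook cell that spreads 10,000 integers across eight partitions is not shown in this transcript; in PySpark it would presumably look something like sc.parallelize(data, 8) followed by collect(). As a rough sketch of what that partitioning means, here is a plain-Python stand-in that runs without a cluster. The parallelize and collect functions below are hypothetical local helpers, not the Spark API, and Spark's real slicing strategy may differ in detail.

```python
def parallelize(data, num_partitions):
    """Split data into num_partitions contiguous, near-equal slices."""
    data = list(data)
    n = len(data)
    # Integer boundaries so that slice sizes differ by at most one element.
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

def collect(partitions):
    """Gather every partition back into one ordinary list, in order."""
    return [item for part in partitions for item in part]

data = range(1, 10001)            # ten thousand integers
partitions = parallelize(data, 8)  # eight partitions, as in the demo
partition_sizes = [len(p) for p in partitions]
```

On a real cluster each slice can live on a different worker, and collect pulls all of them back to the driver, which is why you would avoid calling it on very large data.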
- [Instructor] First, we should understand the ways Spark actually provides for
working with data through its data interfaces. Starting with the RDD, or the Resilient
Distributed Dataset, this is the first, and now lowest-level, API for working with data in
Spark. RDDs are what make Spark so fast and can provide data lineage across
processes as they're completed. You can think of an RDD as a container that allows you
to work with data objects. These objects can be of varying types and spread across
many machines in the cluster. While it's important to know about RDDs, they're really
only going to be useful as you get into the more advanced applications of Spark. That
brings us to the DataFrame. If you're familiar with Python, this is analogous to a pandas DataFrame,
and it's similar to a data frame in R. If you're a SQL person like me, you can think
of DataFrames as tables of data that you can query. These DataFrames are based
on RDDs, except they only contain rows, whereas RDDs can contain different types of
objects. This is what allows you to interact with them in a familiar pandas, table, or data frame
way. Lastly, the Dataset is Spark's latest way of working with data, and it's basically a
combination of RDDs and DataFrames. It lets you type your data like an RDD while also
letting you query it like a DataFrame. This is going to be how we refer to data going
forward, which is also how Spark is going to be operating. Let's take a quick look at the
Spark 2.0 data interfaces. Here you can see the Unified Apache Spark 2.0 API. As I
mentioned, the DataFrames and the Datasets are kind of combining into what is now
known as a Dataset. Aspects of those two previously separate APIs
are coming into one. You have the untyped API, which is the DataFrame, which is now
an alias for a Dataset of Row objects. Then you have the typed API, which is the
Dataset itself. We need to also understand the different ways that Spark actually allows
us to interact with our data. There are basically two things that we can do. First, we can
use actions to return results to our program. These actions also trigger the execution of
the other operation we can perform, a transformation. Transformations are known
as lazy, since they are only executed when an action calls for their results. Here are some examples
of actions and transformations. On the left we have the actions: show, count, collect, and
save. These are things that we'll actually use to execute our
transformations. Transformations look a lot like SQL operations: select, distinct,
groupBy, sum (there are also other aggregations you can use), orderBy, filter, and limit.
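The difference between lazy transformations and actions can be illustrated without Spark at all. The sketch below uses Python generators as a stand-in for transformations and list() as a stand-in for an action. It is an analogy under that assumption, not how Spark is implemented, and the evaluated list exists only to record when work actually happens.

```python
evaluated = []  # records which values the transformation has actually touched

def square(x):
    evaluated.append(x)
    return x * x

numbers = range(1, 6)

# "Transformations": generators describe the work but run nothing yet.
squared = (square(x) for x in numbers)
even_squares = (y for y in squared if y % 2 == 0)

# Snapshot taken before any action: nothing has been computed so far.
touched_before_action = list(evaluated)

# "Action": materializing the result finally pulls data through the pipeline.
result = list(even_squares)
```

Until the final line runs, square is never called. Spark behaves similarly: select, filter, and groupBy only build up a plan, and show, count, or collect triggers it.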
- [Instructor] Now let's take a look at actually working with some data. In this case, we're
going to look at text files, so the different types of text data. First we should consider
totally unstructured data. And when we think about text based data, we should consider
a schema, which is simply a definition of how to work with the data. For totally
unstructured data, what we're talking about are text files, emails, some types of logfiles,
and often just plain text without any schema definition. We can call these schema-never
files, since there never really is a schema applied. Moving to the right a little bit, on our
schema spectrum here, we have this semi-structured data. This is data which is text
based, and tagged or structured in a loose way. On the Web today, this type of data is
often represented by JSON files. These files are just text, but do have a shape to
them; however, this shape can be dynamic and change without notice. We can think of
this data as having a schema only after we read it from disk, so we would call these
schema-later files. The last type of data that we may encounter is structured data. This is
often easiest to work with, since it's created with a schema already in mind. This data
lives in databases and some files, like comma separated values files. We can think of this
data as schema-first data. So what we're going to do here is first find some sample
data, read in the text files, read in a directory of text files, and then create a
DataFrame. I'm going to switch over to Databricks now. So, here in Databricks, I've
opened up 2.2 from the Exercise Files, and first we'll just kind of go through this and pull
in some data. We want to browse the file system. So, remember, we have these magic
characters here. So what we're going to do is %fs, for file system, and then ls, which, if
you're unfamiliar, is just basically a listing of the directories wherever we are currently. I
need to attach this to a cluster, and I have my cluster 2 running Spark 2.0. And when I
click Play on this, you'll see what I get is a listing of everything basically where I'm
currently executing the code from. So you can see here we have several different
directories: FileStore, databricks-datasets, databricks-results, temp, and user. So some of
these have a lot of files and go really, really deep. Let's check out some of that sample
data here. And this is under databricks-datasets/bikeSharing/README, so this is a
text file, it's just a flat text file that somebody wrote, like an instruction manual. If I click
on the Play button here, you'll see that the file does exist, which is good. And if we want
to read that in and actually count the lines in the document, all we have to do is give it
the path, use sc for the Spark context, call a function called textFile, and pass in that
path, and now we've essentially read the README file into an RDD. And then we
can execute a count function on that RDD to see how many lines are in the README
file. I'll click Play on this cell, and you can see there it went. So it read it in, it
assigned it to an RDD, and then it executed the count, giving us 98 lines. Now if we
want to take a look at this I can use the first function, which is operating again on our
RDD, and it's going to give us just the first line of that file, which is, coincidentally,
just a bunch of equal signs, most likely the header, so let's take a look at the top 20
lines instead. So instead of using the first function, we're going to use take and put in
20. Click Play on this, and you can see here the first 20 lines of that README file. OK, so,
what we have is that logfile there. Then what we want to do is actually do it a little bit
differently, where we actually count how many times the word bike shows up. So we're
getting a little bit deeper here. And what I have is essentially the logfile, which is just
going to be the same as the path from above; I'm just assigning that to a new
variable. Now I'm caching the actual data, so I'm reading this into a new RDD I'm
calling logData, and I'm adding this .cache function. That way I can run a lot of
operations on it in memory without it having to reach back out to disk and slow down
at all. Now what I want to do is get the number of times the word bike shows up. So,
this is getting a little bit into the Python world of data analysis, where what I have is a
lambda function, which essentially is an anonymous function, and I'm passing in s. Here s
is the string, so it's going to read every line of this README file. And then
I'm going to run a little expression, "bike" in s.lower(), so I'm converting the string to lower
case and I'm looking for where the word bike exists. I'm
going to filter down to just those lines, and then I'm going to count how many results I have left. I'm
going to assign that to numBikes, and then I'm going to print that down below. If I run
this, you can see that the word bike shows up in 14 lines. Now that may seem like a
trivial or funny way of using Spark, but think about if you're looking
through tons and tons of logfiles, and you want to look for the word error, or a specific
error message. You could use this exact same piece of code to scan entire directories
and thousands of lines of logs. OK, now let's work with a whole directory of files. And
here what we're going to do is look for a directory of comma separated values files. You
can see here that we have, and I'll run this code just to verify it's still there, the
databricks-datasets under the R datasets. There is a listing here of a bunch of different
datasets. You can see AirPassengers, you can see Formaldehyde, and HairEyeColor, all
this sample data that they provide us with. And you can see the size of it there, all good
stuff. If I wanted to read in that whole thing, I can essentially do something very similar to what I
did before. So instead of passing in the specific file name, I pass in a wildcard character
saying everything in that directory that ends in .csv. And I'm going to read it in using a
different function, wholeTextFiles. So what happens here is it's actually going to return an
RDD of pairs, with the file name in the first part and the content in the second part of each
pair. So if I run this code, I'll essentially read in every CSV file in that directory, and
I want to know how many there are. If we were working with, say, logfiles that came in
daily, it'd be really handy to see how many files we got. So you can see there we have
71 files that came in. Then what I can do is I can actually convert this into a DataFrame,
by using the toDF function, and with that I can give it column names, name and
data, and then use the display function to show my results. All right, so with the results
there, what I have now is the name of the dataset, which is essentially the path to the
file, and then I have the actual data itself, this actually is the entire file there. And you
can scroll down and take a look at all of them. And if I wanted, I could do another
function, using select. So with a DataFrame, it works sort of like a table, remember, so I
can do things like select, which would be similar to what we would do in SQL. And I can
say just give me the name now. So I'm taking my filenames dataset, and I'm going to
display my results, and I'm selecting just the names. So, what files do we have in our
directory? And you can see my results here. So, in this little tutorial, what I was able to
do, read in completely unstructured data, count some stuff in there, find how many
times a word appears, then read in an entire directory using a wildcard character, parse
that into a DataFrame, run some basic operations on it, and see here a final listing of all
the files available for us in this directory.
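As a rough illustration of the directory scan described above, here is a plain-Python sketch (standard library only, not the Spark API). It mimics the shape of what wholeTextFiles gives back, a collection of (file path, content) pairs, and then counts a word across all of them. The function names read_whole_files and count_word are hypothetical, made up for this sketch.

```python
import glob
import re


def read_whole_files(pattern):
    """Return (path, content) pairs for every file matching a glob pattern."""
    pairs = []
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as handle:
            pairs.append((path, handle.read()))
    return pairs


def count_word(pairs, word):
    """Count case-insensitive whole-word occurrences across all file contents."""
    matcher = re.compile(r"\b" + re.escape(word) + r"\b", re.IGNORECASE)
    return sum(len(matcher.findall(content)) for _, content in pairs)
```

Calling read_whole_files with a wildcard such as some_directory/*.csv and taking len() of the result corresponds to the "how many files came in" count, and count_word(pairs, "error") corresponds to the logfile search idea.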
- [Narrator] Let's take a look now at working with CSV data in DataFrames. Remember, a
DataFrame is similar to a table in SQL, a pandas DataFrame in Python, or a data frame in R. It allows
for more expressive operations on data sets. What we are going to do here is find some
CSV data, then we are going to sample that data, and then create a DataFrame with the
CSV. I'm going to switch over to the Databricks Community Edition now, and here I have
loaded from the exercise files 2.3, and it's in a Python notebook. Here in the Databricks
environment, I have 2.3 from the exercise files loaded. I need to attach this to my
cluster, and then I'm going to go ahead and execute my first cell which shows us where
some data lives. This is under the online retail data. This data comes along with
Databricks, so it should be a similar result that you see here. Down below we just need
to specify the path as we've done before. When we read that data in, we use something
different than before. Here, we are using spark instead of sc, the SparkContext. This
is how we do it in Spark 2.0. Remember, we are combining the RDD and the
DataFrame API into a single API. That is what we are doing here, and we are going to
read it in using that path. Then we are just going to say take 20, so just show me the top
20 rows. When I execute this, you can see I have my results there. I have all of my, looks
like column headers on the first one, and then you have all the values from there on
out. All right, now let's actually create the DataFrame using that first row as the column
headers, and then display it so we can do more advanced things with it if we want. Here,
what we have is essentially very similar to before, except instead of read.csv, we are
doing spark.read.load, and then we pass in that same path. We give it the format of
CSV. We tell it header="true", meaning there is a header, and
inferSchema="true", meaning determine what the schema is automatically for me. Then
down below, we just display the actual results. All right, so we have the invoice
numbers, stock code, description, everything there. It looks really, really good. It figured
out apparently the data types and everything. All right, so now what if I just wanted to
see a distinct list of countries? In this case, we are looking at online retail data, so I want
to find out which countries we actually ship to. We are going to use Display to actually
show the results, then DF, which is our DataFrame, and we are going to say select
country. That is the column in our data set. Distinct, so it is going to remove the
duplicates, and orderBy so that they are in alphabetical order. When I run this, you can
see my Spark job executed and I was able to get a distinct list of countries from our data
set.
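To make the header and distinct-list steps concrete, here is a small plain-Python stand-in using the csv module rather than Spark. The sample rows are made up in the rough shape of the online retail file, and read_with_header and distinct_sorted are hypothetical helper names. Unlike inferSchema, this sketch does not guess data types: every value stays a string.

```python
import csv
import io

# Made-up sample in the shape of the online retail file (header row first).
SAMPLE_CSV = """InvoiceNo,Description,Quantity,UnitPrice,Country
536365,WHITE HANGING HEART,6,2.55,United Kingdom
536370,ALARM CLOCK,24,3.75,France
536389,CHRISTMAS LIGHTS,6,6.75,Australia
536390,PAPER CHAIN KIT,40,2.55,United Kingdom
"""


def read_with_header(text):
    """Parse CSV text, using the first row as column names (like header="true")."""
    return list(csv.DictReader(io.StringIO(text)))


def distinct_sorted(rows, column):
    """Mimic select(column), distinct, and orderBy on a list of dict rows."""
    return sorted({row[column] for row in rows})


rows = read_with_header(SAMPLE_CSV)
countries = distinct_sorted(rows, "Country")
print(countries)
```

The set comprehension removes the duplicate "United Kingdom" entry, and sorted puts the remaining names in alphabetical order, which is the same result the select, distinct, orderBy chain is described as producing.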
- [Narrator] Let's explore now some of the operations that we can do with DataFrames. First
we're going to read in some data, then we'll inspect the data a little bit, we'll do some
aggregations, and lastly we'll perform a filter. Here in my Databricks environment I've
loaded the 2.4 example from the exercise files. And we have a lot of the things we did in
the previous clip but I'll just run through them again in case you didn't see that. First we
need to find some data. So we have a CSV file here under the Databricks datasets online
retail data-001/data.csv. We read that data in and we infer the schema and we use the
first row as the header row, which gives us essentially a nice DataFrame to work with. So
before I run this I need to attach this to my cluster. And then hit the play button. Okay
so now we have our DF, our DataFrame that has the schema and everything loaded
correctly. Let's take a look at that schema. So if I hit play on this one you can see that we
have a function here called printSchema. And it shows essentially the data types of
everything and whether or not they are nullable. So this is a good way to look at your
schema and make sure that Spark didn't mess anything up when you loaded that CSV
file in and you told it to infer the schema. Okay, so if we just wanted to take a look at
one column here I don't want to see the entire DataFrame I just want to see one of
those columns, I can use the select function and then .show at the end of it. So it will
actually show me just that one column. So select is an operator similar to what we do
with SQL where we pass in a list of columns from our DataFrame, and it'll return just
those. So if I wanted to remove duplicates and sort, I can run this operation here as we
did in a previous clip, where we select just the country, we use the distinct function
which again is the same as SQL, and then we use an orderBy. I hit play, run this, and we
have now a distinct list of countries from our dataset. Okay, now let's create an
aggregation. So here what I want to do is calculate the order totals. So I'm using display
to see the results. I have my DataFrame there, then I do select, and I am doing it this
way; there are many different ways you can actually reference these, but I give it the
DataFrame name so DF, and then in brackets I put the column name I want. So invoice
no for invoice number, unit price. And then I actually multiply unit price times
quantity to get the order total so for each invoice, I'm going to multiply unit price and
quantity, then I'm going to group by invoice number and perform a sum operation. So
I'd have multiple results here, and I'm going to group all of those together so I have a
distinct list so only one result per invoice number with this calculation applied. I hit
play. And you can see my results. Good to go. Now if I want to filter this a little bit, let's
take a look and this is essentially like doing a where clause from SQL. I have my
DataFrames so DF.filter, and then I'm going to specify the invoice number equals an
invoice number that I'm interested in. Cause I wanted to take a look and just make
sure that these actually added up. So when I hit play here, what you'll see is
essentially everything from a DataFrame so the full dataset, with just that invoice
number and you can verify that in the first column there. Okay so I have quantity times
unit price. And if I were to manually do those out and then I could go verify it up
above, it should add up to $38.09. Alright, so let's take a look if I wanted to do another
type of filter where I just wanted to see the top 10 products in the UK. So what I have
here, a little bit more advanced function. I select the country, description, then I have a
calculation here, my unit price times my quantity. And I give it an alias of total, I want to
have a nice name that I can use. Then I'm going to actually use a group by of country
and description. So it's going to group by multiple fields, perform a sum operation
meaning that that third one there that total is actually going to be summarized. Then
I'm going to filter where country equals United Kingdom, and make sure you use the
double equals here; it may trip some people up, especially if you're coming from a SQL
background. Here in this context you need double equals, which is actually the testing of
whether or not something equals something else. If you only use one, you're actually
assigning, which doesn't work in this context, and you may get an error. I'm going to sort
that using the total, and I'm going to do ascending equals false so it's actually going to
go descending order highest to lowest. And limit it to just 10 results. So there's a lot of
stuff going on in that one little function there. Let me hit play. Our Spark job runs, you
can see our country, the description, and the total. And you can verify that yes indeed it
is in descending order from highest to lowest. You can see Dotcom Postage is the most
popular product or the highest selling product in the UK for our data.
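The two aggregations in this clip can be sketched in plain Python to show what the group-by and sum steps compute. This is a stand-in for the DataFrame calls, not the Spark API; the rows are made up, and order_totals and top_products are hypothetical names. The sketch filters by country before grouping, which gives the same totals as grouping by country and description and filtering afterwards.

```python
from collections import defaultdict

# Made-up rows: (InvoiceNo, Description, Quantity, UnitPrice, Country)
ROWS = [
    ("536365", "MUG", 2, 2.5, "United Kingdom"),
    ("536365", "LAMP", 1, 10.0, "United Kingdom"),
    ("536366", "MUG", 4, 2.5, "United Kingdom"),
    ("536370", "MUG", 10, 2.5, "France"),
]


def order_totals(rows):
    """Sum UnitPrice * Quantity per invoice number (group by, then sum)."""
    totals = defaultdict(float)
    for invoice, _desc, qty, price, _country in rows:
        totals[invoice] += qty * price
    return dict(totals)


def top_products(rows, country, limit=10):
    """Total sales per product in one country, highest first, capped at limit."""
    totals = defaultdict(float)
    for _invoice, desc, qty, price, row_country in rows:
        if row_country == country:  # == tests equality; a single = would assign
            totals[desc] += qty * price
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    return ranked[:limit]


print(order_totals(ROWS))
print(top_products(ROWS, "United Kingdom"))
```

Checking one invoice by hand, as the narrator does with the filter, works the same way here: invoice 536365 has 2 x 2.5 plus 1 x 10.0, so its total is 15.0.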
- [Instructor] So, once we've calculated some aggregations, we've filtered things, we've
built a really good dataset, it's a nice idea to save that so we can actually refer back to
it. So, we're going to look now at actually saving our results. And we're going to do this
using something known as the DataFrameWriter and this is what will allow us to save
our DataFrame as a table in Spark. If you're connected to Hive it will create it as a
managed table in the Hive metastore as default. So, what we're going to do is just
review and rebuild our DataFrame, save it as a table and then browse the table in
Databricks. Here in my Databricks environment I have my exercise file 2.5 Saving your
Results loaded. So, the first thing we need to do is to attach this to our cluster and then
we're just going to rerun this first cell here which will actually read in our dataset and
create our DataFrame, then we're going to modify that a bit and create the top products
in the UK, and lastly, we need to actually create a new DataFrame that we're going to
use to then save. So, we're going to calculate product sales by country. Similar to what
we did before, we use select to get the country and description and then we create our total
alias, which is the unit price times the quantity, and then we display that below, and with
that there, you can see we have how many product sales we have for every country. And
to save that into a table, all I need to do is reference my DataFrame, which is called R1 in
this case, then .write, then .saveAsTable. Click play. Alright, looks like the command has
completed. Now I'll click over into Tables and I can see that I have an actual table
here, so when I click on it you can see the schema that it created as well as the data
below. So, it's that easy. You can use saveAsTable to convert your DataFrame into a
table. Now you can actually reference it still with Python, Scala, or you could even use
Spark SQL.
Creating tables
- [Narrator] So let's take a little bit deeper look into actually creating tables in
Spark. There is some syntax we're going to use here and we're actually using the SQL
syntax, so it's Spark SQL and it's almost identical to ANSI standard SQL. So you can
see here there's some key pieces. Create table using. You can create partitioning, you
can do clustering, you can do bucketing. A lot of the same stuff you can do with Hive
you can do with Spark. So what we're going to do is upload a CSV and create a
table. Manually create the table with SQL. And create a table with the schema. So here in
my Databricks environment I have 3.1 creating tables notebook up. This is an actual SQL
notebook. So it's unlike the other ones where we've been doing everything in
Python. Now we're actually going to use Spark SQL. So first we need to attach our
notebook to the cluster. Then I'll take a look and make sure that our CSV file exists
here. Under the population versus price, and it looks like it does. So what we can do
now is we can create the table without a schema. And what we want to do is just
say create table if not exists, meaning if it doesn't exist go ahead and create it, otherwise
ignore it and don't do it. Using CSV, and for the options we're going to set the
path. There is a header, so use that to create it. And we should infer the schema, so just
like when we created a dataframe using Python, we have a lot of the similar options
here. So I'm just going to execute this guy and it will go create the table for me. And
then I'm going to run an actual SQL statement from it. So I'm going to do SELECT *, which means
return everything, from my table population v price, and just limit it to a hundred
results. So as you can see there, we have data. And it is in a table if I go take a look over
at my tables, you can see that it's up on top. Alright, so let's continue on. Now let's
create the table with the schema itself. So what I want to do here is I want to actually
create it explicitly. I don't want it to just automatically create it inferring the schema, I
want to do it exactly how I want it done. So our syntax is a little bit different. It's create
table if not exists, we give it the table name, online_retail. Now I give it every column
and the data type of it. So I have invoice number which is a string. Stock code which is a
string. Description string, et cetera, et cetera. Unit price is a double, which is an
interesting one, and I chose that data type specifically so that way it limited the number
of decimals in the actual data type. I know unit price is currency; it's a number we use to
actually price an item. Now the options are pretty similar, I say header equals true I give
it a path to this file, and I'm using a CSV file. So when I run this, it's going to go create
the table, it's going to specify this data source for the CSV, and then I get my results
right there. So you can see everything worked and now I explicitly created that
schema. Okay, now let's actually go create one by manually uploading a file. So if I go
over to tables, I can click on create table, I can click where it says drop file or click here
to upload. And I'll go find the exercise files there. And I can go choose any of these files
here for this one I'm going to choose Cogsley Sales. I just click open, and you can see
down below where it uploaded to. I can click preview table, this is where I'll actually give
it a name. Cogsley Sales. The file type is CSV, it has a column delimiter of a comma
which makes sense, and the first row has the header, check the box here. And it looks
much better. I'll click create table, and we're good to go. So now I can go run SQL
statements against this, I can pull it in to Python or use Scala or whatever language I
want to use using this data. If I take a look at the tables I can see it listed right there at
the top, Cogsley Sales.
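The idea of declaring every column and its type up front, rather than inferring the schema, can be tried without a cluster. The sketch below is an assumption-laden stand-in: it uses Python's built-in sqlite3 module instead of Spark SQL, so the column types (TEXT, INTEGER, REAL) and the INSERT-based loading differ from Spark's string and double types and its CSV data source options. The sample rows are made up.

```python
import csv
import io
import sqlite3

# Made-up rows in the rough shape of the online retail CSV.
SAMPLE_CSV = """InvoiceNo,StockCode,Description,Quantity,UnitPrice
536365,85123A,WHITE HANGING HEART,6,2.55
536365,71053,WHITE METAL LANTERN,6,3.39
"""

conn = sqlite3.connect(":memory:")

# Explicit schema: each column gets a declared type instead of an inferred one.
# IF NOT EXISTS makes the statement a no-op when the table is already there.
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS online_retail (
        InvoiceNo   TEXT,
        StockCode   TEXT,
        Description TEXT,
        Quantity    INTEGER,
        UnitPrice   REAL
    )
    """
)

# The first CSV row supplies the field names; the values arrive as text and
# SQLite converts them to the declared column types on insert.
reader = csv.DictReader(io.StringIO(SAMPLE_CSV))
conn.executemany(
    "INSERT INTO online_retail VALUES "
    "(:InvoiceNo, :StockCode, :Description, :Quantity, :UnitPrice)",
    reader,
)

rows = conn.execute("SELECT * FROM online_retail LIMIT 100").fetchall()
print(rows)
```

Because InvoiceNo is declared as text, an invoice number stays a string even though it looks numeric, which is the kind of control the explicit schema is meant to give.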
- [Instructor] Now that we have some data in a table, we can actually use Spark SQL to
query that data. So what we're going to do is start with just a basic select
statement, then do an aggregation, and lastly, join some different tables together. Here
in Databricks, I have 3.2 loaded and I'm going to attach this to the cluster and take a look at
the file that we just created a minute ago called Cogsley Sales. Just doing a simple select
statement, you can see all the data that's loaded here. Everything looks good, even
where we have a comma embedded in the actual column name, which is nice that it
parsed that automatically for us. If I wanted to do an aggregation, I can do some basic
sum operations on the sales amount and round that out. Click Play on this cell. There we
go. And next, if we wanted to do a join, we need to actually upload the other dataset
here that's being used, Cogsley Clients. So let's go do that now. I'll go over to
Tables, Create Table, click on where it says Drop File and I want to load Cogsley Clients
CSV. I'll hit Preview Table, give it a name. Yeah, it's a file type CSV, has a column
delimiter. The first row does indeed have the headers so I'll check that. And you can see
how it came across. Now some of these I can adjust, so Market Cap Amount looks like a
BIGINT to me instead of a string so I can change that if I want. IPO Year looks like there
are strings in there so I'll leave that and everything else looks like a string here to me so
we should be good. Oh, maybe Last Sale could be a double, looks like a, this is the last
sale of their stock price on the market so we'll keep that as is. Click Create Table and you
can see the preview here. Alright, so everything looks good. Let's go back to our actual
workspace, click on Recent and back to 3.2 and we now want to do a join to get our
client info. So we have Cogsley Sales and then, we're joining over to Cogsley Clients to
get the Company Name, IPO Year, and Symbol. Then we're doing a sum of sales from
the Cogsley Sales one. So we're seeing information about the company as well as
information about their sales with us. Click Play and you can see that the data came
back incredibly fast. If I take a look at the Spark Jobs, you can even go in and dissect
kind of the details of what Spark did. And last, we want to join to get some State Info. So
again, we have a new table here, State Info. Let's go load that one as a table. Same steps
as before. Now we're going to choose this State Info CSV file. We'll preview this
one. state_info, we'll call it, and the first row does indeed have the
headers. Population Estimate looks like a BIGINT to me, not like a string. House Seats
could be an INT and State Hood looks like a date, but let's leave it as a string for
now because timestamps in Hadoop are much different than dates in things like Excel. So
let's hit Create Table. Looks like it loaded it good. Alright, let's go back to our
notebook, 3.2 Querying Tables and at the very bottom, join to get the State Info. So
what we're going to do now is get the State Code instead of the State Name. Hit Play on
this and you can see our sales by state.
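The join-then-aggregate pattern from this clip can be tried locally with SQL that is close to what Spark SQL accepts. The sketch below is a stand-in using Python's built-in sqlite3 module, not Spark. The table layouts, column names, and rows are all assumptions invented for the example; the real Cogsley tables have more columns and may use different names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE cogsley_sales (symbol TEXT, state TEXT, sale_amount REAL);
    CREATE TABLE cogsley_clients (symbol TEXT, company_name TEXT, ipo_year TEXT);
    CREATE TABLE state_info (state TEXT, state_code TEXT);

    INSERT INTO cogsley_sales VALUES
        ('ACME', 'Texas', 100.0), ('ACME', 'Texas', 50.5), ('BOLT', 'Ohio', 20.0);
    INSERT INTO cogsley_clients VALUES
        ('ACME', 'Acme Corp', '1999'), ('BOLT', 'Bolt Inc', 'n/a');
    INSERT INTO state_info VALUES ('Texas', 'TX'), ('Ohio', 'OH');
    """
)

# Join sales to clients to show company details next to total sales.
sales_by_client = conn.execute(
    """
    SELECT c.company_name, c.ipo_year, c.symbol,
           ROUND(SUM(s.sale_amount), 2) AS sales
    FROM cogsley_sales s
    JOIN cogsley_clients c ON s.symbol = c.symbol
    GROUP BY c.company_name, c.ipo_year, c.symbol
    ORDER BY c.symbol
    """
).fetchall()

# Join sales to state_info to report by state code instead of state name.
sales_by_state = conn.execute(
    """
    SELECT i.state_code, ROUND(SUM(s.sale_amount), 2) AS sales
    FROM cogsley_sales s
    JOIN state_info i ON s.state = i.state
    GROUP BY i.state_code
    ORDER BY i.state_code
    """
).fetchall()

print(sales_by_client)
print(sales_by_state)
```

Keeping ipo_year as text mirrors the decision in the clip to leave IPO Year as a string because some of its values are not numbers.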
- [Instructor] Data visualization is one of the most powerful ways to communicate our
analysis and our findings. So let's take a look at visualizing data in Databricks
Notebooks. Now just to recap, data visualization is the idea where you can convert text-
based data like a grid or a pivot table into a graphical representation and it really makes
sense because we as humans process visual input much faster than any other type of
sensory input. In order to do this, what we're going to first do is recreate our sales
table. When we did it before, we just went through the basic steps and we need to
retype some of that data so we can actually perform calculations on it. Then we'll build a
bar chart, build a line chart, and build a map and who knows, maybe there'll be even a
bonus in there for you. So back here in my Databricks environment, I have a table
cogsley_sales. And if we take a look at this, you can see that everything is typed as a
string which is fine, it imported the data correctly, but notice there are a lot of things
here that are not strings and we'll typically want to type those correctly so we can
perform aggregations and use them in our analysis. So what I want to do is click on
Tables, click on cogsley_sales and Delete. I'll click back on Tables and I'll click Create
Table and I'm going to upload the cogsley_sales.csv again, I'll hit Preview Table once it's
done loading, give it the same name as before, cogsley_sales, it has the column
delimiter of comma, the first row has the actual headers. And so now what I want to do
is go through and type it correctly though. So row ID is an int, order ID is an int, quantity
is an int, quote we'll put as a double, discount percent as a double, rate as a double, sale
amount as a double, and then we should have one more far down here where we have
days to complete, we'll make that an int, and at the very end we have our hourly
wage, which we'll put as a double, and the wage margin, that's essentially the profit that
we're making from the consultants here. This is a dataset where it's a consulting
company that has a lot of sales for a lot of clients in different projects. So after that, we
click Create Table. You can see now the schema looks a lot cleaner and the data is still
represented here. If there are any problems with that, you would see nulls or some kind
of error so the fact that we see data here is really good. Okay, now let's go back and
let's take a look, in our workspace we have 3.3 for this clip and with that we're actually
going to visualize our data. First, I need to attach this to the cluster if it's not done so
already. Then I'll click Play on the first cell which just retrieves the data. And in order to
visualize it, in Spark I have a really cool option where I can click down below in my
notebook and just click on Bar Chart and it will actually give me something out of the
box. I'll click on Plot Options and I can play with this. Instead of sector, if I wanted to get
rid of that, and let's say I wanted region, I can drag region up to keys. You can see what
that's like there. Instead of sale amount, I could use any of the others. So play with this,
do your own, and in fact share it. I'd love to see what you come up with. So there we can
create a bar chart just with a couple clicks. To create a line chart, we'll run that same
query. So we're just reading in the full table here. We're not doing any specific
aggregations in advance or anything. Now I'll click on this and instead of bar chart, I'm
just going to go down and look for line chart and it did a fair job. It shows sale amount
and order/month/year. So if I wanted to adjust that, I could go in here, adjust my keys,
my values. I can even do something like add in the region to the series of groupings and
you can see the different series of groupings and how they go. So let's just leave it at
that for now. And again play around with this, choose a filter, add anything else that you
think might be interesting here and take a look at the data. So for a map what we're
going to do is pull back the actual state code. And this is what Spark wants. Now in
other tools, data visualization softwares, you don't necessarily need this here, but in
order for it to work in our notebook, we need the actual two-letter state code which is
conveniently located in the state info table that we loaded earlier. When I run this
one, all we're going to get is the actual state code and the total sales for that state from
our cogsley_sales dataset. I'll click down on my data visualization options here and
choose Map and you can see there. So we have a color coded map. I'll click on Plot
Options and sales is what's in values and state code. There's only two fields so it's pretty
straightforward and you can choose the display type and all those kinds of things. So
that's how easy it is to actually build a map inside of a Databricks notebook. Okay, I said
there'd be a bonus. Here's the bonus, the box plot. It's more of a statistical analysis
visualization that we might use. You're probably familiar with it if you do a lot of data
science work, but it's built-in to our actual options here so I just choose Box Plot and
you can see all the different options I have. So out of the box, I'm going to drag sale
amount to values, and then product category to keys. You can see it gives me a nice
preview here which is pretty informative. And if it's all scrunched up like this on your
screen as well, you can click on this little handle on the bottom there and drag that out
to get a much richer visualization. Okay, so this is some of the fun ways that you can
look at the data. I'd love to actually see other ways that you can do it so if you come up
with anything, go ahead and share that online and tag it with dataviznotebook and I'll
take a look and we can have a good conversation about it.
- [Instructor] Alright, now it's time to take a look at Machine Learning with
Spark. Machine learning is the way that we can create repeatable and automated
processes for producing expected output from given data. This is typically used to find
hidden patterns in data or make predictions. If we break down machine learning, there
are basically two types. There are Supervised and Unsupervised. Supervised learning is
when you train the algorithm to know what the expected output is and then let it figure
out which model, which statistical model, best produces that output. A classic example
here is looking at sales forecasts. Let's say you have the last three years' sales data by
month and you want to predict next year's sales. What you could do is give Spark a
sample of this data from years past and ask it to predict the remainder of the known
results you have. Once it does that, you can then compare its prediction to what actually
happened to figure out how accurate it was. Now as I mentioned, this is an automated
process. So the way you use this is you will actually have it run lots of different statistical
models to try to come up with the most accurate prediction and then use that going
forward. In unsupervised learning what you have is where you give Spark some data and
ask it to go fishing. Ask it to go find something. You don't give it an expected
output. You don't give it a sample set to go on. It actually uses this to find things that
maybe weren't totally obvious to you or you didn't even expect. So one of the key use
cases here is to identify these hidden patterns within the data without adding your bias
as to where the patterns might be. This is a more abstract application of machine
learning, but it can yield really interesting results and you can use it for things such as
recommendations for new movies to watch or even things like which planets in the solar
system might be habitable to humans. Let's take a look at Classification first. This is one
way that we can categorize data into groups. And this is a popular machine learning
method. One way that you probably encounter this every day is when your email
provider identifies spam versus non spam email. In classification, the groupings or the
categorization is provided by the user, so in the example of spam versus not spam, and
this is a type of supervised learning. So when we have in this case a list of emails known
to be spam, we provide that to our machine learning algorithm, our classification
algorithm, and say, "Go find ones like this." So we have an expected output and we want
to classify those in two ways. So this is one common machine learning method that
we're going to talk about. Another one is Regression. And regression is all about
variable relationships. How close are these two variables to each other? Now this is used
in prediction and forecasting and again is a supervised type of machine learning. So
imagine if we're predicting sales forecasts as I mentioned. We're giving it an expected
output. We know what we want and we know what the answer should be. And then we
want it to figure out what the most accurate answer will be. Next we have Clustering and
this is an interesting machine learning method that basically takes inputs and converts
them into groups. Now the groups aren't known in advance. And, in fact, there's lots of
math going on here when you do clustering to figure out how many groups there
should be. This is a form of unsupervised learning. And this is really popular in things
like figuring out the different groups of customers and maybe baskets you can put them
in. Think about customers that spend a lot of money versus a little money. Maybe we
want to have different loyalty programs. You may not know in advance how many
different groups of customers you have or how many clusters of them so you use this
algorithm or this machine learning method to figure that out.
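The supervised-learning workflow described earlier in this clip (train on part of the known history, predict the rest, then compare against what actually happened) can be sketched with a tiny least-squares fit in plain Python. This is an illustration only, not Spark's machine learning library: the 36 months of sales are made up to be perfectly linear, and fit_line and mean_absolute_error are hypothetical helper names.

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = intercept + slope * x."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope


def mean_absolute_error(actual, predicted):
    """Average size of the gap between known results and predictions."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# Made-up data: 36 months of sales that grow by exactly 10 per month.
months = list(range(36))
sales = [100.0 + 10.0 * m for m in months]

# Train on the first two years, hold back the third year as known results.
train_x, train_y = months[:24], sales[:24]
test_x, test_y = months[24:], sales[24:]

intercept, slope = fit_line(train_x, train_y)
predictions = [intercept + slope * x for x in test_x]
error = mean_absolute_error(test_y, predictions)
print(round(slope, 6), round(intercept, 6), round(error, 6))
```

With real sales data the error would not be near zero, and comparing that error across several candidate models is how the most accurate one gets picked.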
- [Instructor] One of the key steps in doing machine learning is actually getting your
data ready to be used. So we're going to take a look now at preparing data for machine
learning. Step one, we're going to pull in some external data, we'll read that in and
cleanse it, clean up any things or any irregularities that we might find, we'll aggregate it
and convert it to a format that we can use, and lastly we'll build features and labels, which
we use for regression analysis. I'm over here in Databricks now and I've loaded the
exercise file, 4.2, Preparing Data for Machine Learning. Now, if you are looking for a
cluster and you don't have one because it's died, in the community edition, after 60
minutes of inactivity it will terminate. So you can create a new one directly from
here. And for this one in the machine learning example, we're going to want a version
that is 1.6. And I do that because not everything from version 1.6 for machine learning
has been ported over to the latest version of Spark 2.0.1. So I want to teach it in this
version so that way it's clean and you can follow along until a later date when all of the
functions and everything have been ported over. So once you've created the cluster
there, you can select it and see that it's pending, and wait for that to finish before we
move on. And with our cluster created, first, as I mentioned, we're going to download
some external data. So here we're using another magic function, the percent sign and
"sh" (%sh). That's for a shell command, something you would be executing on your terminal
window or command line if you're familiar with Windows. And we're going to issue a
curl command here. Curl is a really old program, where it basically just goes out to a
specific location and retrieves whatever is at that location. So here, what we're doing is
saying "curl -O," and giving it a location for a CSV file that we're going to download. So
we're actually going to go pull out and download that data. I hit Play on this, and you
can see the output down below. So it actually went out and downloaded that data for
us. Now, that data is stored in the /databricks/driver location. And we can go
through and try to find that if you want. You could go take a look at any other files in
there, but this is how you access that data. I have my path. Now I'm going to actually
read that data into a data frame here. So, sqlContext.read.format, giving it CSV. Then I
have a backslash here to continue the statement on a new line. And I'm going to tell it there is a
header, and to infer the schema, and then load the path that I created already. Next I'll cache
that, so it will actually put it in memory so we can reuse it and kind of perform
operations much faster. We'll call the dropna function, which will actually remove rows with missing
values. So we're cleansing our data a little bit. And then we'll display the results there at
the bottom. Okay. So let's hit Play. You can see the Spark jobs executing. And when
they're done, we have our data set. So this is how we know that we have something that
is going to be good to work with. Next, we're going to aggregate and convert it. So, just
like before, we're going to use the data frame API here to actually perform some
operations. We're going to create a new data frame called summary. And we're going to
select OrderMonthYear, and SaleAmount. And then we're going to group by
OrderMonthYear, summarize it, order by OrderMonthYear, and then convert it to a data
frame. So we're performing several operations here in one line. Now that we've pulled
that data in and aggregated it, we have the monthly sales totals. Next, we want to
convert that first one, and just get the year. So, we have a little function here, and we're
doing a map, which is kind of like a for loop in Python. And we have a lambda function,
which passes in the string, the row itself, and then from there we convert using the int
function. Then we take the OrderMonthYear and replace any dashes. So, whereas
before, you had something like 2010-01-01. So we're going to remove all of that so it's
just numbers.And then the next one, return the SaleAmount, and lastly convert that to a
data frame.So we'll take a look at the results here. We've got two jobs that took about
four seconds,and we now have essentially two different data frames that we can use to
do our machine learning. Okay, last step is to actually convert the data frame to features
and labels. And this is something that is specific to regression. But it's what we're going
to use throughout the rest of this chapter. So, we need to import from
this pyspark.mllib.regression module. We're going to import LabeledPoint, which is a
class that we can actually use to come up with the labels and points that we use to do
linear regression. So, we import that module, and then what we do, we create a new
data frame. We take the results, the one from above, that we aggregated and converted
everything to integers, and we use this LabeledPoint function here to create the actual
features that we're going to use to then perform our regression. Lastly, we say toDF,
which converts it to a data frame, and we display our results. I click Play on this. You can
see the results there are a little bit different than what you might expect. And that's
exactly what we wanted to happen. We created a feature,
and in the feature, for the values, we have that integer which represents the year, month,
and day of our actual sales amount, which is now found in the label column.
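To make the data preparation steps above concrete, here is a minimal plain-Python sketch of the same flow: drop rows with missing values, total the sales by month, turn each date string into an integer, and wrap each row as a label plus features. It is an illustration only, not the notebook's code. The sample rows are made up, and the small LabeledPoint namedtuple is a stand-in for the class of the same name in pyspark.mllib.regression so that the sketch runs without a Spark cluster.

```python
import csv
import io
from collections import defaultdict, namedtuple

# Stand-in for pyspark.mllib.regression.LabeledPoint (assumption: used only so
# this sketch runs without Spark; the real class takes a label and features too).
LabeledPoint = namedtuple("LabeledPoint", ["label", "features"])

# Made-up sample rows; the real sales file has more columns and far more data.
RAW_CSV = """OrderMonthYear,SaleAmount
2010-01-01,100.0
2010-01-01,50.5
2010-02-01,
2010-02-01,75.0
"""


def load_rows(text):
    """Parse CSV text with a header row and drop rows with missing values."""
    rows = csv.DictReader(io.StringIO(text))
    return [r for r in rows if all(v not in (None, "") for v in r.values())]


def monthly_totals(rows):
    """Group by OrderMonthYear, sum SaleAmount, and order by month."""
    totals = defaultdict(float)
    for r in rows:
        totals[r["OrderMonthYear"]] += float(r["SaleAmount"])
    return sorted(totals.items())


def month_to_int(order_month_year):
    """Turn '2010-01-01' into the integer 20100101 by removing the dashes."""
    return int(order_month_year.replace("-", ""))


summary = monthly_totals(load_rows(RAW_CSV))
results = [(month_to_int(month), amount) for month, amount in summary]

# The label is the sale amount; the single feature is the integer date.
data = [LabeledPoint(label=amount, features=[float(month)])
        for month, amount in results]

for point in data:
    print(point)
```

The row with the empty SaleAmount is dropped before aggregation, which is the same idea as calling dropna before grouping.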
- [Instructor] Alright, now let's actually take a look and perform some machine
learning. We're going to build a linear regression model here. Step one, we're going to
import the machine learning library, then we're going to define our algorithm, we'll fit
two different models and then we'll make a prediction. So, back here in Databricks I've
loaded 4.3 from the exercise files. I need to attach this to my cluster running Spark
1.6 and then I'm going to execute the first cell which will download the results. I'll
execute the second one which reads in and cleanses them, the third one, which
aggregates and converts, and the fourth one, which creates the DataFrame with features
and labels. So, we previously went over all that, that's why I didn't go over it here. In
this step we're just going to build the linear regression model. So, we need to import
the LinearRegression class from pyspark.ml.regression. Then we need to
define our linear regression algorithm, so it's as easy as saying lr equals
LinearRegression. So, we're going to create two models here, Model A and Model B. In Model
A we're going to fit our data, the DataFrame we created above from the CSV
file, and we're going to pass in a regularization parameter of zero, and then in Model B,
100. And so, two different models, simple as that. Then we're going to create some
predictions, so we're actually going to transform the data. So, we create two new
datasets, Predictions A, Predictions B, we take Model A and Model B, we apply
transformation and we pass in the data there. So, we've created the models, and now
we're going to make the predictions, and then we'll just take a look at Predictions A. I'll
hit play, we'll see the Spark jobs executing. I can even expand this if I want and see how
they're going. Okay, once that's all done, you can see the results. These are from my Predictions
A model. So, what I have are the features here, and if you recall these were the
month and year of the actual sale amounts, so 2009 January, 2009 February, etc. The
actual amount is the label, and then you have the prediction which our model gave us
based on the feature and the label that were provided.
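To see what the regularization parameter does to the two models, here is a small sketch. The fit_line function is a hand-written, single-feature least-squares fit with an L2 penalty on the slope. It is not Spark's implementation (Spark also standardizes features by default, so its numbers will differ), and the data points are made up. The fit_two_spark_models function at the end shows roughly how the same two fits look with pyspark.ml, assuming a DataFrame named data with label and features columns; it is defined but not called here, since it needs a running Spark environment.

```python
def fit_line(xs, ys, reg_param=0.0):
    """Fit y = slope * x + intercept with an L2 penalty on the slope.

    Minimizes (1 / 2n) * sum((y - slope*x - intercept)^2) + (reg_param / 2) * slope^2.
    """
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    sxx = sum((x - x_mean) ** 2 for x in xs)
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    slope = sxy / (sxx + n * reg_param)
    intercept = y_mean - slope * x_mean
    return slope, intercept


def predict(model, x):
    """Apply a fitted (slope, intercept) pair to one feature value."""
    slope, intercept = model
    return slope * x + intercept


# Made-up points that lie exactly on y = 2x + 1.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]

model_a = fit_line(xs, ys, reg_param=0.0)    # no regularization
model_b = fit_line(xs, ys, reg_param=100.0)  # heavy regularization

predictions_a = [predict(model_a, x) for x in xs]
predictions_b = [predict(model_b, x) for x in xs]

print("Model A:", model_a)
print("Model B:", model_b)


def fit_two_spark_models(data):
    """Rough pyspark.ml equivalent (assumes `data` has label and features columns)."""
    from pyspark.ml.regression import LinearRegression

    lr = LinearRegression()
    spark_model_a = lr.fit(data, {lr.regParam: 0.0})
    spark_model_b = lr.fit(data, {lr.regParam: 100.0})
    return spark_model_a.transform(data), spark_model_b.transform(data)
```

With no penalty, Model A recovers the line exactly. With a penalty of 100, Model B's slope shrinks toward zero, so its predictions sit closer to the average of the labels.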
- [Narrator] Okay, now that we've evaluated our linear regression model, we've built it,
we've pulled in data, we've done everything we need to do, it's time to bring it home and
actually visualize our results. So, we'll start by visualizing a linear regression model
here, and we need to first create SQL tables. So that's the easiest way in Spark to actually
then come to visualization. We'll join those tables together and then we'll set up our
visualization. Here in Databricks I have the exercise file 4.5 loaded and I'm going to
attach this to our cluster running Spark 1.6. I'm going to go ahead and run all of these
steps above here just to load my data, pull everything in, and make sure it's good to go. And
once all that is ready to go, I want to create my table with the predictions, so I want these
tables to have column names. So I'm going to specify a little array here of the columns and
it's going to be OrderMonthYear, SaleAmount, and Prediction. Then I'm going to create
a dataframe using parallelize and I'm going to look at the predictions results set that we
have, use the map function and a lambda, and then we're going to parse it out a little
bit. So, I'm going to convert the first result into a float, so an actual number, and I'm just
going to pull back the first result from the features. Then we'll pull back the label then
we'll pull back the prediction. And we do that again for TableB or the PredictionsB
model. We convert both of those to dataframes using the toDF function, and this is where we pass
in those column definitions. Then we actually use the saveAsTable function to create
tables inside of Spark. So we do tablea.write.saveAsTable. We give it a name,
predictionsA. We specify the mode, overwrite, so if it was already there, we're just going
to overwrite it. Then we just print out a little statement to let us know that it was
created. So go ahead and run this statement here. You can look at the jobs as they're
going. Okay, and we have now TableA and TableB. So, now we're going to create a
simple visualization. So we use the magic function of %sql to then write a SQL
statement instead of writing a Python one, which is the default specified for this
notebook. And we just do a really basic statement here, so we do select
OrderMonthYear, SaleAmount, and then ModelA and ModelB, so a.prediction and
b.prediction. We're joining these together on OrderMonthYear. Go ahead and run
this. And then we need to specify our plot options here. So, if you first ran that, and it
was a table of data that looked like this, we have the OrderMonthYear, SaleAmount, so that's
the actual amount, ModelA's prediction, and ModelB's prediction, then I can click on this
little drop down and choose line chart and it automatically creates a decent one for
me. I can click on plot options and then kind of change this if I'd like. I have my ModelA
here I could change it to ModelB but I'll just leave it as what it is for now. Then I'll click
Apply. And I can drag on the corner here just to make it a little bit bigger so I can see
it. So linear regression, if you recall, is a pretty flat type of linear trend; you can think of it as
sort of a trendline of your data. So when your data looks like this, what you typically get
for linear regression is the line that tries to go basically through the middle of it, and
that's what we see here, so it did do it correctly even though there is a high degree of
error.
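The join behind that chart can be sketched without a cluster. The example below uses Python's built-in sqlite3 module as a stand-in for Spark SQL, with two small made-up prediction tables. The table and column names follow the walkthrough; the numbers are invented, and the ORDER BY clause is an addition so the rows come back in a predictable order.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark SQL tables (assumption).
conn = sqlite3.connect(":memory:")
columns = "(OrderMonthYear INTEGER, SaleAmount REAL, Prediction REAL)"
conn.execute("CREATE TABLE predictionsA " + columns)
conn.execute("CREATE TABLE predictionsB " + columns)

# Made-up rows: integer date, actual sale amount, model prediction.
conn.executemany(
    "INSERT INTO predictionsA VALUES (?, ?, ?)",
    [(20090101, 100.0, 110.0), (20090201, 130.0, 120.0)],
)
conn.executemany(
    "INSERT INTO predictionsB VALUES (?, ?, ?)",
    [(20090101, 100.0, 114.0), (20090201, 130.0, 116.0)],
)

# One row per month with the actual amount and both models' predictions.
query = """
SELECT a.OrderMonthYear,
       a.SaleAmount,
       a.Prediction AS ModelA,
       b.Prediction AS ModelB
FROM predictionsA a
JOIN predictionsB b ON a.OrderMonthYear = b.OrderMonthYear
ORDER BY a.OrderMonthYear
"""
rows = conn.execute(query).fetchall()

for row in rows:
    print(row)
```

Each output row carries everything the line chart needs: the month on the x-axis, and the actual amount plus the two predictions as separate series.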
- [Narrator] Alright, now let's take a look at doing real-time processing here, and in
Spark terms, we call this streaming analytics. Now, here's an ideal worldview of how your
big data system could help everyone. Real-time events stream in, and data at rest gets
batched in on a regular interval, say nightly. Inside of Spark Streaming, you can
execute both machine learning queries, using MLlib, as well as interactive queries, using
Spark SQL. Data is processed and then exported to either a storage engine or to your
analytics platform. Some common use cases are streaming ETL, where data is
continuously cleansed and aggregated before being stored; detecting anomalies in your
data and triggering further jobs, think fraud protection from your bank; augmented user
experience, where you can change the interface for the user to match their preferences;
and in other good cases, performing real-time analytics, where you're actually measuring
events as they occur. This is particularly useful for system monitoring for websites and
call volume monitoring for call centers. How does Spark Streaming actually do this? It uses
something called micro batching. Now, this is a method that Spark uses to achieve
real-time data processing by chunking operations into really, really small batches. The
way this works, if we wanted to visualize this process, it would look something like
this. Streams of data would come in and be handled by the receivers. These receivers
would generate micro batches. Spark would then process these jobs and then produce
the results based on whatever the jobs were defined as. There are two components here
that are particularly key for us, that is setting up the streaming context and then
executing those jobs using Spark MLlib or Spark SQL.
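The micro batching idea can be illustrated in a few lines of plain Python. This is a toy simulation, not how Spark's receivers are implemented: it takes a list of made-up (timestamp, value) events that are assumed to arrive in time order, chunks them into batches by a fixed interval, and runs one small job (a sum) per batch.

```python
from itertools import groupby


def micro_batches(events, interval):
    """Group time-ordered (timestamp, value) events into batches of `interval` seconds."""
    for batch_id, group in groupby(events, key=lambda e: int(e[0] // interval)):
        yield batch_id, [value for _, value in group]


# Made-up events: (arrival time in seconds, value), already sorted by time.
events = [(0.1, 5), (0.7, 3), (1.2, 4), (2.5, 1), (2.9, 6)]

# The "job" run on every micro batch is a simple sum of its values.
totals = {batch_id: sum(values)
          for batch_id, values in micro_batches(events, interval=1.0)}

print(totals)  # {0: 8, 1: 4, 2: 7}
```

Shrinking the interval makes each batch smaller and the results fresher, which is how batch-style processing can approximate real-time processing.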
- [Instructor] So in order to use Spark streaming, we need to actually set up a streaming
context. And what we're going to do here is first download some files, and we're going
to download a zip file and we're going to read those files in one by one. So we'll sort of
simulate streaming here. Then we'll perform some basic analysis. One of the things that
we're doing here in Spark 2.0 is called structured streaming, so what we're going to do is
define the structure after we've done some basic analysis, then set up our
streaming job. All right, let's switch over now, and here in Databricks I have the 5.2 file
loaded. This is from the exercise files. And I need to attach this to a cluster. Well, if I
don't have one, I can create a cluster here, and I'm just going to call it 2B, 2C, you can
call it anything you want. The important part is that Spark 2.0.1 is the version you're
using. We're not using Scala, so it doesn't matter here. And I'll click Create Cluster. As
that runs, I'll go ahead and choose it from my dropdown list. And once that's done and
we're attached to our cluster, we're good to go, we're going to issue this first command
here and we're going to download some sales data. Once that's done, we can see that
the data was downloaded using that second command there, which actually found it to
be an archive file. So if you're doing this on your own with your own data, make sure that
you're creating a valid zip file. That is something that can be confusing when you're
going from Windows to Mac or Mac to Linux, et cetera. Then I need to unzip that file, so
we'll issue another shell command here which just unzips it. You can see the results
there, and what that looks like. So we have, in effect, 60 or 70, almost 80 files here. Then
as I go down, I can take a look at that directory if I want. You can see all the files in
there. But we need to actually read that data in, so now we're actually going to do some
analysis work. We're going to import the SQL types here so we can create our
table. We'll give it the path, and this is the path just to the directory, it's not to a specific
file. So any file in that directory will essentially be read in using this. Then we need to
create our schema. So we create a StructType and we pass in an array. The
array holds all of the columns that we want, so order ID, order date, et cetera. And we give
it the data type. So in the middle we have DoubleType, StringType, and then the last one
is whether or not it's nullable, whether or not a null value can exist. And that's set to
True for all of these columns here. Next, we create a dataframe. This is just a regular
read, where we call spark.read, pass in the sales schema, and give it the
path. Then we create a temp view here called Sales, and I'll show you what happens here
at the end of this. Then we display it, finally, just to make sure that the data exists. So I'll
hit Play. You can see it running on the bottom. And the data was displayed, which is
good. Now if you go look for that table we created, it's not there, and it's because this is
a temporary table only. So at the end of this session or after I close this window out, that
table's gone. But we can prove that it's there just by running a basic SQL command against
it. So let's run this SQL command here. And there you go, you can see the data actually
exists doing a basic select star from that table. Remember, if you want to use SQL while
you're in a Python notebook or vice versa, you use percent sign and then the magic
function. So %sql or %python or %sh for shell commands, et cetera. All right, now we want to
actually just do some basic analysis, so I'll just run another command here. This one is
bringing back the product key as products, and then a sale amount, rounded out. You
can see that what I typically get is a result like this, and if you click on the bar chart, it
should create a nice little bar chart for you. If it didn't, you can click on Plot Options and
adjust it accordingly. But there we go. We've done some basic analysis work. We have
now the job that we want to actually then create as a real-time analysis job, but right
now we have the analysis done, so let's set up our streaming job. Okay, this is very
similar to what we did above, except now we're actually going to have a couple different
dataframes. So this one is just the readStream, so what we're going to do is basically do
the exact same thing, except instead of just read, it's readStream. And for options, we're
going to specify maxFilesPerTrigger of one. That means that as this data is read in, it's
only going to pull in one file at a time. It's not going to pull in the entire directory. Okay,
then we create the streaming input or the streaming counts. This is the one that actually
calculates that count for us. So select the product key and sales amount, group by
product key, and produce a sum. Lastly, just to test that the thing is actually working, that it's
actually streaming the data, I'm going to do streamingCountsDF.isStreaming, and then
that will tell us whether or not it is. So we've done our basic analysis, now we set up the
streaming just basically by creating a couple different dataframes and using essentially
one different word here, readStream. Hit Play on this, and you can see that in fact it is
streaming. It took really only 0.12 seconds, so it was really, really fast. You may not have
noticed it. But we know now that the data's being read in as we go and it's in a
streaming fashion.
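As a rough sketch of the streaming setup described above, the code below defines a schema and a streaming read that pulls in one file per trigger. Several things here are assumptions rather than the exercise file's actual contents: the column list is only the handful of columns mentioned in the walkthrough, the type chosen for each column is a guess, and the header option assumes the CSV files start with a header row. The Spark imports sit inside the functions, so they only run where PySpark 2.0 or later is available, for example with the spark session a Databricks notebook provides.

```python
# Column specs as plain tuples: (name, type name, nullable).
# Assumption: a partial, illustrative column list; the real files have more columns.
SALES_COLUMNS = [
    ("OrderID", "double", True),
    ("OrderDate", "string", True),
    ("ProductKey", "double", True),
    ("SaleAmount", "double", True),
]


def build_schema(columns=SALES_COLUMNS):
    """Turn the column specs into a Spark StructType (requires PySpark)."""
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    type_map = {"double": DoubleType, "string": StringType}
    return StructType(
        [StructField(name, type_map[type_name](), nullable)
         for name, type_name, nullable in columns]
    )


def build_streaming_counts(spark, path):
    """Read the directory as a stream, one file per trigger, and sum sales by product."""
    streaming_input = (
        spark.readStream
        .schema(build_schema())
        .option("header", "true")          # assumption: files have a header row
        .option("maxFilesPerTrigger", 1)   # pull in one file at a time
        .csv(path)
    )
    streaming_counts = (
        streaming_input
        .select("ProductKey", "SaleAmount")
        .groupBy("ProductKey")
        .sum()
    )
    return streaming_counts
```

In a notebook, calling build_streaming_counts(spark, path) and then checking the isStreaming property on the result is the same quick test the walkthrough uses to confirm the dataframe is a streaming one.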
Querying streaming data
- [Instructor] All right, at this point, we've already set up our stream, we have our data
loaded, we need to now actually just query our streaming data. So, first, we're going to
recreate the basic analysis, then we'll create our in-memory SQL table, and lastly, we'll
query that table. Here in Databricks, I have the exercise file 5.3 loaded, and if you've
already executed a few of these first steps, you don't need to redo that, and you can
check here by running this third cell, but if you weren't following along with the
previous clips that's fine, execute the first one, which will download this sales_log.zip
file, and then unzip that file so you can actually see the data and all the different files it
has there. Make sure also that you're connected to a cluster and that cluster is running
Spark 2.0 or greater. Okay, so once we have that, let's just make sure that the directory
exists in my environment here and I'll hit play, and it looks like it's there. So, this is where
the data gets downloaded to, databricks/driver/sales_log, and there's a lot of different
csv's here. So what we're going to do is read those in and treat those as if they were
streaming in. To read in the data, we need to first create our schema. So we have our
SQL types that we're importing, the path that we just specified, the actual schema itself
of our streaming table, then we read it in and this is the static dataframe, and then we
create this sales view, and then we display the data. So this has already been done if you
were following along from the previous clips, and I'll just go ahead and hit play, okay,
and make sure everything's good. We'll check the table yet again, make sure that you
expand that out to see the results. Okay, so our sales table exists and we were able to
get that. We can build the chart, which again we did in a previous clip, just verifying
everything still works. Okay, that's perfect. Now we need to set up our streaming job. So,
at the end of this, what we do is, first we create our streaming input dataframe, which is
essentially just like the one above except here it says readStream. We then pull in the
salesSchema and we tell it to only read one file at a time. Okay, then we have the actual
counts, which is a streaming job that sums up the sale amounts by product key for the sales
we have coming in, and at the very end we say, is it streaming, and in fact,
once we run that, we see that it is. Okay, so now that we have all of our basic stuff
recreated, we need to create our streaming table. So just like before, what we do is, we
create a streamingCountsDF, and this time we actually do a writeStream. We do it in
memory, so it's not actually loading it down to the disk or anything like that, and the
query is sales_stream, the outputMode is complete, which means that the entire updated
result table is written out each time, and then we say start. So this is just the
definition that we're going to use here and what I'm going to do is hit play, and you're
going to see that this is running, and it's awaiting data, and as this goes, the Spark Jobs
will begin to execute, and it will continue to do it for every file in our actual directory. So
as that's going, we can then run a query here. So I'll go run my query and I can see what
these results are like now, and then as data starts to get loaded, I'll go ahead and check
it out again and we'll see more and more data coming in. So as the results come in, you
can see up above that the jobs start to actually create new jobs after they're done, you
can see the sources that were read in, and you can see the results of our table down
here. So, that didn't actually show a lot, right? So there's a lot more data that needs to
come in for these numbers to be accurate. So, up above, you can kind of see what's
happening. You can see as the stages are actually executing, how many tasks there are,
and then a new job is created. So the stream is essentially going. Let's go ahead and
refresh this job down here by hitting play again. So right now, remember, the top line
is about 50,000, let's see what it is after I refresh this guy here, and depending on your
cluster configuration, this may take more or less time, maybe you have other jobs going
on, all that. Once that's done, you can see now that the top line is well above 50,000, so
we have a lot more data in our actual sales_stream table. All right, so this is called
Structured Streaming in Spark 2.0, and it is a really awesome new way that you can do
real-time analytics. And of course, this is just the basics to get you started on this
entirely new area of data processing and data analysis.
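To show why the numbers in the sales_stream table keep growing between refreshes, here is a plain-Python simulation of the same behavior. It is not Structured Streaming itself: it writes three small made-up CSV files to a temporary directory, reads them one file per "trigger", and after each trigger emits the complete table of running totals by product key. The file names and values are invented for the example.

```python
import csv
import os
import tempfile
from collections import defaultdict


def write_sample_files(directory):
    """Create three small CSV files of made-up sales (ProductKey, SaleAmount)."""
    batches = [
        [("1", "10.0"), ("2", "5.0")],
        [("1", "2.5")],
        [("2", "7.0"), ("3", "1.0")],
    ]
    for i, rows in enumerate(batches):
        file_path = os.path.join(directory, "sales_%d.csv" % i)
        with open(file_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["ProductKey", "SaleAmount"])
            writer.writerows(rows)


def stream_totals(directory):
    """Read one file per trigger and yield the complete result table after each."""
    totals = defaultdict(float)
    for name in sorted(os.listdir(directory)):
        with open(os.path.join(directory, name), newline="") as f:
            for row in csv.DictReader(f):
                totals[row["ProductKey"]] += float(row["SaleAmount"])
        # Emit a full snapshot of the table, not just the rows that changed.
        yield dict(totals)


with tempfile.TemporaryDirectory() as sales_dir:
    write_sample_files(sales_dir)
    snapshots = list(stream_totals(sales_dir))

for trigger, table in enumerate(snapshots):
    print("after trigger", trigger, table)
```

Querying the in-memory table early is like looking at the first snapshot, and querying again later is like looking at a later one: same products, larger totals, plus any products that have appeared since.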
- [Instructor] Now so far we've been doing everything in the Databricks cloud, but if
you're working in an enterprise environment you most likely have a cluster which you
need to actually run things on. So, when you develop, it's common that we want to
actually run things locally first. And so, what we're going to do here is look at setting up
Spark locally. The first thing I want to do is download Spark, then run it from the
shell. So, I'm here on spark.apache.org/downloads, and if that link doesn't work, go to
spark.apache.org and look for this big green download Spark button, then find the
version you want. So, for this I'm going to use 1.6.3 and the prebuilt version for Hadoop
2.6 and I just click on the link here and it'll download it. Once I've done that, just unzip
it. Also, you'll want to download the Java JDK here; version eight is the current
version. You'll need to accept the license agreement and then choose whatever platform
you're working on. Once I've done that and I've installed the JDK, what I want to do is
take this actual Spark folder and just actually execute commands from it. So, I'm going
to switch over to Terminal now and here in Terminal what I'm going to do is take a look
at what I have, just the basic folder structure that comes with my Mac and I'm going to
copy that stuff into a new folder. So, first let's make a new directory called Spark. Then I
actually want to copy recursively from downloads/Spark/* into
Spark. Just to check that it's there, we'll go ahead and relocate ourselves into that
directory, and it looks like things were copied in. So, to run Spark from the shell, I'm
going to type ./bin/pyspark and you can see some messages pop up and then bam, I can
see Spark Version 1.6.3 running, and I can type sc just to check and you can see that
I've returned a Spark context object. That means that Spark is actually up and
running, and I can then execute code using my local Spark environment from the shell.
- [Instructor] Now let's take a look at connecting Jupyter notebooks to Spark. In order to
do this, first we need to download Anaconda. So if you don't have that installed
already, we'll go through the steps there. Then we'll install Jupyter. Then we'll link Spark
with IPython. And lastly, we'll run PySpark. Okay, so let's switch over to my web browser
now. So here on the Continuum website for Anaconda, we need to go down and find the
download here. I'm going to do the graphical installer for Python 2.7. And once that's
done, I'll just double click on the package file that was downloaded and click through
the installer. So once the Anaconda install is done, we need to install Jupyter. So I'm
going to go over to my terminal window. So here in my terminal window, I'm going to
type in conda install jupyter. Then I'm going to do conda update jupyter. And with that I
need to actually now link Spark with IPython notebooks, so I'm going to paste in some
commands here that you can find in a text file associated with this module. And the first
one is going to set the path for Spark. And what I need to do is adjust this so it's the
actual correct path to Spark. So that would be ~/spark/bin. That's why I do that so it
simplifies it for me. Then the next one is actually setting up the PySpark driver. And this
one I'll just paste in. And the last one specifies the PySpark options. Paste that in as
well. So that's good. So we've essentially set up all of the environment variables. Now if I
type source .profile it will apply that to the session I'm currently working in. Now we
need to actually run an IPython notebook. So I can do that from wherever I want, but if I
wanted to I could create something called notebooks. So mkdir notebooks, cd
notebooks. And this way any notebooks I create or files I save or anything will be in a
directory. And then from here, I just type in pyspark. And then I should get a browser
now that actually shows my Jupyter notebook and there's no files in here or anything
like that. So I can click New, and then I'll click Python defaults. And from here I can type
in sc and hold shift and hit enter. And I can see that the Spark context is running. So
now I could actually run Spark commands in Jupyter all on my local machine.
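The three settings described above can be sketched in Python as an environment dictionary. The exact lines pasted in the video come from the course's text file and are not shown here, so treat this as an assumption about what they contain: the Spark bin folder added to PATH, plus the PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS variables that Spark reads to decide which Python front end the pyspark command starts. The helper names pyspark_notebook_env and launch_pyspark are made up for this example.

```python
import os


def pyspark_notebook_env(spark_home, base_env=None):
    """Build an environment in which the pyspark command opens a Jupyter notebook."""
    env = dict(os.environ if base_env is None else base_env)
    spark_bin = os.path.join(os.path.expanduser(spark_home), "bin")
    # 1. Put Spark's bin folder on the PATH so pyspark can be run from anywhere.
    env["PATH"] = spark_bin + os.pathsep + env.get("PATH", "")
    # 2. Use Jupyter as the PySpark driver front end instead of the plain shell.
    env["PYSPARK_DRIVER_PYTHON"] = "jupyter"
    # 3. Tell Jupyter to start in notebook mode.
    env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
    return env


def launch_pyspark(spark_home="~/spark"):
    """Start pyspark with that environment (needs a local Spark and Jupyter install)."""
    import subprocess

    pyspark_path = os.path.join(os.path.expanduser(spark_home), "bin", "pyspark")
    return subprocess.call([pyspark_path], env=pyspark_notebook_env(spark_home))
```

Putting the equivalent export lines in .profile, as the walkthrough does, has the same effect for every new terminal session, while this version applies the settings to a single launch only.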
Other connection options
- [Instructor] So, the last thing to talk about here are other ways to connect to Spark. So,
it now supports a lot of BI tools, and it does that through JDBC and ODBC
connections. So Simba, a company that makes drivers for a lot of other platforms, made
one back in 2014 enabling JDBC and ODBC connections from your BI tools, whether
that be QlikView, Tableau, Power BI, or whatever else you may be using, directly into
Spark. So, if you have something that uses ODBC or JDBC you can connect to Spark
using those drivers. Of course, you can also use Spark SQL directly from a cloud
environment like Databricks, from a notebook like Jupyter, or from the command shell.