Amazon AWS Certified Data Analytics Specialty – Domain 3: Processing Part 7
- [Exercise] Elastic MapReduce, Part 1
For our next hands-on activity, we're going to build a product recommendation system for Cadabra.com. The good news is that we already built out most of this system way back in Exercise One. So we already have an EC2 instance that is populating server logs that get consumed by Kinesis Data Firehose, which in turn dumps that data into an Amazon S3 bucket. And that has already been set up. You might recall that we already put 500,000 rows of data into S3, making a little data lake, if you will, of order data that we can then play with. So all we have to do now is spin up an Amazon EMR cluster and run Apache Spark using MLLib to generate a recommendations model based on that order data in S3. So let's dive in and make that happen. Back in the AWS Management Console, we're going to select EMR.
Just type it in here if you don't see it in your list, and create a cluster. Give it a name like "CadabraRecs" or something. Default logging is fine. We're going to use the latest EMR release, and we're going to select Spark for the applications here, because we are fundamentally building a Spark application that uses MLLib to create these recommendations. You can see we also have options here for configurations optimized for Presto, HBase, or just straight-up Hadoop.
Spark runs on top of Hadoop, however, so we are still sitting on top of an HDFS file system here. And we have the Zeppelin notebook option here if we want an interactive notebook on top of Spark that we could play with. For the hardware configuration, you have to make sure you select an instance type that is both appropriate to your application and available in the Availability Zone that you're launching in. Sometimes you have to figure that out the hard way. Generally speaking, though, the most recent instance types will be available in most Availability Zones.
So let's go ahead and change that m3 to an m5 just to be safe. Now, I want to stress again that we are incurring real charges here. An EMR cluster does not fall within the free usage tier. So if you are squeamish about incurring any charges as part of this course, do not follow along here, because once we create this cluster, you will start getting billed. It's not much; we're only going to be running it for less than an hour. But if you don't want to spend any money, just watch. We will stick with three instances, because we do want to run on a real cluster here and not just a single machine. We will select an EC2 key pair; we can just reuse the one we made before for the EC2 instance that generates our log data. Hit Create Cluster. It will take a few minutes for this cluster to spin up.
So we're just going to have to wait until "Starting" changes to something a little more encouraging about being able to actually connect to it and work with it. We'll pause here and come back when that's ready. All right, it took several minutes, but it looks like our cluster is finally up and running and waiting to go. A quick word about the instance types here. We successfully provisioned an m5.xlarge for our master node and two core nodes, which are also m5.xlarges. Now, you generally want to think a little more deeply about what instance type makes sense for your application. In this case, we're using Apache Spark to do machine learning, and machine learning tends to be a very CPU-heavy operation, so maybe a c5 would have been a better choice. It just turns out that that instance type isn't available in my Availability Zone. So sometimes you just have to go with what's available, and a general-purpose m5.xlarge is usually good enough for most applications. Now, if you were doing something with deep learning, for example, that would be more of a GPU-intensive operation.
So if you're building big neural networks using TensorFlow or something on this cluster, you'd want to use GPU-optimized instances instead. But we're going to stick with m5.xlarge; what we're doing isn't actually all that taxing, so it doesn't really matter that much for our little example here. Before we can connect to our master node and run our Spark script from it, we first need to open up that node to SSH from our PC here at home.
By default, that's not set up for you, and they don't really tell you about that. Go down to the security group for the master node and click on that. We're going to select the ElasticMapReduce-master security group here. If you go to Inbound, you'll see that port 22 is not in there, so there's really no way to connect to this from a terminal right now. Let's fix that. Hit Edit, scroll down, and add a rule: Custom TCP, port 22. And let's set the source to My IP.
Of course, that IP will be different for you. Hit Save, and now we should have some hope of actually connecting to that master node. So go back and click on the Master public DNS here. If you click on SSH, it will give you the instructions you need for Putty, which is what we're going to use for our connection. Copy the host name there, fire up Putty, and put that in the host name field.
As before, go to SSH, click on Auth, and select the key file that we used when creating this cluster. The host key warning is fine; you can click OK on that. And we're logged in. Cool, we are logged into the master node of our Hadoop cluster here on Amazon Elastic MapReduce, and it's all set up for Apache Spark for us already. Pretty easy. Now, writing our own machine learning algorithms for recommender systems from scratch is a whole other course, so let's just start by using one of the samples that comes with Spark. It produces recommendations using an algorithm called ALS, Alternating Least Squares, and it lives under the examples directory of Spark itself. Let's make a copy of it to our home directory so we can start from that and just mess around with it.
So we're going to type in `cp /usr/lib/spark/examples/src/main/python/ml/als_example.py .` to copy it right here into our home directory. Now we're getting to the machine learning stuff; it's getting interesting. And there it is. Let's take a look at it: `nano als_example.py`. We can just scroll through here and see what's going on. Okay, you can see we're importing pyspark, so this is a Python script that is a Spark driver script. It kicks off a Spark session, calls it "ALSExample", and reads in data here from something called sample_movielens_ratings.txt. So by default, it's using some movie ratings data that's part of a public data set called MovieLens, and it's looking for it at the data/mllib/als/sample_movielens_ratings.txt path. Now, that is not a local file path. Notice there's no file:// protocol in front of it, and there's no s3:// in front of it either.
So it's actually going to be looking in the HDFS file system on this cluster. Remember, we're running Spark on EMR on top of a Hadoop cluster here, and it does have an HDFS file system that's shared amongst all the different nodes in the cluster. So it's very important that we read from a location every node in the cluster can read from. If it were reading from a local directory, the other nodes in the cluster wouldn't be able to get at it, right? So it's implicit that this data lives in the HDFS file system, not on your local file system; we'll make sure that data is there before we do anything else. We don't really want to get into the details of Spark so much, as that's not going to be covered on the exam, but at a very high level: the script reads every line of that movie ratings data and maps each line to split it into its individual fields; it looks like it's delimited by double-colon characters. It then creates rows in an RDD by splitting out the user ID, movie ID, rating, and timestamp for each movie rating. It creates a DataFrame from that, splits it into a training set and a test set, and then creates an ALS recommendation model, which is just part of MLLib; it does all the heavy lifting for you, using some parameters that make sense for it. After that, we just train the model on the training data we extracted from that ratings RDD, and then we can make predictions from it, to see if we can predict the ratings of movies that the model doesn't know about yet.
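To make that parsing step concrete, here's what the line-splitting logic boils down to, stripped of the Spark plumbing. This is a plain-Python sketch, not the exact code from als_example.py:

```python
# Each line of sample_movielens_ratings.txt has the form:
#   userId::movieId::rating::timestamp
def parse_rating(line):
    """Split a '::'-delimited rating line into typed fields."""
    fields = line.split("::")
    return (int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

print(parse_rating("0::2::3.0::1424380312"))  # → (0, 2, 3.0, 1424380312)
```

In the real script, this same split is applied to every line of the RDD, and the resulting tuples become rows of a DataFrame.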
And we measure the root mean squared error (RMSE) as a measure of the quality of those predictions. Just to see some actual results, we generate both the top ten movie recommendations for each user and, flipping it on its head, the top user recommendations for each movie. So we can go in both directions here: not only can we recommend movies for users, we can recommend users for movies, which is kind of an interesting take.
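For reference, RMSE itself is easy to compute by hand; here's a quick plain-Python sketch of the metric (not Spark's implementation):

```python
import math

def rmse(actual, predicted):
    """Root mean squared error between two equal-length lists of ratings."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Perfect predictions give 0; being off by 1 on every rating gives 1.
print(rmse([5.0, 3.0, 1.0], [4.0, 2.0, 2.0]))  # → 1.0
```

Lower is better: the closer the predicted ratings are to the held-out test ratings, the smaller this number gets.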
For our order data, we're going to be recommending items, actual physical products, instead of movies. But otherwise it's the same thing. So with that high-level understanding, let's exit out of this with Ctrl-X. Like we said, we need to give it some data to play with, and I just want to make sure this script runs before we start messing with it. So we're going to copy that sample data into Hadoop's HDFS file system. First we need to give this thing a home to go into: `hadoop fs -mkdir -p /user/hadoop/data/mllib/als`. That creates the directory in the HDFS file system that's shared across this cluster.
Now let's copy that data into it: `hadoop fs -copyFromLocal /usr/lib/spark/data/mllib/als/sample_movielens_ratings.txt /user/hadoop/data/mllib/als/sample_movielens_ratings.txt`. This copies from the local path and puts it into HDFS under more or less the same place. Double-check everything here; one small typo will mess everything up. Let's see... looks good to me. That has copied the sample data into the HDFS file system where the entire cluster can get at it. Let's see if it works. To actually run a Spark script from the master node, you need to use the spark-submit command, followed by the script itself: `spark-submit als_example.py`. And let's see what happens. That should, in theory, distribute the script across the entire cluster of three machines, and the cluster will start chewing on the data and building that recommendation model.
- [Exercise] Elastic MapReduce, Part 2
So right after the Spark session is created, right there, let's add a new line: `spark.sparkContext.setLogLevel("ERROR")`. Pay attention to the capitalization there, like the capital C in sparkContext. This means I only want to see log messages at the error level; everything else, please suppress. Now, this only takes effect after the Spark session has actually been created, so we're still going to get a bunch of INFO messages until the script actually starts running. But once it starts doing interesting work, those messages will be suppressed, and we should be able to see our final output more easily. Ctrl-O to write that out, Enter, and Ctrl-X. Let's try running it again: `spark-submit als_example.py`, and give that another go.
So again, we're still seeing INFO messages while it's spinning things up, but once it actually starts running the driver script, those go away. Oh cool, a result. It actually printed out our root mean squared error there. This is not a machine learning class, so I'm not going to get into exactly what that means; it's just a metric of how well the model does. And look, results! Awesome. We can finally see them with all those INFO messages out of the way. You can see there, for example, that user ID 26 had movie ID 90 recommended with a predicted rating of six, which is strange because ratings go from one to five, but it means the model is really, really confident about movie ID 90, whatever that is, for user ID 26. And flipping it on its head, we also have each movie ID with user recommendations and predicted ratings for them as well. Cool, so we have something that works here.
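"Top ten recommendations for each user" just means ranking the model's predicted scores for that user and keeping the best ones. Here's that ranking step in plain Python, with made-up scores (the IDs and values are illustrative, not from the actual run):

```python
def top_n(predicted_scores, n=10):
    """Return the n item IDs with the highest predicted scores."""
    ranked = sorted(predicted_scores.items(), key=lambda kv: kv[1], reverse=True)
    return [item_id for item_id, _score in ranked[:n]]

# Hypothetical predicted ratings for one user: {movie_id: predicted_rating}
scores = {90: 6.1, 12: 4.8, 37: 2.2, 51: 4.9}
print(top_n(scores, n=3))  # → [90, 51, 12]
```

Flipping it around, the same ranking applied per movie over user scores gives you the "users for each movie" output.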
We can build on that now to work with our own data set instead of this fabricated sample movie ratings data set. To make life easier for you, I've actually given you that code in the course materials. But first we need to make sure that the S3 bucket containing the order data we want to use is accessible to this script. So let's head back to the AWS console, go back to S3, and go into the bucket that we created originally back in Exercise One. Now, this will look different for you, because Firehose organizes the data into folders based on when you actually ran the script and generated those logs. For me, that's going to be here. Just go to the very first date that you see, and the very first hour under that, because that goes back to Exercise One, where we output 500,000 rows of data.
So that's a pretty meaningful data set. All right, let's go back up one level, select that final folder, and make it public. Normally, this is a horrible thing to do, of course, but this isn't sensitive data; it's based on a public data set, so I'm not too worried about it. Now that the data should be accessible to our Spark script, let's go back to our cluster. If you go to the course materials, you can see the changes that we want to make: go into the Product Recommendations folder and open up als_modifications.txt. These are the snippets of code that we want to drop in as replacements. We're changing the source of the data here to S3, where we actually stored it. So the first thing you need to do is change that to your S3 bucket. Remember, you have a unique bucket name, so you're going to have to change it to match yours. It should still be up here in the URL for S3 where you're browsing around that bucket, so let's make sure that matches up. So instead of 211 16, I'm going to change that to 225 20.
Again, make sure this matches the folder within your bucket for the 500,000 rows of order information that you created back in Exercise One; it will be different from what you see here. From there, we need to change a few more things. First of all, the delimiter is not double colons; it's just a comma for us. We're also going to do a little bit of data cleanup here. It turns out that there's a lot of junk in this data set, and if we don't filter it out, we're going to get some weird results. What these two lines are doing is making sure that we only take in order IDs and customer IDs that are entirely digits. There are a lot of special cases in the data that deal with giving people free shipping or postage, or with discounts, commissions, and bank charges. Those don't reflect people actually buying things; they aren't an actual indication of interest. So we need to clean and filter that data out ahead of time; that's what's going on here. It's a good reminder that in any machine learning project, data cleaning is a big part of what you're doing. Garbage in, garbage out, guys, right? So you want to make sure that the behavior data you're training your system on really is behavior data from real people for real items, and we had to do some work to make sure that happens. After that, we just construct our rows differently, making sure we extract our customer IDs, item IDs, and ratings.
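Here's the gist of that cleanup in plain Python. The column positions and field layout are illustrative, not the course's exact code, but the digit check and the implicit 1.0 rating are the key ideas:

```python
def clean_and_rate(line):
    """Parse one comma-delimited order line into (customer_id, item_id, rating),
    or return None for junk rows (postage, bank charges, and other special
    cases use non-numeric IDs). Column positions here are illustrative."""
    fields = line.split(",")
    customer_id, item_id = fields[0], fields[1]
    # Keep only rows where both IDs are made entirely of digits
    if not (customer_id.isdigit() and item_id.isdigit()):
        return None
    # A purchase counts as an implicit rating of 1.0
    return (int(customer_id), int(item_id), 1.0)

print(clean_and_rate("13098,79342,2021-02-25"))  # → (13098, 79342, 1.0)
print(clean_and_rate("POSTAGE,BANKCHARGE,2021-02-25"))  # → None
```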
Since these aren't really ratings, we're treating them as implicit ratings: we're saying that any time somebody buys something, that person implicitly liked that thing, and it corresponds to a rating of 1.0, as opposed to no rating at all if they didn't buy it. So let's go ahead and copy this into our clipboard and head back to our master node on the EMR cluster. Let's edit our driver script. All right, let's get this code side by side so I can see where it has to go. Basically, we're starting with this bit of code here and replacing everything down to where it constructs the ratings RDD, right? So those three lines are going to get replaced: Ctrl-K, Ctrl-K, Ctrl-K. And I'm going to right-click to paste the new code in. All right, looks good, so that should work. Hit Ctrl-O to write that out, Enter, Ctrl-X, and kick it off again to see what happens. Once again, we'll do `spark-submit als_example.py`, kick that off, and let it do its thing. It's going to distribute this across our entire cluster of three nodes and process those 500,000 lines of order information to construct a recommendation model and make some predictions for some real users based on that model. Give that a minute to finish. All right, I actually got an error here, so apparently I missed something. Another learning opportunity.
The error message, fortunately, is pretty straightforward: it's complaining that it's looking for something called "userId", and that doesn't exist in our data. So I must have missed something. Let's go back and edit the script again; debugging is part of the process here, guys, it's just what you do. Well, there's where userId shows up, and it makes sense now: I basically forgot to update the ALS model to specify the actual column names I'm using for user identifiers and item identifiers. For me, it's not a user ID, it's a customer ID, so let's change that column name to Customer instead of User. And we're not dealing with movie IDs either, we're dealing with item IDs, so let's change that as well. Save it again with Ctrl-O, Enter, Ctrl-X, and try it again: `spark-submit als_example.py`, and see what happens this time. All right, looks like we're getting some results here.
So our script is working. The RMSE was pretty low there; that's encouraging. And here are some actual results. Cool. We got a few sample results here for some sample customers. Customer ID 13098, apparently, we think would really like item ID 79342, whatever the heck that is. So this isn't very intuitive. Like, what does item ID 21899 really mean? Apparently user ID 15299 would really like it. Well, we can look that up if you're curious. Just open up another session to our original EC2 node that contains the source log data; if we go into /var/log/cadabra, where our original logs live, we can search there to see what that ID means.
For example: `grep 21899 *.log`. And you can see that 21899 is actually a garage key fob. So apparently the person in the data set who would most love a garage key fob is user ID 15299, according to our recommender algorithm. So, some real results here, guys; we actually built a real recommender system using Apache Spark on a real EMR cluster. That's kind of awesome. Let's clean up our mess: exit out of that EC2 node, and also exit out of the master node for our EMR cluster. Again, this cluster costs real money, so do not forget to shut it down. Go back to the console, go back to EMR, click on our cluster, and terminate it. Do not forget to do that, guys, or else the charges will add up fast; these are expensive computers and there are three of them, so we don't want to keep them running for too long. All right, that's terminating, so we've cleaned up our mess. Once again, you have successfully created a recommender system using Apache Spark's MLLib on an EMR cluster, on top of an S3 data lake that was populated from Amazon Kinesis Data Firehose. How cool is that?
- AWS Data Pipeline
Next, let's do a really quick overview of AWS Data Pipeline. You're not going to need a whole lot of depth on this, but you do need to know what it is and how it fits into big data workflows. Basically, Data Pipeline lets you schedule tasks for processing your big data. Let's just look at an example. Imagine that we have log files being generated on EC2 instances, and we need to publish those log files into S3 and later analyze them using EMR, with some process running on our EMR cluster, maybe Spark or something. Data Pipeline can schedule a daily task, or whatever frequency you want, to copy those log files from EC2 into S3, and then maybe a weekly task to launch that data analysis on the data that's been stored up in S3. Again, this is just an example; it can do many different things. If you're familiar with Hadoop, it's kind of like Oozie: basically a task scheduling framework where you can run different operations that depend on each other, on different schedules.
Basically, it's a web service that helps you reliably process and move data between different AWS compute and storage services at specified intervals. In this case, we're copying data from EC2 instances to S3 on a daily basis, and then on a weekly basis we might analyze the data that's accumulated using an EMR cluster. That's just one example of how Data Pipeline might be used; it's a very simple concept. Let's go into a little more depth about Data Pipeline and what it can do. Destinations that can be used by Data Pipeline include S3, like we talked about, and it can also work with RDS (the Relational Database Service), DynamoDB, Redshift, and EMR.
So all of those things can be controlled by Data Pipeline. It manages all the task dependencies for you, it will automatically retry things as necessary, and it notifies you when something fails. It can also run pipelines that work across regions, and it has features for precondition checks, so you can make sure the data you're trying to process is really ready before you try to do something with it. Here are some examples of precondition checks you can do with Data Pipeline. One is called DynamoDBDataExists, which checks for the existence of data inside a DynamoDB table before you try to move it or process it somewhere else.
There's also DynamoDBTableExists, which checks for the existence of an entire table. For S3, we have S3KeyExists and S3PrefixExists, which can be used to test for the existence of either a specific key in S3 or a specific prefix, sort of a path that exists within S3 itself. There's also the ShellCommandPrecondition, which runs an arbitrary script of your own on your resources and checks that the script succeeds. With that precondition check, you can do pretty much anything you want: write a script that checks for whatever you care about, and your pipeline won't kick off until that precondition has been satisfied. You can also use on-premises data, which is kind of interesting: you can give Data Pipeline data sources that reside on on-premises machines. The way that works is you install a Task Runner on your on-premises machines, kind of like the Kinesis Agent that we installed for Kinesis, and then AWS Data Pipeline communicates with the Task Runner and manages the flow of data and the schedules accordingly. Now, to ensure that your pipeline activities are highly available, you can optionally assign multiple Task Runners to poll for a given job as well. So it's a very highly reliable system with a lot of different ways you can use it, but at the end of the day it's used for chaining things together on a schedule, and doing so in a reliable manner.
There are several different activities that a data pipeline can perform; an activity is an action that AWS Data Pipeline initiates on your behalf as part of a pipeline. Different types of activities include the EMR activity, which allows you to run arbitrary Amazon EMR jobs. Remember, it's possible to spin up an EMR cluster, run a sequence of steps automatically, and then automatically terminate the cluster when it's done; that's the sort of thing a Data Pipeline activity might kick off. There's also a Hive activity, which allows you to execute Hive queries on a schedule, and a Copy activity that can copy data between Amazon S3 and JDBC data sources, or run a SQL query and copy its output into Amazon S3.
You can also have SQL query activities, which do exactly what they sound like: they run SQL queries. And there are also command-line scripts that can be triggered by activities; that activity allows you to run arbitrary Linux shell commands or programs as part of your data pipeline. You can specify this as a custom activity, or it will be added automatically if you're using any of the templates provided by AWS during pipeline creation, so a lot of the time it can automate the creation of these things for you. Now, by default an activity will retry three times before entering a hard failure state. You can increase the number of automatic retries up to ten if you want to. After an activity exhausts its attempts, it will trigger any configured onFailure alarm, and it will not try to run again unless you manually issue a rerun command via the command-line interface, the API, or the console. So that's Data Pipeline in a nutshell; all you need to remember is what it does, how it's used, and the systems it can integrate with.
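To give a flavor of what this looks like in practice, here's a rough sketch of a pipeline definition combining a precondition with a shell-command activity, in Data Pipeline's JSON format. The bucket name, paths, and IDs are made up, and the field set is abbreviated; treat this as the general shape, not a copy-paste-ready definition:

```json
{
  "objects": [
    {
      "id": "InputReady",
      "type": "S3KeyExists",
      "s3Key": "s3://my-example-bucket/logs/ready-marker"
    },
    {
      "id": "CopyLogs",
      "type": "ShellCommandActivity",
      "precondition": { "ref": "InputReady" },
      "command": "aws s3 cp /var/log/myapp s3://my-example-bucket/logs/ --recursive",
      "maximumRetries": "3"
    }
  ]
}
```

The idea: the CopyLogs activity won't run until the InputReady precondition confirms the marker key exists in S3, and it will retry up to three times before hard failure.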
- AWS Step Functions
Let's do a high-level overview of AWS Step Functions, which is very similar to Data Pipeline in concept. Basically, its purpose in life is to let you design workflows throughout AWS. So as you're building a system to analyze and preprocess your data, Step Functions might be what connects all those steps together into a larger system. It also lets you visualize these workflows very easily; as we'll see shortly, it creates these nice little graphs to show you the order in which things actually execute. And it has advanced error handling and retry mechanisms that live outside of the code you're actually running within your Step Function, which is a good utility for managing the portions of your workflow that don't work.
You don't have to rely on the code of each step being able to monitor itself; AWS Step Functions can look at it from the outside and deal with errors when things don't do what they're supposed to do. It also maintains an audit history of all the workflows that have run, so you can go back and see what happened historically if you need to troubleshoot something. It allows you to wait an arbitrary amount of time between steps, so if you just want to place some time between steps to sync up with some business workflow you have, you can do that. And the maximum execution time of a given state machine, that is, a given workflow, is one year, so you can define a workflow that spans an entire year if you want to.
I can't imagine a situation where you'd need to do that, but if you do, it can handle very long-lived processes and workflows. The syntax of how it all works really isn't important; workflows are defined using the JSON-based Amazon States Language, or ASL. The exam will not expect you to be able to parse or write this kind of code or know ASL; you just need to know what Step Functions does. So as long as you know what Step Functions is for, and some examples of what you might use it for, that's what's important for the exam. We're going to go through a few examples of how you might use Step Functions in the real world. Here's an example of using it to train a machine learning model.
You can see that when we start, we generate the data set; in this example, it does that by kicking off a Lambda function. It then trains the model using SageMaker's XGBoost algorithm, and when it's done, it saves that trained model from SageMaker and applies a batch transform to the model using some data set that we had. At that point, it's done. So we've automated the entire process of generating our data, training a machine learning model on that data, saving the trained model, and then applying data to that trained model, all in one set of steps managed through Step Functions. We can also use it to tune a machine learning model. In this example, we generate a training data set and then kick off a hyperparameter tuning job in SageMaker, which just tries different parameters on the machine learning model to figure out the right set of parameters for this given data set.
When it's done, it extracts the model path and saves the tuned model it settled on. We extract the name of the tuned model and then use it to automatically apply a batch transformation of a bunch of data against that tuned model. Again, this is just an example of what you might use Step Functions for. It's not important for the exam to understand how machine learning works, but it is important to know that Step Functions can automate a process like generating a data set and feeding it into a machine learning model.
You can also use it, for example, to manage a batch job. In this example, we submit a batch job and then just use Step Functions to notify us whether it succeeds or fails. Very simple. So Step Functions doesn't have to be complicated; it can just be used to monitor a single job, and to use its built-in features to alert us when that step succeeds or fails. Sometimes a step function has just one step. And that's really all the depth you need for AWS Step Functions: you just need to know that it manages workflows, gives you a nice graphical representation of those workflows, and can monitor whether individual steps succeed or fail and notify you based on what happens.
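Just to give a flavor of what ASL looks like (again, you won't need to write this for the exam), here's a rough sketch of a state machine for that submit-and-notify pattern. The ARNs, names, and parameters are placeholders, and the field set is abbreviated:

```json
{
  "Comment": "Submit a batch job, then notify on success or failure",
  "StartAt": "SubmitJob",
  "States": {
    "SubmitJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Retry": [{ "ErrorEquals": ["States.ALL"], "MaxAttempts": 2 }],
      "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "NotifyFailure" }],
      "Next": "NotifySuccess"
    },
    "NotifySuccess": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": { "Message": "Batch job succeeded" },
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": { "Message": "Batch job failed" },
      "End": true
    }
  }
}
```

Notice that the retry and error handling (the Retry and Catch blocks) live in the workflow definition itself, outside the code of the job being run, which is exactly the point made above.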