Saturday, December 26, 2015

Introduction : Cassandra Overview

What is Apache Cassandra


  •  Fast distributed Database
  •  High availability 
  •  Linear Scalability
  •  Predictable Performance
  •  No SPOF(single point of failure)
  •  Multi-DC
  •  Commodity hardware
  •  Easy to manage operationally
  •  Not a drop in replacement for RDBMS

Apache Cassandra is a fast distributed database build for high availability and linear scalability. We want to get a predictable performance out of our database. That means we can gaurantee SLA, Very low Latency. We know that as our cluster scales up, we gonna have same performance whether it's 5 nodes or 10 nodes. We know that we are not going to have a single point failure. One of the greatest thing about Cassandra is it is a peer-to-peer technology, so there is no master slave. There is no failover and there is no redirection. With open source we can do multi-DC. When we are talking high availability we want to withstand failure of an entire data centre. Cassandra can do that. You are absolutely not going to get that out of a RDBMS. We also want to deploy everything in a commodity hardware. We talked about how expensive it is to scale things vertically. Cassandra you are going to put on cheap hardware, who just gonna use a whole bunch of it. It is really easy to manage. It is extremely easy to manage operationally. 
One thing to keep in mind it is not a drop in replacement for RDBMS. So you are not going to take data center model and through it in casandra and hope it works. You are gonna have to design your application around cassandra data model and rules. 

Hash Ring:

  •  No master/slave/replica sets
  •  No config servers, zookepers
  •  Data is partitioned around the ring
  •  Data is replicated to RF=N servers
  •  All nodes hold data and can answer queries (both reads and writes)
  •  Location of data on ring is determined by partition key


If you wanna think about cassandra conceptually, you can think about it as a gaint hash ring where all nodes in the cluster are equal. When I say node I mean virtual machine, machine can actually be physical computers all participating in cluster all equal. Each node owns a range of hashes, like a bucket of hashes.  When you define a data model in cassandra, when you create a table one of the things that you specify is a primary key and part of that primary key is called partition key and partition key is what actually used when you insert data into cassandra, the values of that partition key is run through a consistent hashing function and depending upon the output we can figure out which bucket or which range of hashes that value fits into, which node we need to go talk to to actually distribute the data around the cluster. 
Another cool thing about Cassandra is data is replicated to multiple servers and that all of those servers, all of them are equal. There is no master slave, there is no zookeeper, there is no config servers. All nodes are equal and any node in the cluster can service any given read or write request for the cluster.


CAP Trade-offs

  •  Impossible to be both consistent and highly available during a network partition.
  •  Latency between data centres also make consistency impractical
  •  Cassandra choose Availability & Partition Tolerance over Consistency


One of the thing which is very important to understand in a database is how a cap theorem works, so the CAP Theorem says during network partition which means when computer cannot talk to each other either between data centre or a single network, then you can either choose consistency or you can get high availability. 
If two machine cannot talk and you do a write to them and you have to be completely consistent, then if they cannot talk to each other then system will appear as if it is down. So if we give up consistency then we can be highly available. So that's what cassandra chooses. It choose to be highly available in a network partition as opposed to be down. For a lot of application this is way better than down.

Another thing we need to know is from data center to data centre, lets say we were to take three data center around the world, one in US, one in Europe and one in Egypt, it is completely impracticle to try the consistency across data centers. We want to asynchoneously replicate our data from one DC to another, because it takes way to long from data to travel from US to Asia. We are limitted by the speed of light here, it is something we are never gonna go around, its not just gonna happen. Thats why we choose availability and that's how consistency is affacted.

Now we are going to talk about some of the dials that cassandra put into your hands as a developer to kind of control the idea of fault tolerance. So tring to talk about the CAP Theorm earlier, this idea of being more consistent or more available is kind of sliding scale and Cassandra doesn't impose one model on you. You got a couple of dials that you get to turn to configure this idea of being more consistent or being more available.

Replication

  •  Data is replicated automatically
  •  You pick number of servers
  •  Called Replication Factor or "RF"
  •  Data is always replicated to each replica
  •  If machine is down, missing data is replayed via hinted handoff


Replication or replication Factor abbrevated as RF, for people running in production Replication Factor is of 3. Replication Factor means how many copies of each piece of this data should there be in your cluster. So when I do a write to cassandra as can be seen from the slide, Clients writing to node A then A node got the copy, B node got the copy and C node also got the copy. 

Data is always Replicated to Cassandra. You set this replication factor when you get a key space, which is in cassandra is essentially is a collection of tables, is very similar to a schema in oracle or a database in mysql or Microsoft SQL Server. 

Replication happens asynchronously and if a machine is down while this replication is suppose to go on, then whatever node you going to be talking to, is going to save what called it hint and cassandra uses something called hinted handoff to be able to replay when that node comes back up and rejoin the cluster to replay all the writes that that node down missed.


Consistency Level

  •  Per query consistency
  •  ALL, QUORUM, ONE
  •  How many replicas for query to respond OK.


The other dial that Cassandra gives you to developer is something called consistency level. So you get to set this on any given read or write request that you do as a developer from your application talking to Cassandra. So I am going to show you an example of two of the most popular consistency level with the hope that that will kind of illustrate exactly what is consistency level means. But basically a consistency level means how many replicas do I need to hear when I do a read or write, for that read or that write is considered successful, so if I am doing a read how many replicas do I need to hear from before cassandra gives that data back to the client or if I am doing a write how many replicas need to say Yup! We got your data, we have writtne it to disk before cassandra replies to the client. 

So two most popular consistency level are consistency Level 1 which like the name sort of implies just means One Replica, so you can see that first example. Client writing A, A node got its copy and since we are writing with consistency Level of 1, we can acknoledge write back to client immediately 'Yup! we got the data'. The dash lines indicates that just because you write with a consistency level of 1 doesn't mean cassandra not going to owner your replicate factor. 

Now the other most popular Consistency Level people use a lot is QUORUM. QUORUM is essentially means a majority of replica, so in case of Replication factor of 3 this is 2 out of three for 51% or greater. So in this example again we got a client writting A, A node got the copy, B node got the copy and response and that point when we got two out of three replicas that have acknoledge, we can acknoledge back to the client 'Yes we got your Data'. Now again the dash lines indicate we are still going own the replication factor, we are just not waiting on that extra node to reply before we acknoledge back to the client.

You may think why I pick One Consistency Level over another, what kind of impact is going to have. One preety obvious one if you think about it, is how fast you can read and write data is definetely going to be impacted by what consistency level you choose. So if I am using a lower consistency level like say 1 and if I am only waiting on a single server I am able to read or write data very very quickly. Whereas if I am using a higher Consistency Level, it gonna be much slower to read and write data. 

Now the other thing that you gonna have to keep in your mind with consistency level is that it also going to affact your availability. As we talked about CAP Theorm Consistency Vs Availability. If I choose a higher consistency level we have to hear from more nodes where more nodes have to be online to acknoledge read and write then I am gonna be less available and less tolerant to nodes going down. Whereas if I choose a lower consistency level like 1 I am gonna be much more highly available, I am going to withstand 2 nodes going down and still be able to read and write in my cluster. 

Cassandra lets you pick this for every query you do, so it not gonna impose one model, user developer got to choose which consistency level is appropriate for which part of your application.

Multi DC

  •  Typical usage: Client write to local DC, replicates async to other DCs
  •  Replication Factor per key space per data centre
  •  Data centre can be physical or logical

One of the great thing about Cassandra because we got Asynchronous Replication, is that it is really easy to do multiple data centre. So when we do a write to a single data centre we can specify our consistency level, may be we can say ONE or QUORUM. If we get multiple data centre we say local QUORUM or local QUORUM. 

And now when that happen you get your right and happens to your local data centre then returns to the client and then say you have five data centre, the information you wrote to your first data centre is going to be asynchronously replicated to the other data centre around the world. This is how you can get super availability even when entire data centre goes down, so you can specify the replication factor per key space. So you can have one key space that have 5 replicas, one that has three, one that has one. It is completely up to you and it is completely configurable. 

It is important to understand that a data center can be logical or Physical. One of the thing that great about cassandra is that you can run it with a tool like spark and if you are doing that you may wanna have one data center which is your OLTP, serving your fast read to your application and then one data center virtually serving your OLAP queries and then doing that you can make sure your OLAP queries do not impact your OLTP stuff.

 Sources:  Data

Saturday, January 17, 2015

Introduction Big Data: Hadoop for Beginners

What is Big Data

Various scenarios where Big Data is being generated are:

  • Facebook generated 500 Terabyte of data everyday and do lots of analytics per day.
  • 1 Terabyte of data generated by an aeroplane. It keeps all the logs of running of aeroplane. We got to know the reason for crash. We need to analyze the data if a flight crashes so that we can avoid future failures. Huge amount of data is stored which include weather conditions also. We analyze this huge data only mostly when there is a mishap. 
  •  Lot of data being generated by Power Grid. Information of which electrical equipement using how much electricity right from our home. The electricity or power department need to know where exactly electricity is getting consumed. e.g. Resolve fight between with two states sharing same power grid.
  •  Stock Exchange - Which stocks going up and which gone down. Every minute detail of each and evry stock.
  •  Unstructured Data is where data is in the form of file or logs. There are no columns or rows or defined format of data. In Big data we have various unstrcutred data.

Why Distributed File System?

Suppose we need to read 1 TB of data. We have 1 machine which has 4 I/O channel and each channel has read speed of 100 MB/Sec. So time taken by 1 such machine to read 1 TB data will be: (1024x1024)/(4x100x60) = 45 min. 
If we have 500 TB of data it will not be possible to read with 1 machine in short time. It might take days or weeks to read this much data. 

Now we partition the data into 10 parts, store the 10 part in different machine and then we try to see how much time it will take to read. So for 1 TB now it will take 4.5 Min. 

This is same reason why we need Distribute File System.

Storing the data is not a big problem, real challange is reading that data quickly. I/O speed is the challange not storage capacity. To analyze the data we need to read the big data. We can have storage to store large size data but fast I/O is the concern. Reading from single place is not easy. We need a distributed system for fast read/write. So we need a system to store data in Distributed system.

What is Distributed File System

In DFS we build a a file system for machines at different physical level, to make these machine connected at logical level. So at the top of it, it looks like we are accessing a single file system but underlying hardware is sitting at different places. At a logical level they all come in directory structure.

There are these different machine sitting at different physical location so if I create a distributed file system, physically these files are still at different location but logically it looks like they are part of a single file system. We need to have multiple machine from which multiple read can be done in parallel, so to solve that problem we use multiple machine not a single place to do I/O. But at logical level they become part of a single file system.

What is Hadoop

  •  Apache Hadoop is a framework that allows for the distributed processing of large data sets across clusters of comodity computers using a simple programming model (MapReduce). If we have small data sets we should not use Hadoop.
  •  Companies using Hadoop: Yahoo, Google, Facebook, Amazon, AOL, IBM, Linkdn, ebay

Hadoop Core Components:


1. HDFS - Hadoop Distributed File System (For Storage)
2. MapReduce ( For Processing)

Master Slave concept:
NameAdmin Node is Master node, Name node are child nodes.
JobTracker - Associated with NameNode.
TaskTracker - Associated with DataNode.

What is HDFS

  •  Highly Fault Tolerant: - We replicate data on multiple machines ( minimum 3 machines) and that not by using very high Quality/availability machine. Yes it causes data redundancy but redundancy is pursued because this machines are commodity hardware. They are not high quality machines. There is a good chance they will keep on failing in between. That's why we replicate data.
  •  High throughput: - Throughput is the time to read/access the data. DFS model reduce the time. Distributed data makes access faster. It means we are making multiple parallel access to the data. 
  •  Suitable for application with large data sets: - Large sizes of file and small number of file. Huge data in one file is suited instead of small amount of data spread across multiple files.
  • Steaming access to file system data: Write once Read many times kind of operations. Streaming Access is: - Application where time to read the data is more important than writing and we are reading huge data/fetch huge data at once. e.g youtube video - How soon we can get the whole video is important rather than seek to a particular point. Similarly facebook logs, we do not look on what a particular person like we analyse the complete data together. That is when we write once and read multiple times.
  • Can be built of commodity hardware: Without using high end devices we can build HDFS. It can be your Personal Computer also.

The Design of HDFS

HDFS is a file system designed for storing "very large files" with "streaming data access" patterns, running clusters on "commodity hardware".
Very Large Files, Streaming Data Access and "Commodity Hardware" are three things which must be considered while desinging hadoop.

Areas where HDFS is not a Good Fit Today

  •  Low Latency data access - Where we have to quickly read a record (for Bank kind of transactions), we shouldn't move to HDFS even if data is huge.
  •  Lots of small files - Because for each file we have to store a metadata which will make things difficult to seek whenever you want to.
  •  Multiple writes, arbitrary file modification - If updating the same row again and again then traditional RDBMS fits best (Like Oracle, Teradata).
Hadoop is good when you need to read complete data in one shot and then do analysis over it. 

HDFS Components

 NameNode:
    "NameNode is the master node on which Job Tracker runs. Job Tracker is a daemon which run on the NameNode".
  •  Master of the system.
  •  Maintains and Manages the block which are present on the DataNode. It doesn't store any data.

Very reliable hardware, double-triple redundancy h/w, RAID. More expensive machine. So NameNode is like a Lamborgini.
DataNode
    "DataNode is again a commodity hardware on which another daemon called task tracker runs". 
  •  Slaves which are deployed on each machine and provide the actual storage 
  •  Responsible for serving read and write requests for the client.

DataNode are like Local Taxi. Less expenses are made on Hardware of it.

JobTracker and TaskTracker

Client: Clinet is an application which you use to interact with both the NameNode and DataNode which means to interact with JobTrackert and TaskTracker. Interaction from NameNode to DataNode is done by client.

Map Reduce is a programming model to retrieve and analyse data.

HDFS Architecture



Rack:  Storage Area where we have multiple Datanode put together. DataNode can be at different location physically. Together we can have DataNode stored together in a Rack. There could be multiple rack in a single location. 

Client intract with NameNode through HSS.

Replication: We replicate data/store multiple copy of the same data, for performing fault tolerance. Replication had to be maintained for this.

Block ops: Block size in HDFS is min 64 MB. In Linux default block size is 8192 bytes. HDFS is built over Unix.  

NameNode:
NameNode has metadata information. Data About Data. If some data is stored in DataNode, NameNode keep the information of which data in which DataNode. 
NameNode keeps the data in RAM, There is a Secondary NameNode which keep on reading the data from the NameNode (Data from the RAM of the nameNode) and writes into the hard disk.

Secondary NameNode:
Secondary NameNode is not the substitute of the NameNode. If the NameNode fails the system goes down that is the case in geniun hadoop. Secondary NameNode doesn't become active NamdeNode if the NameNode goes down. So in Hadoop there is a single point of failure when NameNode fails. And to recover the system you have to put another NameNode. This is Gen1 Had
In Gen2 Hadoop we have Active NameNode Passive NameNode kind of structure. If active NameNode goes, Passive NameNode can take care. 
NameNode are High availability system. 

Job Tracker Working Explained

Whole Hadoop Distributed file system behave as one file system though it is spread across multiple nodes.

User - Person who need some kind of data, has some queries, who want to analyze some data

Client could be NameNode Application or a Job tracker application. 


1. First user copy some kind of file, logs into DFS.
2. User Submit a job. Job means some kind of data need to be fetched
3. Client tries to Get the data from the DFS, (information like where the file are located, which data block, which rack). This information is supplied by the NameNode.
4. Create Splits: When a job submited, the job may be a big job, that could be broken down into smaller jobs. So Bigger job is broken into smaller jobs.
5. Upload Job Information: Information is uploaded to DFS. (job.xml, job.jar)
6. Client Submit the Job to Job Tracker.


7. Job Tracker go to DFS for file information.
8. Job Queue - Jobs that will be created by Splits and one by one these jobs will be picked. One job is picked from the job queue, then Job Tracker creates Maps and Reduces. These (Map and Reduce) are the programs that will be sent to the datanode for the processing. 

From DFS it has the information where it is stored. These Maps and Reduces are sent across the DataNodes.

Input Splits: Input Splits is a bigger file which has been broken down into bigger chunks.  For these input splits there may be as many maps created as the number of input splits.  If you have 40 Input splits there will be 40 Maps for it and based on that the reduces will be there.
Maps generated the data first, it generates some # key,value # pair. And on that particular key value pair the reduces work  to give us final output. Shorter, shufflers, combiners are some reducers. So this depend on how we write our Map Reduce programs. Map Reduce depend on what we really need to process.

Map and Reduce heart of Hadoop system. Map and Reduce are nothing but programs written in language of your choice. Programm can be written in Java or python or perl, c/C++ or any other language as per your need. 

Map processes the data locally on the DataNode. Key Value pair are generated after Map has processed the data. And on that particular intermediate Key Value pair reduces work and generate the output data.

The logic is entirely your own logic. How you are going to write it and is as per your need.

job.xml and job.jar are files which has information related to job, the job that has to be submitted.

Que: Number of Map equal to Number of Splits. Why?
Ans: Based on the input splits, the file which is splited will be placed on those many Datanodes. And on each Datanode it has to be processed. So for each data node you require a map to process the entire file and then a reducer. So number of Map is equal to Number od Splits.

Split is something created for the file, that is the input Splits. File then placed on the DataNodes. On the DataNode the Map has to sent across on to all DataNodes so that the entire file is processed. That is the reason why you require as many Maps as there are number of input splits. For each split a Map is needed. It is a single program send across by the JobTracker.
One Map program can be distributed to all DataNode but then the Maps are distributed. 



Heartbeat: To check communication between two nodes or health of a node.
Job Tracker picks up the task. All Task Tracker are sending "heartbeat" to Job Tracker to tell the job tracker that we are alive and we can be assinged tasks. 
Tracker knows which are the TaskTracker alives, then it pickup the task and assign it to Task Tracker

Blue color box is the job that need to be assign to the Task Tracker. Blue color are the Map which processes the data. White block reduces the data

DataWareHouse Concepts in Breif

What is DataWareHouse?


A warehouse is a place where something is stored a goods is stored. e.g. An eCommerce Website have a warehouse where they will project a demand for a product they will procureth (get/obtain) from client and store in the warehouse. As soon as the supplier places an order the good is immediately dispatched.

Taking Warehouse out of this equation, what do you get? You have the customer directly going to the eCommerce website, the eCommerce site is not storing any goods, when an order is placed the eCommerce website people will directly go to the supplier and ask for the product. 

Now if 100 supplier go to the manufacturer asking for the product becasue there is demand from the consumer, imagine the strained (adj - showing signs of nervous tension or tiredness) that is placed on the manufacturer to supply those products. That is looking from manufacturer percepective.
Now looking from customer perspective there will be delay in delivering the product which customer has ordered. Nobody would like a delay. If there is delay of 2-3 days it will affect customer satisfaction level and you will loose customer.

Similar to this is the "Data WareHouse", where you will be storing the data that you will precure from the transaction system.

There are two terms:
1. OLTP - Online Transaction Processing.
2. OLAP - Online Analytical Processing.

Analytical processing is where you use the Data Warehouse. Transaction processing is something that you will use to record each and every transaction.

Example 1 - ATM
 We all use ATMs. Every transaction we do in ATM is record in OLTP system. Besides ATM when you go to bank account and perform transaction, even that is recorded. So there are multiple sources feeding into a particular system. Now if you want to perfrom query on the system you will have to join multiple sources. Which have different formating type of their own. Second disadvantage is number of transaction - 100 of customers use an ATM in a given day, and millions of queries are being hit on OLTP system. Imagine the load going into OLTP system. So this system is definetely not used for Query purpose or Analytical purpose. This system is only used to record a transaction. 

Example 2 - Railway Reservation
There are many ways in which you can reserve a railway ticket. You can go through mobile, Directly go to railway station, railway website, go to n number of agents that are spread across the city. These are multiple desperate sources. Format of ticket booking on website is entrily different from booking it from mobile. Data Type and Kind of data is changing. Multiple deseparate sources make it difficult for quering. 

Based on this the enduser, at the end of the day would want reporting. He want data for his reporting purpose. So we create an alternate system. This is called as OLAP system.

As you can see from the diagram there are multiple sources Source 1, Source 2, Source 3 feeding into the Data WareHouse. This is not simple loading. Lots of calculating, varification is done before loading the data into the WareHouse and at the end you have multiple users trying to access this warehouse and trying to get data and generate reports.

Why do we need DataWareHouse

We all know this is an era of competition, it is not just taking smart decision but taking the decision on time.  Assume a super market chain is not implementing any datawarehouse. It is very difficult for the supermarket to analyse:
What product are sold
What product are not selling
When is the time selling goes up
What is the age of person buying a particular product.
None of these analysis is possible. What actually happen is some goods getting out some good getting in without knowing anything.

That is not an ideal scenario. I want to make a decision when I say this particular product hit among the age group 18-25. I will intensify my effort to market that particular product to that particular age group. Also from the data I see this particular product is not selling as it is expected. So I will analyse why and the reason for this. This particular product is not selling during a particular time (like Dec-Feb) or it is not selling throughout the year. Then I will be in a better situation to take the decision of either not to take the product or suggest some improvement so that the product start selling.

These are smarter decision which if you do not take on time customer will move out. 

When I talk about stretegic value  that is given to a company. Taking example of procurement department. Every company procures certain things from suppliers. There will be hundred of suppliers supplying a product to company (for example Desktop, Laptops, stationary etc). Now before making purchase the ompany definetely ink a contract with the supplier, saying if you are charging 100 for a particular product please give us at a discount of 80 Rupees and what are the terms that should be followed when procurement is made etc.
Now what is the gaurantee that the supplier is following all the terms set in the contract. When a particular purchase is made, the suppliers provide an invoice to the company. That invoice is record of that transaction. Now I want to match this invoice data with actual contract data to see whether all the terms are being matched or not. If the invoice says product has been charged 100 instead of 80,how you will analyse if the data is not there. 100 of suppliers are providing such invoices to the company. You cannnot go and sit with each and every suppliers. 
What we will do now is: We will load the data into dimensions and extract reports and see whether there is a match or mismmatch. If there is a mismatch we will schedule a meeting with the supplier and ask him why he has done so. This is stretegic advantage and also it gives the company more negotiating power. You can question for mismatched. But in the absence of data you end up paying more.

  •  Primary reason for a Datawarehouse is, for a company to get the extra edge over its competitors.
  •  This extra edge can be gained by taking smarter decisions in a timely manner.
  •  Smarter decision can be taken only if the executives responsible for taking such decision have data at their disposal.


There was a time when fact based decision and experienced based decision making is much more prevelant. We are moving from that area and going into an area fact based decision have gain importance in our life. So data is very essential.

Some stretegic Questions that a manager has to answer:
Q. How do we increase the market share of this company by 5%?
Q. Which product is not doing well in the market?
Q. Which agent need help with selling policies? 
Example from analysing data of Warehouse you can find an agent not selling policies as per expectations. You will identify whether there is deficiency in company or Employee. Does he need more training. 
Q. What is the quality of the customer service provided and what improvement are needed?

Why is DataWareHouse so important?


Picture: Why is DataWareHouse so important

What approach manager follow to arrive at final decision?

What is the quality of the customer service provided and what improvement are needed?

This is a larger question. Now he will break it down into smaller questions.

Subset Question 1: How many customer feedback we have in last 6 months? 
Now he will fire query on database which has all the customer details.

Subset Question 2: How many customer have given a feedback of Excellent, How many Average and How many Bad?
You will go to the database and by grouping you will find how many excellent, Average or Bad feedback you got.

Subset Question 3: Now I need to know why are people giving bad, Average and Excellent? What are the comments or improvement areas highlighted by customers who have rated us bad or average?
May be one person said your distribution channel is not good, may be some say support is not good, and that is the reason why they have given.  So we can identify why they have given that feedback.

All these questions combined you will have an overall picture of the quality of customer service and what improvement needed.

We will hit the data warehouse to get results of these questions, then consolidate these results and arrive at a final conclusion. In the absense of data there is no way of knowing.

One more important factor is trends. Because data ware house hold the entrire history. Jan 2015 to Aug 2014. It holds data till august 2014. With this data we can see what trend is happening:
  •  A Particular product is being sold at a particular month at higher level.
  •  A Particular product sale goes down at a particular month. 
  •  And Does that happens at a regular interval?

We can spot that trend this history. 

When we spot that trend, because of this history and when I spot that trend, I can make out common factor and I can take a decision to why that is happening. 
An operation system do not provide trends. They are transaction system and you may purge the data in an operation system over time. You cannot maintain from starting till end. But in DataWareHouse the main purpose is to maintain history. You will not be deleting history in immediate future.
Result are provided in ready to access format where you can take the analysis.

What is ETL?


The full form of ETL is Extract Transform and Load.

Now question are: 
  • What to extract? 
  • From where to Extract?
  • What to Transform
  • How to Transform?
  • Where to Load it?

As you know there are multiple desperate sources that are loading the data into the datawarehouse so you need to extract from the multiple different sources. There is no consistency in the data in the OLTP system, so you need to standardize data that is coming in and then you have to load it into the datawarehouse. 

Usually Most of the companies (Banking and Insurance sector) use Mainframe system, which are very legacy system (Very old) and very difficult for reporting. And now they are trying to move to DataWareHouse system.  

Usually in a production environment the files( or data) is extracted from the mainframe and send to a unix or windows server in a file format. Each file will have a specific file format. They can send multiple file as well depending on the requirement. 

Let's say they send the file at 3 AM in the early morning, so we process those files using an ETL tool. Some of the ETL tool are: Informatica and Data Stage. These are not open source. Talent is one open source.

We use any one of these ETL tool to cleanse the data. Cleansing is important because special character might come, emailID not in proper format etc. So the Unwanted spaces, unwanted character can to be removed, Data need to be cleansed using the ETL tools.

Then they(File, data from different sources) are loaded into an area called the Staging area. In the Staging area all the business rules are applied. For example there is a Bussiness rule saying that a particular record that is coming in always be presend in the master table record. If it is not present we will not be moving it further. So we will have to look up on the master table, if the record is present we will move it further next level else left it in Staging.
Then we load in into the Dimention table. Schedular are also available to run the job exactly at 3 AM ( or any particular time) or you can run the job when the file arrives. There can be time dependency as well as file dependency. 

At the end we need to validate whether the job has been executed successfully or the data has been loaded successfully or not.

DataWareHouse Architecture


There are multiple transactional systems Source 1, Source 2. Now these source are not only Mainframes, it can be SAP, Flat files. So there can be combination of sources. ETL tool is used to load the DataWareHouse and the DataMarts.

Difference between DataMart and DataWareHouse is: 
DataWareHouse is used across the organization and DataMart are used for individual customized reporting. For example there are multiple department in the company. Finance department is different from the marketing department. They all draw data from different sources and they need customized reporting. Finance department is mainly concern with the statistics, Marketing department is mainly concern with the promotions. The marketing department may not get the finance information. So for customized report they create subset of the datawarehouse which are called Datamarts.

There are two approaches:
1. First load the DataWareHouse and then load the DataMart.
2. Load the DataMart and then Load the DataWareHouse.

Advantage of these approaches I will be post in upcoming posts.

Repoting (Data Access Layer):
User actually accesses the DataWareHouse and generates the report. All these reporting tool are meant to make the front interface very easy for the consumer because people at decision making level usually not concerned with technical details. They are not concerned with writing queries and extracting reports. They are mainly concern with neat usable report only.

So all these reporting tool does that at the front end but at the backend they generate the queries, hit the datawarehouse and user get the report just in time. These reporting tool can also schedule the job to run and generates the report depending on a particular schedule. 

Advantages of DataWareHouse

  • Standardizes data across an organization.
  • Smarter decision for companies - Move toward fact based decisions 
  • Reduce Cost 
    • Drop products that are not doing well.
    • Negotiate for improvement with suppliers
  • Increase Revenue
    • Work on high selling products
    • Customer Satisfaction - Know what is working and what is not.

Thursday, January 15, 2015

Concept of Hadoop Architecture - Hadoop for Beginners

What is Big Data

  • Lots of Data (Terabytes or Petabytes)
  • Big data is the term for a collection of data sets so large and complex that it becomes difficult to process using on-hand database management tools or traditional data processing applications. The challenges include capture, curation, storage, search, sharing, transfer, analysis and visualization.
  • Systems/Enterprises generates huge amount of data from Terabytes to even Petabytes of information.
  • Lots of unstructured data.


Question: What does Big Data helps in?
Answer: Data insights, Analysis of various parameters of data, Extraction of useful information, Hidden insights about the business, Analytics.

Un-Structured Data is Exploding


Till 1990s there was hardly any un-structured data. The data which we had was Business transaction data which was used in Bank. Most of the data was completely structured.
With the growth of Application Data happened mostly in the new millennium when google, other search engine, Social Media came up, lots of people started analysing that data. 

e.g. Facebook make sure that right ads are shown to right people. If they are not able to do so they won't earn money. They check Activity logs of users like what they share, what they like, comment, pages you liked, Your Location, Work profile etc.

It is said that every year amount of data in universe is doubled.

IBM Definition of Big Data

Characteristics of Big Data - Volume, Velocity and Variety.
  • Volume: Volume is amount of data that is being generated. Data in TB or Petabytes are classified as Big Data.
  • Volocity: It is the speed at which data is being generated. e.g. Facebook generating 500TB of data everyday.
  • Variety: Variety is kind of data that is being generated. e.g. Audio file, Video file 

Common Big Data Customer Scenarios

  • Web and e-tailing
    • Recommendation Engine
  • Web and e-tailing 
    • Recommendation Engines 
    • Ad Targeting 
    • Search Quality 
    • Abuse and Click Fraud Detection 
  • Telecommunications 
    • Customer Churn Prevention 
    • Network Perf8rmance Optimization 
    • Calling Data Record (CDR) Analysis 
    • Analyzing Network to Predict Failure
  • Government 
    • Fraud Detection And Cyber Security 
    • Welfare schemes 
    • Justice
  • Healthcare & Life Sciences 
    • Health information exchange 
    • Gene sequencing 
    • Serialization 
    • Healthcare service quality improvements 
    • Drug Safety 
  • Banks and Financial services 
    • Modeling True Risk 
    • Threat Analysis 
    • Fraud Detection 
    • Trade Surveillance 
    • Credit Scoring And Analysis 
  • Retail 
    • Point of sales Transaction Analysis 
    • Customer Churn Analysis 
    • Sentiment Analysis 

Limitation of Existing Data Analytics Architecture


We collect the data from Instrumentation and store it is Grids. Huge data that will be stored in grid. 
Initially what happened is 90% of the data was getting Archived. After period of time this data become to huge to handle in the grid so they were archiving that data. 
Storage here is Grid of computers that storing in your data. Which you will be archiving after sometime. So at any point of time the active data which is available for processing is around 10% of the data and 90% of the data is archived. So you have limited availability of data to Analyse. 

Archiving here is a task of putting a active data (level 1) to a Level2 data in some tap. It is not actively used for analysis. You looked into Archived data only when there is some problem and you pull that data back. 

So challenge is you cannot store 90% of the data into Active, only 10% will be active so you could analyse only this much. 

Due ETL computer Grid limitation we will not be able to put huge data in it. This data then go to RDBMS and then we run BI Report+Interactive Apps to get insight into customer behaviour. 

Solution to Traditional Analytic Limitation:


Hadoop has solved that problem and you have complete data for analysis. Solution: A combined Storage Computer Layer. 

Issue is not with storing the data. We can store data but can we retrieve that data quickly and do analysis. NO.
 So we introduce DFS. If there is one machine which read data in 45 mins then 10 such machine will read the data in 4.5 mins. This is whole idea for going for DFS technic. 
 Sears moved to a 300 node Hadoop cluster to keep 100% of its data available for processing rather than a meagre 10% as was the case with existing Non-Hadoop solutions. Hadoop Distribute the data and you can read data in parallel from Hadoop. So here we can read from 300 nodes in parallel in compare to one very fast storage area network. 
 With this approach we have all the data available for analysis at any point of time. Besides this it cost lesser and a cheaper solution as we have commodity hardwares.

What is Hadoop:


  •  Apache Hadoop is a framework that allows for the distributed processing of large data sets across clusters of commodity computers using a simple programming model. This simple programming model is called Map Reduce.
  •  In  Hadoop you can scale to any number of node in a cluster. Hadoop gives us the power to distribute a data across cluster of commodity computers and this could be as big as 10,000 nodes, where data can be stored and retrieved in parallel (parallel processing) using programming model MapReduce.
  •  It is an open-source Data Management with scale-out storage & distributed processing.


Hadoop Key Characteristics:


Following are the Hadoop Features:

Flexible: You are not constraint by the number of nodes. You can keep on adding nodes on run time. You can delete or add node without putting the system down.

Reliable: The system has been built in such a way that when there are failures you still have data availability. There are data Replication, Node Replication for this.

Economical: There is no licence cost associated with it. Even if you buy Hadoop from claudera or MapReduce which are different distribution of Hadoop you still have to pay only for support and the distribution is free of cost. It's like RedHat Linux and not Microsoft windows.

Scalable: It is largly scalable. There is no limit on the number of node you can add. Teradata or Exadata cannot work if the data go beyond 10 TB.

Question: Hadoop is a framework that allow distributed processing of "Small Data Sets" or "Large Data Sets"?
Ans:  Hadoop is also capable to process small data-sets however to experience the true power of Hadoop one needs to have data in TB's because this is where RDBMS takes hours and fails whereas Hadoop does the same in couple of minutes. We wont be able to unlease the power of Hadoop if we use it with small data set.

Hadoop Eco-System:


Reason for Hadoop:
  •  If the data size is in TBs then it make sense to use Hadoop. If it is less than that it doesn't help
  •  Hadoop implementation will be cheaper than any other Exadata or Teradata. Hadoop does not require licencing issue because it is free. So millions of dollar which client giving on Oracle Exadata are saved. 
  •  If data runs on more than 10TB then it make sense to use Hadoop. Hadoop implementation will be cheaper than any other like Teradata or exadata.

Hadoop Components:

1. Sqoop: If you have structured data stored in RDBMS and you want to move it to Hadoop you will need "Sqoop" for that. Sqoop is the tool used for moving the data from RDBMS to Hadoop. Its the full form of SQL to Hadoop.

2. Flume: Flume is used to store unstrcutured or semi-structured data into Hadoop.e.g. If you need to load data from websites into Hadoop. 

3. Map Reduce Framework: Programming model which is used to read and write data from the Distributed File System. 

4. Hive: Datawarehousing System in Hadoop. Hive was developed by Facebook.

5. Pig: It is a data analysis tool in Hadoop developed by Yahoo.

6. Mahout: It is a machine learning framework. It is used for all the analysis like recommend the system, clustering analysis. Lot of Ecommerce and social media use Machine Learning. 

7. Apache Oozie: It is workflow maintaining tool used to run job in parallel in Hadoop. 

8. HDFS (Hadoop Distributed File System): Data storage of Hadoop.

Hadoop Core Components

Hadoop has two core components. 
HDFS - Hadoop Distributed File System (Storage)
  Set of cluster or cluster of machines which are combined together where data is stored. 
  There are two things: NameNode and DataNode. NameNode is Admin node while DataNode is where our regular data is stored. 
    Data is Distributed across "nodes" (DataNode).
    Natively redundant - All the data which is present on one node is replicated on other node as well to avoid failure. 
    NameNode tracks location. 
   
 MapReduce ( Processing)  - Programming model. 
For example, suppose elections had been taken place in 5 Booth locations in Delhi A, B, C, D and E location. To calculate the voters and declare the winner we took all the data from each location to a location S. At S location we compute all votes and declare the winner. 
This is a traditional way of voting. Issue with this approach is Moving the data from each Location A,B...E to location S. Cost of moving the data is much more than cost of doing the calculation. It is also time taking.
  Alternate solution could be you send your processing (Vote calculating Program) to these Booth Locations. So we got how many votes each party got at each booth. Then this Vote count data will be send by each Booth to a central location where we consolidate the data, process it and declare the Winner. So this is what MapReduce is. Here Map program is processing done at each Booth level and Reduce is processing done on consolidated data at the central location. 

Any task which can be done independent of other DataNode can be moved into a Map task. Where there is an Aggregation is required, you need a Reduce Job. So it also happen that in your analysis you never have a Reduce Job. You can do everything in a Map task. e.g. Encryption algorithm.
  •   Splits a task across processors.
  •   "near" the data & assembles results
  •   Self-Heading, High Bandwidth
  •   Clustered Storage.
  •   JobTracker manages the TaskTrackers.



On HDFS terms are NameNode and DataNode. On MapReduce terms are: Job Tracker and Task Tracker. 
  •  HDFS will have NameNode which will be Master Node and DataNode will be slave node.
  •  Job Tracker will be Master daemon which will run and distribute the task to Task Tracker. Task Tracker are the task running on the DataNode.

HDFS Architecture


All metadata operations happen at NameNode. Metadata is data about data like file name, location etc. A client will be intracting with NameNode for metadata operations. Then the client will either read or write. 

Rack is physical collection of all the DataNodes (There could 20 to 40 machines in a RAC), that is where these DataNodes are stored. Rack1 and Rack2 can be geo-graphically separated out. 

Client will be either your HDFS command line or it could be Java interface which you can intract with the Namenode. So End user always intract with the client for any operation. 

If I have to find out what DataNodes are free, where I can write, from where I can read a particular block, I need to talk to NameNode. NameNode will give that information then I will go ahead to read that data from either of the data node or write that data. 

There will be multiple RACKs on which DataNodes will be present. There will be multiple blocks on a particular DataNode, the data will be divided into multiple Blocks. These Blocks have specific size. 

If you have to read or write data, you first talk to a NameNode, Find out the availability of the data that you want to read or find the availability of space if you want to write it.  Then NameNode will tell you this is where you can read or write. Then the client go ahead and read/write from a particular location. So Client always read directly from DataNode, it just go to NameNode to find which DataNode keep the data because NameNode keeps the metadata. 

Initially NameNode will be knowing all my cluster is free then whenever you keep on writing there will be an ACK (Acknoledgment) given back to the NameNode based on which NameNode will maintain now which DataNode is occupied and which is still free to write. Data will be divided into block of either 64MB or 128MB depending upon the configuration. 

Cluster will be collection of RACK. At a single Geo-Graphical location we can store multiple DatNode in a RACK. There could be hundred of RACK in a cluster. 

As there DatNode are comodity computer, there are good chance of them failing. Hence a Replication is build-in in the Architecture itself. Each Data Block is replicated three times (default). Replication can be more than three also.

Main Component of HDFS

NameNode: Single machine in complete cluster. There is no replica for a NameNode. It is very high availability and high quality machine. It has the best configuration.

DataNode: Huge cheap machines like ambesder taxies, which could be commodity hardware. Less reliable. Your laptop can also be a commodity hardware. The are bound to fail. Not much quality. That's why overall cost of Hadoop Cluster is economic. 

NameNode Metadata
  • Meta-data in Memory:
    •  The entire metadata is in memory.
    •  No demand paging of FS meta-data. As entire metadata is in main memory all the time. Demand paging happens when we do not have certain data in memory. No Paging.

  • Type of Metadata
    •  List of files
    •  List of Blocks for each file
    •  List of DataNode for each block
    •  File attributes, e.g. access time, replication error

  •  A Transaction Log
    • Records file creations, file deletion etc. 

Secondary Name Node


If NameNode fails in Hadoop 1.o, NameNode is single point of failure if it fails by fire or any disaster.  
There is "Secondary Name Node". 
  • Secondary Namenode is not a hot standby for the NameNode: If the NameNode goes down, system will go down. Secondary NameNode is not going to take over. 
  • Housekeeping, backup of Namenode Metadata: But at the same time all the data which is on NameNode is getting checked-in or replicated on disk/tape by Secondary NameNode. So every few minutes the Secondary NameNode comes and take the metadata of the Namenode and store it in disk.
  • In case of failure, you will looks at the status of Secondary NameNode and restart the system from there. Secondary Namenode does not work as NameNode in case of failure. For recovery you will come back from a specific checkpoint say half an hour back NameNode has taken backup then you will only be able to recover system from half an hour back. There will be loss of data from last backup to time of failure.  Though the probability of failure is very less as we spend much and keep a quality best machine/server as NameNode.


In Hadoop 2.o we have introduced an Active and Passive NameNode. 

Job Tracker


How does the job tracker work:
1. User will first copy file into DFS which is move data in Hadoop DFS.
2. User will will submit the job to the client.
3. Client will get information about the input file.
4. Based on the information client will create input splits. ( Some configurations and consideration are used to create splits). Whole data is devided into multiple splits.
5. On these splits you will be running a job. This job will be MapReduce kind of job/program. job.xml and job.jar contains the actual job or program which will run on this. 
6. Once user submit a job, the job is actually submitted through a Job Tracker. Job Tracker is master daemon which run on the NameNode, which will be responsible for running this job into multiple data node.


7. After client submit the Job through Job Tracker, Job is initialized on the Job Queue. 
8. Maps and Reduces are created ( What is to be written on Map or Reduce is determined) based on what is contained in the job.xml and job.jar which is written in Java. So Job Tracker Read these two job files first.
9. Job Tracker creates Maps and Reduces. 
10. These Maps and Reduces tasks will run on these Input splits. Number of Map are equal to number of Input splits. Each input split will have a map running on it. Output of the Map task will go to the Reduce Task. These Map Tasks run on the DataNodes through Task Tracker. Splits are present on the DataNode.
There will be multiple instance of Map Job and one Map job will be assinged to one input split. 


11. Job Queue has all the Jobs which are associated with the Task Tracker. So Job Queue maintain all Tasks. Job Tracker has the Job Queue.
Job Tracker has to run a perticular task on a particular Data. It will pick Task tracker whichever Task Tracker has data it need to run Task on. As there can be multiple replication of it, it try to pick most local data. Task tracker is the actual that run the Map task on the DataNode.
Maps and Input splits are same. Input split is split of the data. Map is the program which run on the data. Input split will be residing on the DataNode. For each Input Split there will be a Map Task.

12. Now the job has been assinged to the Task Tracker. Job Tracker assign the task to Job Tracker to run on DataNode where the input splits are actually residing.

Heartbeat is associated with a Job Tracker and Task Tracker. Job Tracker keep on sending Heartbeats to Task Tracker to figure out whether it is alive or not. If it get the heartbeat back Job Tracker knows Task Tracker is running. 
If the heartbeat is lost, then the Job Tracker or NameNode will assume that the Machine is gone and it will assign the task to a different machine.

Anatomy of a File Write



1. The client will tell the Distributed File System that I want to write. 
2. Create command will be send to the NameNod5e. NameNode will come back and tell where is the availability. If the replication factor is three, NameNode will tell three DataNode where Client can write.
3. Client is going to pass on the the Packet to the first DataNode which is in the pipeline of the DataNodes. The Packet will contain Actual Data to be written, Location on which it should be written on the first DataNode and it also location of second, third DataNode where it will be written.
In a pipeline fashion Packet will be written to Each DataNode one by one. 
5. Once the write is complete on third DataNode also, last DatNode will send Acknoledgment. This acknoledgment will be send back to NameNode. WIth this NameNode will know the write has happened. Then NameNode will put the information in its metadata that this data is written in these DataNode. So Next time a read has to happen it will read from these DataNodes.
If NameNode doesn't get the Acknoledgement it will re-run the Job again. NameNode will get only one Acknoledgment not three. It will check the heartbeat, if DataNodes are OK, it will send the same Data/Packet to these DataNodes. If there is a failure with the DataNode, NameNode will send the Packet with different DataNode list. Even if data is written on two node, it will re-write that data.

Anatomy of a File Read



For read you go, Get the Block information and Read in Parallel. You go ahead and read that data,

Read happens in parallel. There is no acknoledgment with the read because recieving the data requested from DataNode is itself an acknolegment.

Q: Why do we read in Parallel and Why do we write in sequence?
Ans: Hadoop is a system which is used for write once and Read Many time kind of environment. For example logs which are written once and there are multiple analysis done on it. Read has to be very fast while the write side can be OK. 

Example: Uploading video on YouTube, many times it fails and it takes a longer time. While when we are reading a video from YouTube it is very fast.

Replication and Rack Awareness


We have three different Racks - RACK1, RACK2 and RACK3. Each Rack has multiple DataNode - Rack1 has DataNodes 1,2,3,4 Rack2 has DataNode 5,6,7,8. These Racks can be geo-graphically at different location. 

I have a Block A to write and Replication factor is three. 

First copy will be written to Rack1 in DataNode 1. Second copy of this will be written in different RACK (Rack2 here DataNode5). Third copy will be written in same RACK (Rack2) but in a different DataNode (DataNode6).

Reason for such write:
1. If we write whole data in same Rack then in case of Rack failure you will loose the data. So here if Rack1 fails we will still have data in Rack2.
2. Why don't we write third data in Rack3? Because Inter-Rack communication need huge bandwidth and the probability of two Rack failing at the same time is very low. With this trade of keep the third Data Block to same Rack. 
Based on this we write the first Block in whatever NameNode figure out closest to it and Second, Thirds Block are written to the Rack which is available but in different DataNode.

So NameNode should be Rack Aware. It should know which data in written in which DataNode and in which Rack. 

Q: In HDFS, blocks of a file are written in parallel, however the replication of the block are done sequentially. False/True.
Ans: True. A file is divided into Blocks, these blocks are written in parallel but the block replication happen in sequence.
If there are multiple packet that need to be written then the write will happen in parallel. However single packet will be written to Three DataNodes in pipeline/sequential format.

Question: A file of 400 MB is being copied to DFS. The system has finished copying 250MB. What happen if client try to access that file:
a). can read upto block that is successfully written.
    b). can read upto the last bit successfully written.
c). Will throw an Exception.
d). Cannot see the file untill its finish copying.
Ans: a). Client can read upto block that is successfully written.

Mongodb explain() Query Analyzer and it's Verbosity

First creating 1 million documents: > for(i=0; i<100; i++) { for(j=0; j<100; j++) {x = []; for(k=0; k<100; k++) { x.push({a:...