Saturday, 25 October 2014

Hortonworks Data Platform

Overview

Hortonworks Data Platform (HDP) provides a data platform for multi-workload data processing. It is aligned to the following functional areas:

  • Data Management
  • Data Access
  • Data Governance and Integration
  • Security
  • and Operations.

HDP is based on Apache Hadoop.

Apache Hadoop

Apache Hadoop is a framework for collecting, storing, analyzing and manipulating massive quantities of data. It is a community-driven open-source project governed by the Apache Software Foundation (ASF).

Apache Hadoop, with the advent of YARN, supports many diverse workloads such as:

  • Interactive queries over large data with Hive on Tez
  • Real-time data processing with Apache Storm
  • Highly scalable NoSQL storage with Apache HBase
  • In-memory data processing with Apache Spark
  • and the list goes on.

The core of the framework is based on the following modules:

  • HDFS
  • MapReduce (in Hadoop 2 it runs on YARN and is also known as "MapReduce 2.0 (MRv2)")

Cluster

A set of machines running HDFS and MapReduce is known as a Hadoop Cluster.

Nodes

Individual machines are known as nodes.

Cluster and Nodes

A cluster can have as few as one node or as many as several thousand. For most application scenarios Hadoop is linearly scalable, which means you can expect better performance simply by adding more nodes.

Master Node

Hadoop Master Nodes oversee the two key functional pieces that make up Hadoop:

  • Storing lots of data (HDFS), and
  • Running parallel computations on all that data (MapReduce)

MapReduce

MapReduce is a method for distributing a task across multiple nodes. Each node processes data stored on that node to the extent possible.

A running MapReduce job passes through several phases: Map -> Shuffle -> Sort -> Reduce.

MapReduce Program

MapReduce programs are usually written in Java, but they can also be written in any scripting language using the Hadoop Streaming API.

JobTracker

A job is a complete execution of Mappers and Reducers over a dataset. MapReduce jobs are controlled by a software daemon known as the JobTracker. The JobTracker resides on a 'master node'. Clients submit MapReduce jobs to the JobTracker. The JobTracker assigns Map and Reduce tasks to other nodes on the cluster.

TaskTracker

A task is the execution of a single Mapper or Reducer over a slice of data. The JobTracker assigns Map and Reduce tasks to other nodes on the cluster. These nodes each run a software daemon known as the TaskTracker. The TaskTracker is responsible for actually instantiating the Map or Reduce task, and reporting progress back to the JobTracker.

If a task attempt fails, another will be started by the JobTracker. Speculative execution can also result in more task attempts than completed tasks.

The MapReduce Mapper and Hadoop

Hadoop attempts to ensure that Mappers run on nodes which hold their portion of the data locally, to minimize network traffic. Multiple Mappers run in parallel, each processing a portion of the input data.

Mapper Parameters

The MapReduce Mapper reads data in the form of key/value pairs. It outputs zero or more key/value pairs:

map(in_key, in_value) -> (inter_key, inter_value) list

The MapReduce Reducer and Intermediate Keys

After the Map phase is over, all the intermediate values for a given intermediate key are combined into a list. This list is given to a Reducer. There may be a single Reducer or multiple Reducers; the number is specified as part of the job configuration. All values associated with a particular intermediate key are guaranteed to go to the same Reducer.
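The guarantee that one key always reaches the same Reducer can be illustrated with hash partitioning. The following is a minimal sketch, assuming a plain string key; the class and method names are hypothetical, and the formula follows the same idea as Hadoop's default hash partitioner rather than reproducing its code.

```java
// Illustrative sketch only: PartitionSketch and partitionFor are hypothetical names.
final class PartitionSketch {

    // Map a key to one of numReducers partitions.
    // Masking with Integer.MAX_VALUE clears the sign bit, so the result is never negative.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 3;
        // The same key always prints the same reducer number.
        for (String key : new String[] {"apple", "banana", "apple", "cherry"}) {
            System.out.println(key + " -> reducer " + partitionFor(key, numReducers));
        }
    }
}
```

Because the partition depends only on the key and the number of Reducers, every occurrence of a key lands in the same partition no matter which Mapper emitted it.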

The intermediate keys, and their value lists, are passed to the Reducer in sorted key order. This step is known as the 'shuffle and sort'. The Reducer outputs zero or more final key/value pairs. These are written to HDFS. In practice, the Reducer usually emits a single key/value pair for each input key.
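To make the flow concrete, here is a small single-process sketch of map, shuffle and sort, and reduce. It does not use the Hadoop API; the class name, the "year,temperature" record format and the sample values are all made up for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: simulates the phases in memory, without Hadoop.
final class MapReduceFlowSketch {

    // Map: one "year,temperature" line -> zero or one (year, temperature) pair.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        String[] parts = line.split(",");
        if (parts.length == 2) { // malformed lines emit nothing
            Integer temp = Integer.valueOf(parts[1].trim());
            out.add(new AbstractMap.SimpleEntry<String, Integer>(parts[0].trim(), temp));
        }
        return out;
    }

    // Shuffle and sort: group values by key; the TreeMap keeps keys in sorted order.
    static TreeMap<String, List<Integer>> shuffleAndSort(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: one key and its value list -> a single output value (here, the maximum).
    static int reduce(String key, List<Integer> values) {
        return Collections.max(values);
    }

    // Run all phases over the input lines and return key -> reduced value.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) {
            intermediate.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffleAndSort(intermediate).entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1950,22", "1949,11", "1950,-3", "bad line", "1949,30")));
    }
}
```

In this sketch the Reducer emits exactly one pair per input key, matching the common case described above.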

Hadoop and Sluggishly Running MapReduce Mappers

Some Map tasks may take longer to complete than others, often because of faulty hardware or under-powered machines. This can cause a bottleneck, as all Mappers need to finish before any Reducer can start. Hadoop uses speculative execution to mitigate such situations. If a Mapper appears to be running more slowly than the others, a new instance of the Mapper is started on another machine, operating on the same data. The results of the first Mapper to finish are used, and Hadoop kills the Mapper that is still running.
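The "first attempt to finish wins" idea can be sketched with the JDK's ExecutorService. This is an analogy, not Hadoop's scheduler: both attempts here start at the same time, whereas Hadoop launches the backup only after it notices a slow task. The class, method names and delays are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: models two attempts of the same task racing each other.
final class SpeculativeExecutionSketch {

    // A task attempt that takes delayMillis to finish and then returns its label.
    static Callable<String> attempt(String label, long delayMillis) {
        return () -> {
            Thread.sleep(delayMillis);
            return label;
        };
    }

    // Run all attempts in parallel, return the result of the first one to succeed,
    // and cancel the attempts that are still running.
    static String firstToFinish(List<Callable<String>> attempts) {
        ExecutorService pool = Executors.newFixedThreadPool(attempts.size());
        try {
            return pool.invokeAny(attempts);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("no attempt completed", e);
        } finally {
            pool.shutdownNow(); // interrupts the slower attempt
        }
    }

    public static void main(String[] args) {
        String winner = firstToFinish(Arrays.asList(
                attempt("original attempt on slow node", 2000),
                attempt("speculative attempt", 50)));
        System.out.println("Result taken from: " + winner);
    }
}
```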

Hadoop Distributed File System (HDFS)

Hadoop Distributed File System (HDFS) is a Java-based file system that provides scalable, reliable (fault-tolerant), cost-efficient, distributed storage for big data. It is designed to span large clusters of commodity servers.

By distributing storage and computation across many servers, the combined storage resource can grow with demand while remaining economical at every size.

HDFS works closely with MapReduce.

An HDFS cluster consists of a NameNode, which manages the cluster metadata, and DataNodes, which store the data.

DataNodes

HDFS stores file data on machines called DataNodes.

File Blocks

Files on the DataNodes are split into large blocks (typically 128 megabytes), and each block of the file is independently replicated on multiple DataNodes. The typical replication factor is 3, but it can be set to another value. The blocks are stored on the local file system of the DataNodes.
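The arithmetic behind block splitting and replication is easy to check by hand. The sketch below is a back-of-the-envelope helper with hypothetical names; the 128 MB block size and replication factor of 3 are the typical values mentioned above, not values read from a real cluster.

```java
// Illustrative sketch only: simple arithmetic, not an HDFS API.
final class BlockMathSketch {

    static final long MB = 1024L * 1024L;

    // Number of blocks a file occupies: the ceiling of fileSize / blockSize.
    static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    // Raw bytes consumed across the cluster when every byte is stored `replication` times.
    static long rawStorageBytes(long fileSizeBytes, int replication) {
        return fileSizeBytes * replication;
    }

    public static void main(String[] args) {
        long fileSize = 300 * MB;
        long blocks = blockCount(fileSize, 128 * MB);
        System.out.println("Blocks: " + blocks);
        System.out.println("Block replicas: " + blocks * 3);
        System.out.println("Raw storage (MB): " + rawStorageBytes(fileSize, 3) / MB);
    }
}
```

A 300 MB file therefore occupies three blocks (128 MB, 128 MB and 44 MB), stored as nine block replicas that consume 900 MB of raw disk in total. The last block only takes the space it needs.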

NameNode

A NameNode manages the HDFS cluster metadata. Files and directories are represented in the NameNode by units called INodes, while the file contents are stored on DataNodes. INodes record attributes such as permissions, modification and access times, and namespace and disk space quotas.

File Blocks and NameNode

The NameNode actively monitors the number of replicas of a block. When a replica of a block is lost due to a DataNode failure or disk failure, the NameNode creates another replica of the block. The NameNode maintains the namespace tree and the mapping of blocks to DataNodes, holding the entire namespace image in RAM.

The NameNode does not directly send requests to DataNodes. It sends instructions to the DataNodes by replying to heartbeats sent by those DataNodes. The instructions include commands to: replicate blocks to other nodes, remove local block replicas, re-register and send an immediate block report, or shut down the node.
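The bookkeeping behind replica monitoring can be sketched as a map from blocks to the DataNodes that hold them. This is a deliberately simplified model with hypothetical class, method and node names; it only identifies which blocks need a new replica and leaves out heartbeats, block reports and the choice of target node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch only: a toy version of the block-to-DataNode mapping.
final class ReplicationMonitorSketch {

    // Block id -> names of the DataNodes currently holding a replica.
    private final Map<String, Set<String>> blockLocations = new HashMap<>();
    private final int targetReplication;

    ReplicationMonitorSketch(int targetReplication) {
        this.targetReplication = targetReplication;
    }

    // Record that dataNode holds a replica of blockId.
    void addReplica(String blockId, String dataNode) {
        blockLocations.computeIfAbsent(blockId, k -> new TreeSet<>()).add(dataNode);
    }

    int replicaCount(String blockId) {
        Set<String> nodes = blockLocations.get(blockId);
        return nodes == null ? 0 : nodes.size();
    }

    // Forget every replica on the failed node and return, in sorted order,
    // the blocks that are now below the target replication.
    List<String> handleNodeFailure(String failedNode) {
        List<String> underReplicated = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : blockLocations.entrySet()) {
            entry.getValue().remove(failedNode);
            if (entry.getValue().size() < targetReplication) {
                underReplicated.add(entry.getKey());
            }
        }
        Collections.sort(underReplicated);
        return underReplicated;
    }

    public static void main(String[] args) {
        ReplicationMonitorSketch monitor = new ReplicationMonitorSketch(3);
        for (String node : new String[] {"dn1", "dn2", "dn3"}) monitor.addReplica("blk_1", node);
        for (String node : new String[] {"dn2", "dn3", "dn4"}) monitor.addReplica("blk_2", node);
        System.out.println("Needs re-replication: " + monitor.handleNodeFailure("dn1"));
    }
}
```

In a real cluster the resulting list would be turned into "replicate this block" instructions carried in the replies to heartbeats from the surviving DataNodes.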

HDFS and Random File Access

HDFS works on blocks of files. The default HDFS block size is 64 MB or 128 MB, depending on the Hadoop version. HDFS is designed for large, sequential reads and writes rather than for reading one line here and one line there. This is fine when you want to process a whole file, but it makes HDFS unsuitable for some applications, such as using an index to look up small records.

HBase over HDFS and Random File Access

HBase, on the other hand, is well suited to random access. If you want to read a small record, you read only that small record.

HBase uses HDFS as its backing store. So how does it provide efficient record-based access?

HBase loads the tables from HDFS to memory or local disk, so most reads do not go to HDFS. Mutations are stored first in an append-only journal. When the journal gets large, it is built into an "addendum" table. When there are too many addendum tables, they all get compacted into a brand new primary table. For reads, HBase consults:

  • First the journal
  • Then the addendum tables
  • And finally the primary table.

This system means that we only write a full HDFS block when we have a full HDFS block's worth of changes.
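The journal, addendum and primary-table scheme can be sketched with in-memory maps. This is a toy model under loud assumptions: the class name, the two thresholds and the use of Java maps are all hypothetical, deletes are not modeled, and nothing here touches HBase or HDFS.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: a toy layered store, not HBase code.
final class LayeredStoreSketch {

    private final int journalLimit;   // flush the journal once it holds this many keys
    private final int addendumLimit;  // compact once there are more addendum tables than this

    private Map<String, String> journal = new HashMap<>();
    private final List<Map<String, String>> addenda = new ArrayList<>(); // oldest first
    private Map<String, String> primary = new TreeMap<>();

    LayeredStoreSketch(int journalLimit, int addendumLimit) {
        this.journalLimit = journalLimit;
        this.addendumLimit = addendumLimit;
    }

    // Writes always go to the journal first.
    void put(String key, String value) {
        journal.put(key, value);
        if (journal.size() >= journalLimit) {
            flush();
        }
    }

    // Turn the full journal into a sorted, immutable-by-convention addendum table.
    private void flush() {
        addenda.add(new TreeMap<>(journal));
        journal = new HashMap<>();
        if (addenda.size() > addendumLimit) {
            compact();
        }
    }

    // Merge the primary table and all addenda into a new primary table.
    // Applying addenda oldest first lets newer values overwrite older ones.
    private void compact() {
        TreeMap<String, String> merged = new TreeMap<>(primary);
        for (Map<String, String> table : addenda) {
            merged.putAll(table);
        }
        primary = merged;
        addenda.clear();
    }

    // Reads consult the journal, then the addenda (newest first), then the primary table.
    String get(String key) {
        if (journal.containsKey(key)) {
            return journal.get(key);
        }
        for (int i = addenda.size() - 1; i >= 0; i--) {
            if (addenda.get(i).containsKey(key)) {
                return addenda.get(i).get(key);
            }
        }
        return primary.get(key);
    }

    int addendumCount() {
        return addenda.size();
    }
}
```

For example, with a journal limit of 2 and an addendum limit of 1, writing a=1, b=2 and then a=3 leaves the old value of a in an addendum table while a read of a returns 3 from the journal.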

A more thorough description of this approach is in the Bigtable whitepaper.

What is "YARN"

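YARN (Yet Another Resource Negotiator) is the cluster resource-management layer introduced in Hadoop 2. Together with the MapReduce implementation that runs on top of it, it is also referred to as "MapReduce 2.0 (MRv2)". It is the architectural center of Hadoop that enables you to process data simultaneously in multiple ways.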

First MapReduce Program

We will write a Java MapReduce program that uses the Hadoop API.

The WordCount example reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.

Each mapper takes a line as input and breaks it into words. It then emits a key/value pair of the word and 1. Each reducer sums the counts for each word and emits a single key/value with the word and sum.

As an optimization, the reducer is also used as a combiner on the map outputs. This reduces the amount of data sent across the network by combining each word into a single record.
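The logic described above can be sketched in plain Java before bringing in Hadoop. The code below is not the Hadoop API and not the bundled WordCount example; it is a single-process imitation with hypothetical names, where each input split is a list of lines and the same summing step serves as both combiner and reducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: imitates mapper, combiner and reducer without Hadoop.
final class WordCountSketch {

    // Mapper: break one line into words; each word stands for the pair (word, 1).
    static List<String> map(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // Mapper plus combiner for one input split: sum the 1s locally,
    // so each word leaves this split as a single (word, count) record.
    static Map<String, Integer> mapAndCombine(List<String> splitLines) {
        Map<String, Integer> local = new TreeMap<>();
        for (String line : splitLines) {
            for (String word : map(line)) {
                local.merge(word, 1, Integer::sum);
            }
        }
        return local;
    }

    // Reducer: sum the partial counts coming from all splits.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    // Output format: one "word<TAB>count" line per word.
    static String format(Map<String, Integer> counts) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Integer> split1 = mapAndCombine(Arrays.asList("the cat the dog"));
        Map<String, Integer> split2 = mapAndCombine(Arrays.asList("the end"));
        System.out.print(format(reduce(Arrays.asList(split1, split2))));
    }
}
```

In the first split the mapper emits four pairs, but the combiner sends only three records onward because the two occurrences of "the" are merged locally. That saving is what the combiner optimization buys on a real network.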

Running the Example

To run the example, the command syntax is:

hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>

All of the files in the input directory are read and the counts of words in the input are written to the output directory. It is assumed that both inputs and outputs are stored in HDFS. If your input is not already in HDFS, but is rather in a local file system somewhere, you need to copy the data into HDFS using a command like this:

hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>