MS BI Guy Does Hadoop (Part 2 – Taking Hadoop for a Spin)

In part 1 of my Hadoop adventures, I walked you through the steps of setting up the Cloudera virtual machine, which comes with CentOS and Hadoop preinstalled. Now, I'll go through the steps to run a small Hadoop program that analyzes weather data. The program and the code samples can be downloaded from the source code that accompanies the book Hadoop: The Definitive Guide (3rd Edition) by Tom White. Again, the point of this exercise is to benefit Windows users who aren't familiar with Unix but are willing to evaluate Hadoop in a Unix environment.

Downloading the Source Code

Let’s start by downloading the book source and the sample dataset:

  1. Start the Cloudera VM, log in, open the File Manager, and create a downloads folder as a subfolder of the cloudera folder (this is your home folder because you log in to CentOS as user cloudera). Then, create a book folder under the downloads folder.
  2. Open Firefox, navigate to the book source code page, and click the Zip button. Then, save the file to the book folder.
  3. Open the File Manager and navigate to the /cloudera/downloads folder. Right-click the book folder and click Open Terminal Here. Enter the following command to extract the file:
    [cloudera@localhost]$ unzip tomwhite-hadoop-book-3e-draft-6-gc5b14af.zip
  4. Unzipping the file creates a folder tomwhite-hadoop-book-c5b14af and extracts the files into it. To minimize folder nesting, use the File Manager to navigate to the /book/tomwhite-hadoop-book-c5b14af folder, press Ctrl+A to select all files, and copy and paste them into the /cloudera/downloads/book folder. You can then delete the tomwhite-hadoop-book-c5b14af folder.

Building the Source Code

Next, you need to compile the source code and build the Java JAR files for the book samples.

Tip I failed to build the entire source code on the first try because my virtual machine ran out of memory when building the ch15 code. Therefore, before building the source, increase the memory of the Cloudera VM to 3 GB.

  1. Download and install Maven. Think of Maven as the Java equivalent of MSBUILD. You might also find the following instructions helpful for installing Maven.
  2. Open the Terminal window (command prompt) and create the following environment variables so you don't have to directly reference the Hadoop version and the folder where Hadoop is installed:

    [cloudera@localhost]$ export HADOOP_HOME=/usr/lib/hadoop-0.20

    [cloudera@localhost]$ export HADOOP_VERSION=0.20.2-cdh3u4

  3. In the terminal window, navigate to the /cloudera/downloads/book folder and build the book source code with Maven using the following command. If the command succeeds, it shows a summary indicating that all projects were built successfully and places a hadoop-examples.jar file in the book folder.

    [cloudera@localhost book] $ mvn package -DskipTests -Dhadoop.version=1.0.2

  4. Next, copy the input dataset with the weather data that Hadoop will analyze. For testing purposes, we'll use a very small dataset that represents the weather datasets kept by the National Climatic Data Center (NCDC). Our task is to parse the files in order to obtain the maximum temperature per year. The mkdir command creates a /user/cloudera/input/ncdc folder in the Hadoop file system (HDFS). Next, we copy the file from the local file system to HDFS using put (see the Java sketch after the sample data below for a programmatic alternative).

    [cloudera@localhost book]$ su root

    [root@localhost book]# /usr/bin/hadoop dfs -mkdir /user/cloudera/input/ncdc

    [root@localhost book]# /usr/bin/hadoop dfs -put ./input/ncdc/sample.txt /user/cloudera/input/ncdc

    [root@localhost book]# hadoop dfs -ls /user/cloudera/input/ncdc

    -rw-r--r-- 1 cloudera supergroup 529 2012-06-07 16:24 /user/cloudera/input/ncdc/sample.txt

The input file is a fixed-width file with the following content (the year and temperature sections are the ones the mapper extracts, at character offsets 15-19 and 87-92).

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999

0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999

0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999

0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999

0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
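
By the way, if you prefer to script the HDFS copy from Java rather than the shell, a minimal sketch using Hadoop's FileSystem API might look like the following. The class name CopySampleToHdfs is hypothetical (it is not part of the book samples), and the sketch assumes the default Hadoop configuration on the Cloudera VM:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper (not part of the book samples): copies the local NCDC
// sample file into HDFS, mirroring the -mkdir and -put commands above.
public class CopySampleToHdfs {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // reads core-site.xml when launched through the hadoop command
    FileSystem fs = FileSystem.get(conf);     // the default (HDFS) file system

    Path target = new Path("/user/cloudera/input/ncdc");
    fs.mkdirs(target);                        // same effect as: hadoop dfs -mkdir

    // same effect as: hadoop dfs -put ./input/ncdc/sample.txt /user/cloudera/input/ncdc
    fs.copyFromLocalFile(new Path("input/ncdc/sample.txt"), target);

    fs.close();
  }
}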

Analyzing Data

Now, it’s time to run the code sample and analyze the weather data.

  1. Run the MaxTemperature application.

[root@localhost book]# /usr/bin/hadoop MaxTemperature input/ncdc/sample.txt output

12/06/07 16:25:44 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

12/06/07 16:25:44 INFO input.FileInputFormat: Total input paths to process : 1

12/06/07 16:25:44 WARN snappy.LoadSnappy: Snappy native library is available

12/06/07 16:25:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library

12/06/07 16:25:44 INFO snappy.LoadSnappy: Snappy native library loaded

12/06/07 16:25:45 INFO mapred.JobClient: Running job: job_201206071457_0008

12/06/07 16:25:46 INFO mapred.JobClient: map 0% reduce 0%

12/06/07 16:25:54 INFO mapred.JobClient: map 100% reduce 0%

12/06/07 16:26:05 INFO mapred.JobClient: map 100% reduce 100%

12/06/07 16:26:06 INFO mapred.JobClient: Job complete: job_201206071457_0008

12/06/07 16:26:06 INFO mapred.JobClient: Counters: 26

12/06/07 16:26:06 INFO mapred.JobClient: Job Counters

12/06/07 16:26:06 INFO mapred.JobClient: Launched reduce tasks=1

12/06/07 16:26:06 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=8493

12/06/07 16:26:06 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0

12/06/07 16:26:06 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0

12/06/07 16:26:06 INFO mapred.JobClient: Launched map tasks=1

12/06/07 16:26:06 INFO mapred.JobClient: Data-local map tasks=1

12/06/07 16:26:06 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10370

12/06/07 16:26:06 INFO mapred.JobClient: FileSystemCounters

12/06/07 16:26:06 INFO mapred.JobClient: FILE_BYTES_READ=61

12/06/07 16:26:06 INFO mapred.JobClient: HDFS_BYTES_READ=644

12/06/07 16:26:06 INFO mapred.JobClient: FILE_BYTES_WRITTEN=113206

12/06/07 16:26:06 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=17

12/06/07 16:26:06 INFO mapred.JobClient: Map-Reduce Framework

12/06/07 16:26:06 INFO mapred.JobClient: Map input records=5

12/06/07 16:26:06 INFO mapred.JobClient: Reduce shuffle bytes=61

12/06/07 16:26:06 INFO mapred.JobClient: Spilled Records=10

12/06/07 16:26:06 INFO mapred.JobClient: Map output bytes=45

12/06/07 16:26:06 INFO mapred.JobClient: CPU time spent (ms)=1880

12/06/07 16:26:06 INFO mapred.JobClient: Total committed heap usage (bytes)=196022272

12/06/07 16:26:06 INFO mapred.JobClient: Combine input records=0

12/06/07 16:26:06 INFO mapred.JobClient: SPLIT_RAW_BYTES=115

12/06/07 16:26:06 INFO mapred.JobClient: Reduce input records=5

12/06/07 16:26:06 INFO mapred.JobClient: Reduce input groups=2

12/06/07 16:26:06 INFO mapred.JobClient: Combine output records=0

12/06/07 16:26:06 INFO mapred.JobClient: Physical memory (bytes) snapshot=236310528

12/06/07 16:26:06 INFO mapred.JobClient: Reduce output records=2

12/06/07 16:26:06 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1078792192

12/06/07 16:26:06 INFO mapred.JobClient: Map output records=5

  2. Hadoop generates an output file (part-r-00000) that includes the job results, which we can see by browsing HDFS:

[root@localhost book]# hadoop dfs -ls /user/cloudera/output

Found 3 items

-rw-r--r-- 1 cloudera supergroup 0 2012-06-07 16:26 /user/cloudera/output/_SUCCESS

drwxr-xr-x - cloudera supergroup 0 2012-06-07 16:25 /user/cloudera/output/_logs

-rw-r--r-- 1 cloudera supergroup 17 2012-06-07 16:26 /user/cloudera/output/part-r-00000

  3. Browse the content of the file:

[root@localhost book]# hadoop dfs -cat /user/cloudera/output/part-r-00000

1949 111 # the max temperature for 1949 was 11.1 Celsius

1950 22 # the max temperature for 1950 was 2.2 Celsius

Understanding the Map Job

The book provides a detailed explanation of the source code. In a nutshell, the programmer has to implement:

  1. A Map job.
  2. (Optional) A Reduce job. You don't need a Reduce job when there is no need to merge the map results, such as when processing can be carried out entirely in parallel (see my note below).
  3. An application that ties together the Mapper and the Reducer.

Note What I learned from the book is that Hadoop is not just about analyzing data. There is nothing stopping you from writing map and reduce jobs that do some other kind of processing to take advantage of the distributed computing capabilities of Hadoop. For example, the New York Times used Amazon’s EC2 compute cloud and Hadoop to process four terabytes of scanned public articles and convert them to PDFs. For more information, read the “Self-Service, Prorated Supercomputing Fun!” article by Derek Gottfrid.

The Java code of the Mapper class is shown below.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

The code simply parses the input line by line to extract the year and the temperature reading from the fixed-width records. So, no surprises here. Imagine you're an ETL developer who decides to use code to parse a file instead of using the SSIS Flat File Source, which relies on a data provider to do the parsing for you. However, what's interesting about Hadoop is that the framework is intrinsically parallel and distributes the ETL job across multiple nodes. The map function extracts the year and the air temperature and writes them to the Context object:

(1950, 0)

(1950, 22)

(1950, 11)

(1949, 111)

(1949, 78) 
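
If you want to reproduce one of these pairs outside of Hadoop, here is a small standalone sketch (hypothetical, not part of the book samples) that applies the same fixed-width offsets as MaxTemperatureMapper to the first sample record, leaving out the missing-value and quality checks for brevity:

// Hypothetical standalone check: parses one NCDC record with the same
// offsets used by MaxTemperatureMapper and prints the (year, temperature) pair.
public class ParseSampleLine {

  public static void main(String[] args) {
    String line = "0067011990999991950051507004+68750+023550FM-12+038299999"
        + "V0203301N00671220001CN9999999N9+00001+99999999999";

    String year = line.substring(15, 19);       // "1950"
    int airTemperature;
    if (line.charAt(87) == '+') {               // skip the leading plus sign
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }

    System.out.println("(" + year + ", " + airTemperature + ")"); // prints (1950, 0)
  }
}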

Next, Hadoop sorts the map output and groups the values by key before passing them to the reduce job. In this case, the year is the key and the temperature readings are the values:

(1949, [111, 78])

(1950, [0, 22, 11])

Understanding the Reduce Job

The Reducer class is simple:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

For each key (year), the reduce job loops through the values (the temperature readings) and outputs the maximum temperature.
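
To see the reduce logic in isolation, here is a plain-Java sketch (again hypothetical, not part of the book samples) that applies the same max loop to the grouped pairs shown earlier; it prints the same results that end up in part-r-00000:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the reduce logic without the Hadoop types:
// for each year, keep the maximum of its temperature readings.
public class MaxPerYear {

  public static void main(String[] args) {
    Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
    grouped.put("1949", Arrays.asList(111, 78));
    grouped.put("1950", Arrays.asList(0, 22, 11));

    for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
      int maxValue = Integer.MIN_VALUE;
      for (int value : entry.getValue()) {
        maxValue = Math.max(maxValue, value); // same loop as in the reducer
      }
      System.out.println(entry.getKey() + "\t" + maxValue); // 1949 111, then 1950 22
    }
  }
}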

Understanding the Application

Finally, you need an application that ties together the Mapper and Reducer classes and runs the job.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Summary

Although simple and unassuming, the MaxTemperature application demonstrates a few aspects of Hadoop's inner workings:

  1. You copy the input datasets (presumably huge files) to the Hadoop distributed file system (HDFS). Hadoop splits the files into 64 MB blocks (the default block size). Assuming the default triple replication, it then stores three copies of each block: one on the node where the command is executed and two on additional nodes, if you have a multi-node Hadoop cluster, to provide fault tolerance. If a node fails, the file can still be assembled from the surviving nodes.
  2. The programmer writes Java code to implement a map job, a reduce job, and an application that invokes them.
  3. The Hadoop framework parallelizes and distributes the jobs, moving the MapReduce computation to each node that hosts a part of the input dataset. Behind the scenes, Hadoop runs a JobTracker on the name node and TaskTrackers on the data nodes; the TaskTrackers manage the tasks and report progress back to the JobTracker. If a task fails, the JobTracker can reschedule it to run on a different TaskTracker.
  4. Once the map tasks are done, their sorted outputs are transferred to the node(s) where the reduce task(s) run. The reduce task merges the sorted outputs and writes the result to an output file stored in HDFS for reliability.
  5. Hadoop is a batch processing system. Jobs are started, processed, and their output is written to disk.