
Programming MapReduce Jobs with HDInsight Server for Windows

In a previous blog “Installing HDInsight Server for Windows”, I introduced you to the Microsoft HDInsight Server for Windows. Recall that HDInsight Server for Windows is a Windows-based Hadoop distribution that offers two main benefits for Big Data customers:

  • An officially supported Hadoop distribution on Windows Server – Previously, you could set up Hadoop on Windows only as an unsupported installation (via Cygwin) for development purposes. What this means for you is that you can now set up a Hadoop cluster on servers running the Windows Server OS.
  • An extension of the Hadoop ecosystem to .NET developers, who can now write MapReduce jobs in .NET code, such as C#.

And, in previous blogs, I’ve introduced you to Hadoop. Recall that there are two main reasons for using Hadoop for storing and processing Big Data:

  • Storage – You can store massive files in a distributed and fault-tolerant file system (HDFS) without worrying that hardware failure will result in a loss of data.
  • Distributed processing – When you outgrow the limitations of a single server, you can distribute job processing across the nodes in a Hadoop cluster. This allows you to perform crude data analysis directly on files stored in HDFS or execute any other type of job that can benefit from parallel execution.

This blog continues the HDInsight Server for Windows journey. As many of you probably don’t have experience in Unix or Java, I’ll show you how HDInsight makes it easy to write MapReduce jobs on a Windows machine.

Note Writing MapReduce jobs can be complex. If all you need is some crude data analysis, you should consider an abstraction layer, such as Hive, which is capable of deriving the schema and generating the MapReduce jobs for you. This doesn’t mean that experience in MapReduce is not useful. When processing the files goes beyond just imposing a schema on the data and querying the results, you might need programming logic, such as in The New York Times Archive case.

As a prerequisite, I installed HDInsight on my Windows 8 laptop. Because of its prerelease status, the CTP of HDInsight Server for Windows currently supports a single node only, which is fine for development and testing. My task is to analyze the same dataset that I used in the MS BI Guy Does Hadoop (Part 2 – Taking Hadoop for a Spin) blog. The dataset (temp.txt) contains temperature readings from weather stations around the world and represents the weather datasets kept by the National Climatic Data Center (NCDC). You will find the sample dataset in the source code attached to this blog. It has the following content (the most important parts are the year at offset 15 and the temperature at offset 88).

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999

0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999

0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999

0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999

0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

Note that the data is stored in its raw format and no schema was imposed on the data. The schema will be derived at runtime by parsing the file content.

Installing Microsoft .NET SDK for Hadoop

The Microsoft .NET SDK for Hadoop facilitates the programming effort required to code MapReduce jobs in .NET. To install it:

  1. Install NuGet first. NuGet is a Visual Studio extension that makes it easy to add, remove, and update libraries and tools in Visual Studio projects that use the .NET Framework.
  2. Open Visual Studio (2010 or 2012) and create a new C# Class Library project.
  3. Go to Tools → Library Package Manager → Package Manager Console.
  4. In the Package Manager Console window that opens at the bottom of the screen, enter:
    install-package Microsoft.Hadoop.MapReduce -pre

    This command will download the required Hadoop binaries and add them as references in your project.

Coding the Map Job

The Map job is responsible for parsing the input (the weather dataset), deriving the schema from it, and generating a key-value pair for the data that we’re interested in. In our case, the key will be the year and the value will be the temperature measure for that year. The Map class derives from the MapperBase class defined in Microsoft.Hadoop.MapReduce.dll.

[screenshot: the TemperatureMapper class]
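The mapper code appeared as a screenshot in the original post. Here is a minimal sketch of what the TemperatureMapper class might look like, assuming the CTP’s MapperBase exposes a Map(string inputLine, MapperContext context) override and an EmitKeyValue helper (the class name matches the one reported by the MRRunner log later in this post). Treat it as an illustration rather than the exact code from the screenshot.

using Microsoft.Hadoop.MapReduce;

public class TemperatureMapper : MapperBase
{
    // Called once per input line. Offsets follow the NCDC fixed-width format:
    // the year starts at offset 15 (4 characters) and the temperature at
    // offset 88, with its sign at offset 87.
    public override void Map(string inputLine, MapperContext context)
    {
        if (inputLine.Length < 92) return;   // skip malformed lines

        string year = inputLine.Substring(15, 4);
        string reading = inputLine[87] == '+'
            ? inputLine.Substring(88, 4)     // drop the leading plus sign
            : inputLine.Substring(87, 5);    // keep the minus sign

        int temperature;
        if (int.TryParse(reading, out temperature) && temperature != 9999)
        {
            context.EmitKeyValue(year, temperature.ToString());
        }
    }
}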

At runtime, HDInsight will parse the file content and invoke the Map method once for each line in the file. In our case, the Map job is simple. We parse the input and extract the temperature and year. If the parsing operation is successful, we return the key-value pair. The end result will look like this:

(1950, 0)

(1950, 22)

(1950, −11)

(1949, 111)

(1949, 78)

Coding the Reduce Job

Suppose that we want to get the maximum temperature for each year. Because each weather station might have multiple readings (lines in the input file) for the same year, we need to combine the results and find the maximum temperature per year. This is analogous to GROUP BY in SQL. The following Reduce job gets the work done:

[screenshot: the TemperatureReducer class]
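As with the mapper, the reducer was shown as a screenshot. The following is a rough sketch of a TemperatureReducer, assuming the SDK provides a ReducerCombinerBase base class with a Reduce(key, values, context) override and the same EmitKeyValue helper (the exact base class and context type names in the CTP may differ):

using System.Collections.Generic;
using System.Linq;
using Microsoft.Hadoop.MapReduce;

public class TemperatureReducer : ReducerCombinerBase
{
    // The framework has already grouped the mapper output by key (year),
    // so all that is left is to emit the maximum temperature per year.
    public override void Reduce(string key, IEnumerable<string> values,
        ReducerCombinerContext context)
    {
        int maxTemperature = values.Select(int.Parse).Max();
        context.EmitKeyValue(key, maxTemperature.ToString());
    }
}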

The Reduce job is even simpler. The Hadoop framework pre-processes the output of the Map jobs before it’s sent to the Reduce function. This processing sorts and groups the key-value pairs by key, so the input to the Reduce job will look like this:

(1949, [111, 78])

(1950, [0, 22, −11])

In our case, the only thing left for the Reduce job is to loop through the values for a given key (year) and return the maximum value, so the final output will be:

(1949, 111)

(1950, 22)

Testing MapReduce

Instead of deploying to Hadoop each time you make a change during the development and testing lifecycle, you can add another project, such as a Console Application, and use it as a test harness to test the MapReduce code. For your convenience, Microsoft provides a StreamingUnit class in Microsoft.Hadoop.MapReduce.dll. Here is what our test harness code looks like:

[screenshot: the test harness code]

The code uses a test input file. It reads the content of the file one line at a time and adds each line as a new element to an instance of ArrayList. Then, the code calls the StreamingUnit.Execute method to initiate the MapReduce job.
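Since that screenshot isn’t reproduced here either, the sketch below shows a rough equivalent of such a harness. It assumes StreamingUnit.Execute<TMapper, TReducer> accepts the input lines and returns an object whose Result collection holds the reducer output; the exact member names in the CTP may differ, and the sketch uses a List<string> instead of the ArrayList mentioned above.

using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.Hadoop.MapReduce;

class Program
{
    static void Main()
    {
        // Load the sample weather file line by line (the same local file
        // that is deployed to HDFS in the next section).
        var input = new List<string>(File.ReadAllLines(@"D:\MyApp\Hadoop\MapReduce\temp.txt"));

        // Run the mapper and reducer locally, without deploying to the cluster.
        var output = StreamingUnit.Execute<TemperatureMapper, TemperatureReducer>(input);

        // Print the reduced key-value pairs; they should match the expected
        // (1949, 111) and (1950, 22) results.
        foreach (string line in output.Result)
        {
            Console.WriteLine(line);
        }
    }
}

Running the console application should print the same year and maximum temperature pairs you expect from the cluster run.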

Deploying to Hadoop

Once the code is tested, it’s time to deploy the dataset and MapReduce jobs to Hadoop.

  1. Deploy the file to the Hadoop HDFS file system.
    C:\Hadoop\hadoop-1.1.0-SNAPSHOT\bin>hadoop fs -copyFromLocal D:\MyApp\Hadoop\MapReduce\temp.txt input/Temp/input.txt

Note When you execute the hadoop command shell in the previous step, the file will be uploaded to your user folder. However, if you use the JavaScript interactive console found in the HDInsight Dashboard, the file will be uploaded to the hadoop user’s folder in HDFS because the console runs under the hadoop user. Consequently, the MapReduce job won’t be able to find the file. So, use the hadoop command prompt.

  2. Browse the file system using the web interface (http://localhost:50070) to see that the file is in your folder.

[screenshot: browsing HDFS through the web interface]

  3. Finally, we need to execute the job with HadoopJobExecutor, which can be called in various ways. The easiest way is to use MRRunner:
D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug>.\mrlib\mrrunner -dll FirstJob.dll

File dependencies to include with job:[Auto-detected] D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\FirstJob.dll

[Auto-detected] D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\Microsoft.Hadoop.MapReduce.dll

[Auto-detected] D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\Newtonsoft.Json.dll

>>CMD: c:\hadoop\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd jar c:\hadoop\hadoop-1.1.0-SNAPSHOT\lib\hadoop-streaming.jar -D “mapred.map.max.attempts=1” -D “mapred.reduce.max.attempts=1” -input inpu

emp -mapper ..\..\jars\Microsoft.Hadoop.MapDriver.exe -reducer ..\..\jars\Microsoft.Hadoop.ReduceDriver.exe -file D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\MRLib\Microsoft.Hadoop.MapDriver.e

p\MapReduce\FirstJob\bin\Debug\MRLib\Microsoft.Hadoop.ReduceDriver.exe -file D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\MRLib\Microsoft.Hadoop.CombineDriver.exe -file “D:\MyApp\Hadoop\MapRedu

irstJob.dll” -file “D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\Microsoft.Hadoop.MapReduce.dll” -file “D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\Newtonsoft.Json.dll” -cmdenv “MSFT_HADOOP_MA

-cmdenv “MSFT_HADOOP_MAPPER_TYPE=FirstJob.TemperatureMapper” -cmdenv “MSFT_HADOOP_REDUCER_DLL=FirstJob.dll” -cmdenv “MSFT_HADOOP_REDUCER_TYPE=FirstJob.TemperatureReducer”

packageJobJar: [D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\MRLib\Microsoft.Hadoop.MapDriver.exe, D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\MRLib\Microsoft.Hadoop.ReduceDriver.exe, D:\MyApp

Job\bin\Debug\MRLib\Microsoft.Hadoop.CombineDriver.exe, D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\FirstJob.dll, D:\MyApp\Hadoop\MapReduce\FirstJob\bin\Debug\Microsoft.Hadoop.MapReduce.dll, D

e\FirstJob\bin\Debug\Newtonsoft.Json.dll] [/C:/Hadoop/hadoop-1.1.0-SNAPSHOT/lib/hadoop-streaming.jar] C:\Users\Teo\AppData\Local\Temp\streamjob7017247708817804198.jar tmpDir=null

12/12/28 12:35:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable

log4j:ERROR Failed to rename [C:\Hadoop\hadoop-1.1.0-SNAPSHOT\logs/hadoop.log] to [C:\Hadoop\hadoop-1.1.0-SNAPSHOT\logs/hadoop.log.2012-12-27].

12/12/28 12:35:20 WARN snappy.LoadSnappy: Snappy native library not loaded

12/12/28 12:35:20 INFO mapred.FileInputFormat: Total input paths to process : 1

12/12/28 12:35:20 INFO streaming.StreamJob: getLocalDirs(): [c:\hadoop\hdfs\mapred\local]

12/12/28 12:35:20 INFO streaming.StreamJob: Running job: job_201212271510_0010

12/12/28 12:35:20 INFO streaming.StreamJob: To kill this job, run:

12/12/28 12:35:20 INFO streaming.StreamJob: C:\Hadoop\hadoop-1.1.0-SNAPSHOT/bin/hadoop job -Dmapred.job.tracker=localhost:50300 -kill job_201212271510_0010

12/12/28 12:35:20 INFO streaming.StreamJob: Tracking URL: http://127.0.0.1:50030/jobdetails.jsp?jobid=job_201212271510_0010

12/12/28 12:35:21 INFO streaming.StreamJob: map 0% reduce 0%

12/12/28 12:35:38 INFO streaming.StreamJob: map 100% reduce 0%

12/12/28 12:35:50 INFO streaming.StreamJob: map 100% reduce 100%

12/12/28 12:35:56 INFO streaming.StreamJob: Job complete: job_201212271510_0010

12/12/28 12:35:56 INFO streaming.StreamJob: Output: output/Temp

  4. Using the web interface or the JavaScript console, go to the output folder and view the part-00000 file to see the output (it should match your testing results).

[screenshot: the job output in part-00000]

Installing HDInsight Server for Windows

As you’ve probably heard, Microsoft rebranded its Big Data offerings as HDInsight, which currently encompasses two key services:

  • Windows Azure HDInsight Service (formerly known as Hadoop-based Services on Windows Azure) – This is a cloud-based Hadoop distribution hosted on Windows Azure.
  • Microsoft HDInsight Server for Windows – A Windows-based Hadoop distribution that offers two main benefits for Big Data customers:
    • An officially supported Hadoop distribution on Windows Server – Previously, you could set up Hadoop on Windows only as an unsupported installation (via Cygwin) for development purposes. What this means for you is that you can now set up a Hadoop cluster on servers running the Windows Server OS.
    • An extension of the Hadoop ecosystem to .NET developers, who can now write MapReduce jobs in .NET code, such as C#.

Both services are available as preview offerings and changes are expected as they evolve. The “Installing the Developer Preview of Apache Hadoop-based Services on Windows” article covers the setup steps pretty well. I decided to set up HDInsight Server for Windows via the Microsoft Web Platform Installer on my Windows 8 laptop.

Note Initially, I planned to install HDInsight Server for Windows on a VM running Windows Server 2012 Standard Edition. Although the installer completed successfully, it failed to create the sites and shortcuts to the dashboards (Hadoop Name Node, Dashboard, and MapReduce). This was probably caused by the fact that the server was configured as a domain controller. There is an ongoing discussion about this issue on the Microsoft HDInsight forum.

The Windows 8 setup failed to create the shortcut to the dashboard. However, the following steps fixed the issue:

1. Open an Administrator PowerShell prompt and relax the PowerShell execution policy so that it accepts scripts.

PS:> Set-ExecutionPolicy RemoteSigned

2. Navigate to the C:\HadoopFeaturePackSetup\HadoopFeaturePackSetupTools folder:

cd C:\HadoopFeaturePackSetup\HadoopFeaturePackSetupTools

3. Install HadoopWebApi:

.\winpkg.ps1 ..\Packages\HadoopWebApi-winpkg.zip install -CredentialFilePath c:\Hadoop\Singlenodecreds.xml

4. Install the dashboard:

.\winpkg.ps1 ..\Packages\HadoopDashboard-winpkg.zip install -CredentialFilePath c:\Hadoop\Singlenodecreds.xml

This should create the shortcuts on the desktop and you should be able to navigate to http://localhost:8085 to access the dashboard.

[screenshot: the HDInsight dashboard]

From here, you can open the Interactive Console, and your experience should be the same as with the Windows Azure HDInsight Service. David Zhang provides great coverage of how you can use the Interactive Console in his video presentation “Introduction to the Hadoop on Azure Interactive JavaScript Console”.

BTW, HDInsight Server installs a set of Windows services corresponding to the daemons you get when Hadoop is installed on UNIX.

[screenshot: the HDInsight Windows services]

Hadoop and Big Data Tonight with Atlanta BI Group

Atlanta BI Group is meeting tonight. The topic is “Hadoop and Big Data” by Ketan Dave, and our sponsor is Enterprise Software Solutions.

With the wide acceptance of open source technologies, Hadoop/MapReduce has become a viable option for implementing data solutions in the hundreds of terabytes to petabytes range. The scalability, reliability, versatility, and cost benefits of Hadoop-based systems are displacing the traditional approach to data solutions. Microsoft has partnered with Hadoop vendors and recently made announcements to make data on Hadoop accessible from Excel, easily linked to SQL Server and its business intelligence, analytical, and reporting tools, and managed through Active Directory.

I hope you can make it!

MS Guy Does Hadoop (Part 4 – Analyzing Data)

In my previous blog, I talked about Hive. Hive provides a SQL-like layer on top of Hadoop so you don’t have to write tons of MapReduce code to query Hadoop and to aggregate and join data. To facilitate working with Hive, Microsoft introduced a Hive ODBC driver (as of this writing, the driver is only available to Hadoop on Azure CTP subscribers). You can use this driver to connect to Hive running on Microsoft Azure or on your local Hadoop server. Danny Lee has provided detailed instructions on how to do the former. I’ll show you how to use it to connect to your local Hive server.

Start the Hive Server

If you use the Cloudera VM, the Hive server is not running by default. This service allows external clients to connect to Hive. To start it:

  1. Configure your Cloudera VM to obtain an IP address on your network. To do so in Oracle VirtualBox, go to the VM settings (Network tab), and change the network adapter to Bridged Adapter.

[screenshot: the VirtualBox network settings]

  2. Start the Cloudera VM and open the command prompt.
  3. Note the IP address assigned to the VM:

[cloudera@localhost ~]$ ifconfig

eth0 Link encap:Ethernet HWaddr 08:00:27:A0:6C:DC

inet addr:192.168.1.111 Bcast:192.168.1.255 Mask:255.255.255.0

UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1

RX packets:4320 errors:0 dropped:0 overruns:0 frame:0

TX packets:2122 errors:0 dropped:0 overruns:0 carrier:0

collisions:0 txqueuelen:1000

RX bytes:3762720 (3.5 MiB) TX bytes:251411 (245.5 KiB)

  4. If your host OS is Windows, edit the C:\Windows\System32\drivers\etc\hosts file and add an entry for that address, e.g.:

192.168.1.111    cloudera

  5. Ping the VM from the host OS to make sure it responds to the DNS name:

C:> ping cloudera

  6. Start the Hive server using this command:

[cloudera@localhost ]$ hive --service hiveserver

By default, the Hive server listens on port 10000.

Analyze Data in Excel

There are two ways to bring Hive results into Excel, and both options require the Hive ODBC driver:

  • You can use the Hive Pane to import data. This option provides a basic user interface that is capable of auto-generating Hive queries.
  • Import Hive tables directly into PowerPivot for Excel.

Using the Hive Pane

Once you install the Hive ODBC driver, you’ll get a new button in the Data ribbon group called Hive Pane.

  1. Click the Enter Cluster Details button. In the Host field, enter whatever name you specified in the host file (cloudera in my case). Note that the default port is set to 10000. Click OK. You shouldn’t see errors at this point.
  2. Expand the Select the Hive Object to Query section and select a table. Select which columns you want to bring in. Optionally, specify criteria, aggregate grouping, and ordering. Notice that by default the driver brings in the first 200 rows, but you can use the Limit Rows section to override the default.
  3. Click Execute Query to run the query and generate a table in Excel.
  4. From there on, you can use the Excel native PivotTable and PivotChart reports to analyze data or link the data to PowerPivot.

[screenshot: the Hive query results in Excel]

Importing Data in PowerPivot

The second option is to bypass the Hive Pane and import a Hive table directly into PowerPivot. To do so, you need to set up a file data source first.

  1. In Windows, go to Administrative Tools and click Data Sources (ODBC).
  2. In the ODBC Data Source Administrator, click the File DSN tab, and then click the Add button.
  3. In the Create New Data Source dialog box, select the HIVE driver.

[screenshot: the Create New Data Source dialog]

  4. Click Next and save the file data source, such as in the C:\Users\Teo\Documents\My Data Sources folder. Ignore the warning that pops up.
  5. Back in the ODBC Data Source Administrator (File DSN tab), browse to the folder where you saved the file data source, select it, and click Configure. That will bring you to the same ODBC Hive Setup dialog where you specify the Hadoop server name and port. Close the ODBC Data Source Administrator.
  6. Back in Excel, click the PowerPivot ribbon menu, and then click the PowerPivot Window button.
  7. In the PowerPivot Window Home tab, click the From Other Sources button in the Get External Data ribbon group.
  8. In the Table Import Wizard, select the Others (OLEDB/ODBC) option, and then click Next.
  9. In the Specify a Connection String step, click the Build button to open the Data Link Properties dialog.
  10. Select the Provider tab and then select the Microsoft OLE DB Provider for ODBC Drivers.

    [screenshot: the Data Link Properties Provider tab]

  11. Select the Connection tab, select the Use Connection String option, and then click the Build button.
  12. In the Select Data Source dialog box, browse to the folder where you saved the file data source, select it, and then click OK to return to the Data Link Properties dialog.

[screenshot: the Select Data Source dialog]

The Connection String field should now be populated with the following text:

DRIVER={HIVE};Description=;HOST=cloudera;DATABASE=default;PORT=10000;FRAMED=0;AUTHENTICATION=0;AUTH_DATA=;UID=;PWD=

  13. Click the Test Connection button to verify connectivity. Click OK to return to the Table Import Wizard, which should now have the following connection string:

Provider=MSDASQL.1;Persist Security Info=False;Extended Properties="DRIVER={HIVE};Description=;HOST=cloudera;DATABASE=default;PORT=10000;FRAMED=0;AUTHENTICATION=0;AUTH_DATA=;UID=;"

Follow the wizard to import the Hive tables as you would with any other data source.

MS Guy Does Hadoop (Part 3 – Hive)

Writing MapReduce Java jobs might be OK for simple analytical needs or for distributing processing jobs, but it can be challenging for more involved scenarios, such as joining two datasets. This is where Hive comes in. Hive was originally developed by the Facebook data warehousing team after they concluded that “… developers ended up spending hours (if not days) to write programs for even simple analyses”. Instead, Hive offers a SQL-like language that is capable of auto-generating the MapReduce code.

The Hive Shell

Hive introduces the notion of a “table” on top of data. It has its own shell which can be invoked by typing “hive” in the command window. The following command shows the Hive tables. I have defined two tables: records and records_ex.

[cloudera@localhost book]$ hive

hive> show tables;

OK

records

records_ex

Time taken: 4.602 seconds

hive>

 

Creating a Managed Table

Suppose you have a file with the following tab-delimited format:

1950    0    1

1950    22    1

1950    -11    1

1949    111    1

1949    78    1

 

The following Hive statement creates a records table with three columns.

hive> CREATE TABLE records (year STRING, temperature INT, quality INT)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\t';

Next, we use the LOAD DATA statement to populate the records table with data from a file located on the local file system:

LOAD DATA LOCAL INPATH 'input/ncdc/micro-tab/sample.txt'

OVERWRITE INTO TABLE records; 

This causes Hive to move the file to its warehouse repository on the local file system (/hive/warehouse). In other words, by default Hive manages the table: if you drop the table, Hive will delete the source data.

Creating an External Table

What if the data is already in HDFS and you don’t want to move the files? In this case, you can tell Hive that the table will be external to Hive and you’ll manage the data. Suppose that you’ve already copied the sample.txt file to HDFS:

[cloudera@localhost ~]$ hadoop dfs -ls /user/cloudera/input/ncdc

Found 1 items

-rw-r--r-- 1 cloudera supergroup 529 2012-06-07 16:24 /user/cloudera/input/ncdc/sample.txt

Next, we tell Hive to create an external table:

CREATE EXTERNAL TABLE records_ex (year STRING, temperature INT, quality INT)

LOCATION '/user/cloudera/records_ex';

LOAD DATA INPATH '/input/ncdc/sample.txt'

OVERWRITE INTO TABLE records_ex;

The EXTERNAL clause causes Hive to leave the data where it is, without even checking if the file exists. The INPATH clause points to the source file. The OVERWRITE clause causes the existing data to be removed.

Querying Data

The Hive SQL variant is called HiveQL. HiveQL does not support the full SQL-92 specification, as that wasn’t a design goal. The following two examples show how to query our tables.

hive> select * from records_ex;

OK

1950 0 1

1950 22 1

1950 -11 1

1949 111 1

1949 78 1

Time taken: 0.527 seconds 

hive> SELECT year, MAX(temperature)

> FROM records

> WHERE temperature != 9999

> AND quality in (1,2)

> GROUP BY year;

Total MapReduce jobs = 1

Launching Job 1 out of 1

Number of reduce tasks not specified. Estimated from input data size: 1

In order to change the average load for a reducer (in bytes):

set hive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number of reducers:

set hive.exec.reducers.max=<number>

In order to set a constant number of reducers:

set mapred.reduce.tasks=<number>

Starting Job = job_201206241704_0001, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201206241704_0001

Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=0.0.0.0:8021 -kill job_201206241704_0001

2012-06-24 18:21:15,022 Stage-1 map = 0%, reduce = 0%

2012-06-24 18:21:19,106 Stage-1 map = 100%, reduce = 0%

2012-06-24 18:21:30,212 Stage-1 map = 100%, reduce = 100%

Ended Job = job_201206241704_0001

OK

1949 111

1950 22

Time taken: 26.779 seconds 

As you can see from the second example, Hive generates a MapReduce job. Please don’t draw any conclusions from the fact that this simple query takes 26 seconds on my VM while it would take milliseconds to execute on any modern relational database. It takes quite a bit of time to instantiate MapReduce jobs, and end users probably won’t query Hadoop directly anyway. Besides, the performance results will probably look completely different with hundreds of terabytes of data.

In a future blog on Hadoop, I plan to summarize my research on Hadoop and recommend usage scenarios.

MS BI Guy Does Hadoop (Part 2 – Taking Hadoop for a Spin)

In part 1 of my Hadoop adventures, I walked you through the steps of setting up the Cloudera virtual machine, which comes with CentOS and Hadoop preinstalled. Now, I’ll go through the steps to run a small Hadoop program for analyzing weather data. The program and the code samples come from the source code that accompanies the book Hadoop: The Definitive Guide (3rd Edition) by Tom White. Again, the point of this exercise is to benefit Windows users who aren’t familiar with Unix but are willing to evaluate Hadoop in a Unix environment.

Downloading the Source Code

Let’s start by downloading the book source and the sample dataset:

  1. Start the Cloudera VM, log in, open the File Manager, and create a downloads folder as a subfolder of the cloudera folder (this is your home folder because you log in to CentOS as user cloudera). Then, create a book folder under the downloads folder.
  2. Open Firefox and navigate to the book source code page, and click the Zip button. Then, save the file to the book folder.
  3. Open the File Manager and navigate to the /cloudera/downloads folder. Right-click the book folder and click Open Terminal Here. Enter the following command to extract the file:
    [cloudera@localhost]$ unzip tomwhite-hadoop-book-3e-draft-6-gc5b14af.zip
  4. Unzipping the file creates a folder tomwhite-hadoop-book-c5b14af and extracts the files in it. To minimize folder nesting, use the File Manager to navigate to the /book/tomwhite-hadoop-book-c5b14af folder, press Ctrl+A to select all files, and copy and paste them into the /cloudera/downloads/book folder. You can then delete the tomwhite-hadoop-book-c5b14af folder.

    [screenshot: the extracted book source code in File Manager]

Building the Source Code

Next, you need to compile the source code and build the Java JAR files for the book samples.

Tip I failed to build the entire source code on the first try because my virtual machine ran out of memory when building the ch15 code. Therefore, before building the source, increase the memory of the Cloudera VM to 3 GB.

  1. Download and install Maven. Think of Maven as MSBUILD. You might also find the following instructions helpful for installing Maven.
  2. Open the Terminal window (command prompt) and create the following environment variables so you don’t have to reference the Hadoop version and the folder where Hadoop is installed directly:

    [cloudera@localhost]$ export HADOOP_HOME=/usr/lib/hadoop-0.20

    [cloudera@localhost]$ export HADOOP_VERSION=0.20.2-cdh3u4

  3. In the terminal window, navigate to the /cloudera/downloads/book folder and build the book source code with Maven using the following command. If the command is successful, it should show a summary confirming that all projects are built successfully and place a hadoop-examples.jar file in the book folder.

    [cloudera@localhost book] $ mvn package -DskipTests -Dhadoop.version=1.0.2

  4. Next, copy the input dataset with the weather data that Hadoop will analyze. For testing purposes, we’ll use a very small dataset which represents the weather datasets kept by the National Climatic Data Center (NCDC). Our task is to parse the files in order to obtain the maximum temperature per year. The mkdir command creates a /user/cloudera/input/ncdc folder in the Hadoop file system (HDFS). Next, we copy the file from the local file system to HDFS using put.

    [cloudera@localhost book]$ su root

    [root@localhost book]# /usr/bin/hadoop dfs -mkdir /user/cloudera/input/ncdc

    [root@localhost book]# /usr/bin/hadoop dfs -put ./input/ncdc/sample.txt /user/cloudera/input/ncdc

    hadoop dfs -ls /user/cloudera/input/ncdc

    -rw-r--r-- 1 cloudera supergroup 529 2012-06-07 16:24 /user/cloudera/input/ncdc/sample.txt

The input file is a fixed-width file with the following content (the year and temperature sections are the parts we care about).

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999

0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999

0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999

0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999

0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

Analyzing Data

Now, it’s time to run the code sample and analyze the weather data.

  1. Run the MaxTemperature application.

[root@localhost book]# /usr/bin/hadoop MaxTemperature input/ncdc/sample.txt output

12/06/07 16:25:44 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

12/06/07 16:25:44 INFO input.FileInputFormat: Total input paths to process : 1

12/06/07 16:25:44 WARN snappy.LoadSnappy: Snappy native library is available

12/06/07 16:25:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library

12/06/07 16:25:44 INFO snappy.LoadSnappy: Snappy native library loaded

12/06/07 16:25:45 INFO mapred.JobClient: Running job: job_201206071457_0008

12/06/07 16:25:46 INFO mapred.JobClient: map 0% reduce 0%

12/06/07 16:25:54 INFO mapred.JobClient: map 100% reduce 0%

12/06/07 16:26:05 INFO mapred.JobClient: map 100% reduce 100%

12/06/07 16:26:06 INFO mapred.JobClient: Job complete: job_201206071457_0008

12/06/07 16:26:06 INFO mapred.JobClient: Counters: 26

12/06/07 16:26:06 INFO mapred.JobClient: Job Counters

12/06/07 16:26:06 INFO mapred.JobClient: Launched reduce tasks=1

12/06/07 16:26:06 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=8493

12/06/07 16:26:06 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0

12/06/07 16:26:06 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0

12/06/07 16:26:06 INFO mapred.JobClient: Launched map tasks=1

12/06/07 16:26:06 INFO mapred.JobClient: Data-local map tasks=1

12/06/07 16:26:06 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10370

12/06/07 16:26:06 INFO mapred.JobClient: FileSystemCounters

12/06/07 16:26:06 INFO mapred.JobClient: FILE_BYTES_READ=61

12/06/07 16:26:06 INFO mapred.JobClient: HDFS_BYTES_READ=644

12/06/07 16:26:06 INFO mapred.JobClient: FILE_BYTES_WRITTEN=113206

12/06/07 16:26:06 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=17

12/06/07 16:26:06 INFO mapred.JobClient: Map-Reduce Framework

12/06/07 16:26:06 INFO mapred.JobClient: Map input records=5

12/06/07 16:26:06 INFO mapred.JobClient: Reduce shuffle bytes=61

12/06/07 16:26:06 INFO mapred.JobClient: Spilled Records=10

12/06/07 16:26:06 INFO mapred.JobClient: Map output bytes=45

12/06/07 16:26:06 INFO mapred.JobClient: CPU time spent (ms)=1880

12/06/07 16:26:06 INFO mapred.JobClient: Total committed heap usage (bytes)=196022272

12/06/07 16:26:06 INFO mapred.JobClient: Combine input records=0

12/06/07 16:26:06 INFO mapred.JobClient: SPLIT_RAW_BYTES=115

12/06/07 16:26:06 INFO mapred.JobClient: Reduce input records=5

12/06/07 16:26:06 INFO mapred.JobClient: Reduce input groups=2

12/06/07 16:26:06 INFO mapred.JobClient: Combine output records=0

12/06/07 16:26:06 INFO mapred.JobClient: Physical memory (bytes) snapshot=236310528

12/06/07 16:26:06 INFO mapred.JobClient: Reduce output records=2

12/06/07 16:26:06 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1078792192

12/06/07 16:26:06 INFO mapred.JobClient: Map output records=5

  2. Hadoop generates an output file (part-r-00000) that includes the job results, which we can see by browsing HDFS:

[root@localhost book]# hadoop dfs -ls /user/cloudera/output

Found 3 items

-rw-r--r-- 1 cloudera supergroup 0 2012-06-07 16:26 /user/cloudera/output/_SUCCESS

drwxr-xr-x - cloudera supergroup 0 2012-06-07 16:25 /user/cloudera/output/_logs

-rw-r--r-- 1 cloudera supergroup 17 2012-06-07 16:26 /user/cloudera/output/part-r-00000

  3. Browse the content of the file:

[root@localhost book]# hadoop dfs -cat /user/cloudera/output/part-r-00000

1949 111 # the max temperature for 1949 was 11.1 Celsius

1950 22 # the max temperature for 1950 was 2.2 Celsius

Understanding the Map Job

The book provides a detailed explanation of the source code. In a nutshell, the programmer has to implement:

  1. A Map job
  2. (Optional) a Reduce job – You don’t need a Reduce job when there is no need to merge the map results, such as when processing can be carried out entirely in parallel (see my note below).
  3. An application that ties together the Mapper and the Reducer.

Note What I learned from the book is that Hadoop is not just about analyzing data. There is nothing stopping you from writing a MapReduce job that does some kind of processing to take advantage of the distributed computing capabilities of Hadoop. For example, The New York Times used Amazon’s EC2 compute cloud and Hadoop to process four terabytes of scanned public articles and convert them to PDFs. For more information, read the “Self-Service, Prorated Supercomputing Fun!” article by Derek Gottfrid.

The Java code of the Map class is shown below.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

The code simply parses the input file line by line to extract the year and temperature reading from the fixed-width input file. So, no surprises here. Imagine you’re an ETL developer who decides to use code to parse a file instead of using the SSIS Flat File Source, which relies on a data provider to do the parsing for you. What’s interesting in Hadoop, however, is that the framework is intrinsically parallel and distributes the ETL job across multiple nodes. The map function extracts the year and the air temperature and writes them to the Context object.

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78) 

Next, Hadoop processes the output, sorts it, and groups it into key-value pairs. In this case, the year is the key and the values are the temperature readings.

(1949, [111, 78])

(1950, [0, 22, -11])

Understanding the Reduce Job

The Reducer class is simple:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

For each key (year), the reduce job loops through the values (temperature readings) and returns the maximum temperature.

Understanding the Application

Finally, you need an application that ties together the Map and Reduce classes.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Summary

Although simple and unassuming, the MaxTemperature application demonstrates a few aspects of the Hadoop inner workings:

  1. You copy the input datasets (presumably huge files) to the Hadoop distributed file system (HDFS). Hadoop shreds the files into 64 MB blocks. Then, it replicates each block three times (assuming a triple replication configuration): to the node where the command is executed and to two additional nodes if you have a multi-node Hadoop cluster, to provide fault tolerance. If a node fails, the file can still be assembled from the working nodes.
  2. The programmer writes Java code to implement a map job, a reduce job, and an application that invokes them.
  3. The Hadoop framework parallelizes and distributes the jobs to move the MapReduce computation to each node hosting a part of the input dataset. Behind the scenes, Hadoop creates a JobTracker job on the name node and TaskTracker jobs that run on the data nodes to manage the tasks and report progress back to the JobTracker. If a task fails, the JobTracker can reschedule it to run on a different TaskTracker.
  4. Once the map jobs are done, the sorted map outputs are received by the node where the reduce job(s) are running. The reduce job merges the sorted outputs and writes the result in an output file stored in the Hadoop file system for reliability.
  5. Hadoop is a batch processing system. Jobs are started, processed, and their output is written to disk.

 

MS BI Guy Does Hadoop (Part 1 – Getting Started)

With Big Data and Hadoop getting a lot of attention nowadays, I’ve decided it’s time to take a look, so I am starting a log of my Hadoop adventures. I hope it’ll benefit Windows users, especially BI pros. If not, at least I’ll keep track of my experience so I can recreate it if needed. Before I start, for an excellent introduction to Hadoop from a Microsoft perspective, watch the talk Big Data – What is the Big Deal? by David DeWitt. Previously, I experimented and got my feet wet with Apache Hadoop-based Services for Windows Azure, which is the Microsoft implementation of Hadoop in the cloud, but I was thirsty for more and wanted to dig deeper. Microsoft is currently working on a CTP of Hadoop-based Services for Windows, which will provide a supported environment for installing and running Hadoop on Windows. While waiting, O’Reilly was kind enough to send me a review copy of Hadoop: The Definitive Guide, 3rd Edition, by Tom White. Since Hadoop is an open-source project, I had to rediscover and relearn something I thought I’d left behind in my university days – Unix, or to be more precise its CentOS Linux variant, which is installed on the Cloudera VM. So, part 1 is about setting up your environment.

From the book, I discovered that Cloudera has a virtual machine for VirtualBox. I already had VirtualBox on my Windows 7 laptop so I could run SharePoint 2010 (available in x64 only). VirtualBox is a great piece of software that was originally developed by Sun Microsystems and is currently owned by Oracle. So, I decided to take the VM shortcut since I didn’t have much time to mess around with Cygwin, Java, etc. After downloading and double-extracting the Cloudera file, I created a new VirtualBox machine and made the following changes.

[screenshot: creating the VirtualBox virtual machine]

On the next step, I increased the memory to 2 GB (recommended by Cloudera). In the Virtual Hard Disk step, I chose the “Use existing hard disk” option and pointed to the vmdk file I extracted from the Cloudera download. Then, in the Settings page for the new VM, I changed the storage to use the IDE controller instead of SATA, which Cloudera said the VM might have an issue with.

[screenshot: the VM storage settings]

Once this was done, I was able to start the VM, which automatically logged me into CentOS as user cloudera. The first challenge I had to overcome was installing the VirtualBox Guest Additions for Linux in order to be able to resize the window and move the mouse cursor in and out without having to hold the right Ctrl key. This turned out to be more difficult than expected. The final solution took the following steps:

  1. Once you’ve started the guest OS, in the VM menu toolbar click Install Guest Additions to mount the disk.
  2. Open the File Manager and navigate to the /etc/yum.repos.d folder. Right-click the folder and click Open Terminal Here.

    In the command window, type the following command to elevate your privileges:

    $ su

    Enter the password (cloudera) when prompted.

  3. Open the vi editor to edit the Cloudera-cdh3.repo file, as mentioned in the Cloudera VM demo notes, by typing this command:

    su -c vi Cloudera-cdh3.repo

  4. Change the baseurl line as follows:

    [Cloudera-cdh3]

    name=Cloudera's Distribution for Hadoop, Version 3

    enabled=1

    gpgcheck=0

    baseurl=http://archive.cloudera.com/redhat/cdh/3u4/

  5. Press ESC to go to command mode and type :wq to save and exit vi.

    Tip: To edit files in a more civilized way, click the File Manager icon in the menu bar at the bottom of the shell. However, you won’t have access to save files. As a workaround, launch the File Manager with elevated permissions as follows:

    $ su -c Thunar

  6. Enter the following command to install a few utilities and the kernel development packages:

    $ yum install dkms binutils gcc make patch libgomp glibc-headers glibc-devel kernel-headers kernel-devel

  7. Then navigate to the media folder and run the Guest Additions file.
    $ cd /media
    $ cd VBOXADDITIONS_4.1.16_78094
    $ ./VBoxLinuxAdditions.run

    This should install the guest additions successfully. If you see any error messages, install any additional packages with yum as requested.

Next, you can verify the Hadoop installation by executing the steps in the Starting Hadoop and Verifying it is Working Properly section in the Hadoop Quick Start Guide.