Book Review: Pentaho Analytics for MongoDB (February 2014)



Book review by: David Fombella Pombal (twitter: @pentaho_fan)

Book Title: Pentaho Analytics for MongoDB

Author: Bo Borland

Paperback: 146 pages

I recommend this book if you want to get started with the MongoDB document-oriented storage engine and the Pentaho open source BI suite.

Target Audience
This book is intended for business analysts, data architects, and developers new
to either Pentaho or MongoDB, who want to be able to deliver a complete solution
for storing, processing, and visualizing data. It’s assumed that you already have
experience in defining the data requirements needed to support business processes
and exposure to database modeling, SQL query, and reporting techniques.

Rating: 8 out of 10

 

Chapter 1, Getting Started with Pentaho and MongoDB, introduces you to the powerful combination of MongoDB and Pentaho and provides step-by-step guidance on how to install and configure both technologies and restore the sample MongoDB data provided with this book.
Chapter 2, MongoDB Database Fundamentals, expands on the topic of data modeling and explains MongoDB database concepts essential to querying MongoDB data with Pentaho.
Chapter 3, Using Pentaho Instaview, shows you how to visualize data by connecting Pentaho to MongoDB. You use Instaview with the sample MongoDB database to analyze and visualize the website clickstream data.
Chapter 4, Modifying and Enhancing Instaview Transformations, introduces Pentaho Data Integration (PDI)—the ETL tool used by Instaview to extract, load, and transform data from various data sources.
Chapter 5, Modifying and Enhancing Instaview Metadata, explores metadata by explaining dimensional modeling concepts and how to model metadata to better reflect business requirements.
Chapter 6, Pentaho Report Designer Fundamentals, teaches you the basics of Pentaho Report Designer (PRD) to build pixel-perfect reports sourced directly from MongoDB databases.
Chapter 7, Pentaho Report Designer Prompting and Charting, expands on the previous chapter by teaching you additional advanced PRD features. You can enhance your report with new queries, charts, and a prompt designed to make the report more interactive.
Chapter 8, Deploying Pentaho Analytics to the Web, is all about web-enabling your MongoDB data using Pentaho methods and web interfaces for connecting to, modeling, and analyzing our sample clickstream data in a web browser.


 

Chapter Contents List:

Chapter 1: Getting Started with Pentaho and MongoDB
MongoDB technology overview
Pentaho technology overview
Installing MongoDB
Installing MongoDB as a Windows service
Restoring the sample clickstream MongoDB database
Installing Pentaho

Chapter 2: MongoDB Database Fundamentals
MongoDB database objects
Sample clickstream database objects
MongoDB data modeling
Normalized models
Denormalized models
MongoDB query methods
Query exercise 1
Read operations
Query exercise 2
Query operators
Querying arrays

Chapter 3: Using Pentaho Instaview
Accessing and connecting Instaview to MongoDB
Parsing and profiling a MongoDB collection
Adding a MongoDB query expression
Creating and saving an analysis view and Instaview

Chapter 4: Modifying and Enhancing Instaview Transformations
Opening an existing Instaview
Data integration
Adding a new data source
CSV file input
Stream lookup
Creating a new analysis view from blended data

Chapter 5: Modifying and Enhancing Instaview Metadata
Model design with dimensions and measures
Open an existing Instaview
Modifying measures and dimensions
Session duration measure
Session count measure
Event count measure
Referring URL dimension
Other dimension changes
Creating a new analysis view

Chapter 6: Pentaho Report Designer Fundamentals
Pentaho Report Designer features
Data sources
Report elements
Aggregations and calculations
Formatting and output
Navigating through Pentaho Report Designer
Report workspace
The Structure tab
The Data tab
The Style and Attributes tabs
The palette
The main menu and toolbar
The tab toolbar
Interface reference
Creating a MongoDB connection and query
Adding a MongoDB data source
Adding and formatting report elements
Adding a message field to your report
Adding number-fields to your report
Adding calculated values to your report

Chapter 7: Pentaho Report Designer Prompting and Charting
Adding additional MongoDB queries
Adding a bar chart query
Adding a pie chart query
Visualizing your data with charts
JFreeChart chart types
Subreports
Chart data collectors and properties
Creating a bar chart
Modifying bar chart properties
Creating a pie chart
Creating a report prompt
Creating a new parameter
Adding parameters to existing report queries
Creating subreport import parameters

Chapter 8: Deploying Pentaho Analytics to the Web
Publishing a Report Designer report to the Web
Publishing the clickstream report
An introduction to the Pentaho User Console
Running and scheduling the clickstream report
Enabling your Instaview output for the Web
Copying and modifying the Instaview transformation
Using the Data Source Wizard to model your data
Creating a JDBC connection and default metadata model
Customizing the metadata model
Creating Analyzer Views and Dashboard Designer dashboards
Creating a map view in Analyzer
Creating a heat grid in Analyzer
Creating a dashboard using Dashboard Designer


Open Business Analytics Training in London #BI #BigData #ETL #OLAP



Dates:  From 28th April to 1st May 2014

Duration: 24 hours. 4 days

Location: Executive offices group meeting rooms. London.

Address: Central Court, 25 Southampton Bldgs – WC2A 1AL.

Training contents:

DAY 1
Business Intelligence Open Source Introduction and BI Server User Console
a. Pentaho 5 Architecture and new features, Mondrian, Kettle, etc…
b. Users and roles in Pentaho 5.
c. Browsing the repository in the user console.
d. Design tools.
Pentaho Data Integration (Kettle) ETL tool
a. Best Practices of ETL Processes.
b. Functional Overview (Jobs, transformations, stream control)
c. Parameters and Variables in kettle
• Environment variables and shared connections.
• ETL scheduling
d. Jobs
• Overview
• Step types (Mail, File Management, Scripting, etc…)
• Steps description
e. Transformations
• Overview
• Step types (Input, Output, Transform, Big Data, etc…)
• Steps description
f. Practical exercises
g. Data profiling with DataCleaner (pattern analysis, value distribution, date gap analysis …)
h. Talend Open Studio vs Kettle comparison
DAY 2
Data warehousing, OLAP and Mondrian
a. Datawarehouse – Datamart.
b. Star database schemas.
c. Multidimensional/OLAP
d. Mondrian ROLAP engine.
e. JPivot and MDX.
f. Designing OLAP structures Schema Workbench.
g. Tips to maximize Mondrian performance.
h. Alternatives to JPivot: STPivot, Saiku Analytics, OpenI
i. Practical Exercises
Social Intelligence
a. Introduction
b. Social KPIs (Facebook, Twitter …)
c. Samples
DAY 3
Reporting
a. AdHoc Reporting
• WAQR
• Pentaho Metadata Editor
• Creating a business model
b. Pentaho Reporting. Report Designer.
c. Practical Exercises

Big Data

a. Big Data Introduction
b. Pentaho Big Data components
c. Relational vs Columnar and Document Databases
DAY 4
Dashboards and Ctools
a. Introduction.
• Previous concepts.
• Best practices in dashboard design.
• Practical design.
b. Types of dashboards.
c. CDF
• Introduction.
• Samples.
d. CDE (Dashboard Editor)
• Introduction.
• Samples.
• Practical Exercise.
e. Ad hoc Dashboards.
• Introduction.
• Samples.
f. STDashboard
Plug-ins
a. SPARKL (Application designer)
b. Startup Rules (substitute for xactions)

Book Review: Pentaho for Big Data Analytics (November 2013)


https://www.packtpub.com/pentaho-for-big-data-analytics/book

Book review by: David Fombella Pombal (twitter: @pentaho_fan)

Book Title: Pentaho for Big Data Analytics

Authors: Manoj R Patil, Feris Thia

Paperback: 118 pages

I recommend this book if you want to get started with the Pentaho open source BI tool together with Hadoop and Big Data.

Target Audience
If you are a Data Scientist, a Hadoop programmer, a Big Data enthusiast, or a developer working in the Business Intelligence domain who is aware of Hadoop or the Pentaho tools and wants to try out creating a solution in the Big Data space, this is your manual.

Rating: 7 out of 10

Chapter 1, The Rise of Pentaho Analytics along with Big Data

This chapter serves as a brief summary of the Pentaho tools and their history in the Business Intelligence field, weaving in stories on the rise of Big Data.

Pentaho Tools:

Server Applications

  • Business Analytics (BA) Server: Java-based BI system with a report management system, a lightweight process-flow engine, and an HTML5-based web interface. The Community Edition ships a counterpart application called Business Intelligence (BI) Server.

  • Data Integration (DI) Server: Enterprise Edition only; a server for ETL processes and data integration.

Thin Client Tools

  • Pentaho Interactive Reporting: WYSIWYG design interface used to build simple, ad hoc reports on the fly without IT or programming skills. There are several CE alternatives, such as WAQR (Web Ad Hoc Query Reporting) and Saiku Reporting.

Pentaho Interactive Reporting (EE)

Saiku Reporting (CE)

Web Ad Hoc Query Reporting

  • Pentaho Analyzer: An advanced OLAP viewer with support for drag-and-drop. It is an intuitive EE analytical visualization tool that can filter and drill down into data stored in a Mondrian (Pentaho ROLAP engine) data source.

Pentaho Analyzer

  • Pentaho Dashboard Designer (EE): Commercial plugin that allows users to create dashboards with an easy graphical interface

Design Tools

  • Schema Workbench: Graphical tool for creating ROLAP schemas for Pentaho Analysis (Mondrian).
  • Aggregation Designer: Generates pre-calculated tables to improve the performance of Mondrian OLAP schemas.
  • Design Studio: An Eclipse-based application and plugin that eases the creation of business process flows, using a special XML script to define action sequences (xactions).
  • Report Designer: A banded report design tool with a great GUI, useful for creating sub-reports, charts and graphs.
  • Data Integration: This wonderful ETL tool is also known as Kettle, and consists of an ETL engine and a GUI that lets the user design ETL jobs and transformations.
  • Metadata Editor: This tool is used to create business models and acts as an abstraction layer from the underlying physical database.

 

Pentaho BI Suite components

Chapter 2, Setting Up the Ground

In this chapter we install Pentaho BI Suite CE and the Saiku OLAP plugin from the Marketplace. We also learn how to administer data sources using the Pentaho User Console and the Pentaho Administration Console.

Marketplace plugin

Chapter 3, Churning Big Data with Pentaho

This chapter provides a basic understanding of the Big Data ecosystem and an example to analyze data sitting on the Hadoop framework using Pentaho. At the end of this chapter, you will learn how to translate diverse data sets into meaningful data sets using Hadoop/Hive.
This chapter covers the following subjects:
• Overview of Big Data and Hadoop
• Hadoop architecture
• Big Data capabilities of Pentaho Data Integration (PDI), also known as Kettle
• Working with PDI and Hortonworks Data Platform, a Hadoop distribution
• Loading data from Hadoop Distributed File System (HDFS) to Hive using PDI

The Hadoop ecosystem

HDFS to Hive transformation

Chapter 4, Pentaho Business Analytics Tools

This chapter gives a quick summary of the business analytics life cycle. We look at several applications, such as Pentaho Action Sequence and Pentaho Report Designer, as well as the Community Dashboard Editor (CDE), Community Data Access (CDA) and Community Dashboard Framework (CDF) plugins and their configuration, to become familiar with them.

Ctools

Hive Java query using User Defined Java Class Step

Chapter 5, Visualization of Big Data

This chapter provides a basic understanding of visualizations and examples to analyze the patterns using various charts based on Hive data. It shows how to create an interactive analytical dashboard that gets data from Hive. In summary, this chapter covers the following themes:
• Evolution of data visualization and its classification
• Data source preparation
• Consumption of HDFS-based data through HiveQL
• Creation of several types of charts
• Making charts more attractive using styling

Hive query

Stock Price Analysis Dashboard

Appendix A, Big Data Sets

Covers data preparation using a sample of stock exchange data.

Appendix B, Hadoop Setup

Takes you through the installation and configuration of the third-party Hadoop distribution, Hortonworks Sandbox, which is used throughout the book.

http://hortonworks.com/products/hortonworks-sandbox/


 

Big Data Speeds Across the Chasm


By Pentaho CEO

Last week I visited our European team and met with customers, prospects, press and analysts to learn and talk about big data. My week in Europe confirmed my belief that we are definitely in the right business at the most exciting possible time. In a region that is rife with economic challenges, my conversations were optimistic and inspiring.

Seasoned industry people will be familiar with Geoffrey Moore’s famous curve showing the phases of technology adoption, in which the toughest challenge is ‘crossing the chasm’ between the early adopters and the early majority. With some technologies, this journey can take years. Many never make it across.

After speaking to me and executives from MapR, Cloudera and ParAccel, Brian McKenna of Computer Weekly proposes in his article “Big data analytics set to confound conventional adoption curve in UK” that big data adoption is moving relatively fast in the UK and Europe. The UK industry analyst Clive Longbottom, who I met with, reinforced this, saying that big data adoption in the UK was only three months behind the US.

Of course the real proof is in what customers are doing. During my visit, our customer Carsten Bomsdorf of Travian Games presented at the Big Data Analytics conference in London about how his company uses Pentaho to analyze the behavior of its 140 million gamers to continuously innovate its award-winning products. And in marked contrast to last year, every single European customer and prospect I met with was either executing or actively planning for big data analytics.

Why is the adoption curve for big data moving faster than other technologies, even in Europe’s more traditionally risk- and hype-averse markets? The answer is economic urgency. Big data analytics has demonstrated that it can help companies identify new revenue streams – even needles in haystacks – regardless of the economic climate. Quite simply, big data is the ultimate tool for matching supply with demand.

If Europe’s enthusiasm for big data is anything to go on, I have to conclude that 2013 really will be the year that it starts to enter mainstream production. Fasten your seat belts – it’s going to be a wild ride!

Quentin Gallivan, CEO, Pentaho

via Big Data Speeds Across the Chasm.

Hadoop beginners tutorial on Ubuntu


Pentaho & Big Data

Why this tutorial? Pentaho Business Analytics, used together with Hadoop, allows easy management, so this short introduction could be a useful way to get acquainted with Hadoop.

What we want to do

In this short tutorial, I will describe the required steps for setting up a single-node Hadoop cluster using the Hadoop Distributed File System (HDFS) on Ubuntu Linux.

Hadoop is a framework written in Java for running applications on large clusters of commodity hardware and incorporates features similar to those of the Google File System and of MapReduce. HDFS is a highly fault-tolerant distributed file system and, like Hadoop in general, is designed to be deployed on low-cost hardware. It provides high throughput access to application data and is suitable for applications that have large data sets.

Cluster of machines running Hadoop at Yahoo!

The main goal of this tutorial is to get a “simple” Hadoop installation up and running so that you can play around with the software and learn more about it.

This tutorial has been tested with the following software versions:

  • Ubuntu Linux 10.04 LTS (deprecated: 8.10, 8.04 LTS, 7.10, 7.04)
  • Hadoop 1.0.3, released May 2012

You can find the time of the last document update at the very bottom of this page.

Prerequisites

Sun Java 6

Hadoop requires a working Java 1.5.x (aka 5.0.x) installation. However, using Java 1.6.x (aka 6.0.x aka 6) is recommended for running Hadoop. For the sake of this tutorial, I will therefore describe the installation of Java 1.6.

Important Note: The apt instructions below are taken from this SuperUser.com thread. I got notified that the previous instructions that I provided no longer work. Please be aware that adding a third-party repository to your Ubuntu configuration is considered a security risk. If you do not want to proceed with the apt instructions below, feel free to install Sun JDK 6 via alternative means (e.g. by downloading the binary package from Oracle) and then continue with the next section in the tutorial.

# Add Ferramosca Roberto's repository to your apt repositories
# See https://launchpad.net/~ferramroberto/
#
$ sudo apt-get install python-software-properties
$ sudo add-apt-repository ppa:ferramroberto/java

# Update the source list
$ sudo apt-get update

# Install Sun Java 6 JDK
$ sudo apt-get install sun-java6-jdk

# Select Sun's Java as the default on your machine.
# See 'sudo update-alternatives --config java' for more information.
#
$ sudo update-java-alternatives -s java-6-sun

The full JDK will be placed in /usr/lib/jvm/java-6-sun (well, this directory is actually a symlink on Ubuntu).

After installation, make a quick check whether Sun’s JDK is correctly set up:

user@ubuntu:~$ java -version
java version "1.6.0_20"
Java(TM) SE Runtime Environment (build 1.6.0_20-b02)
Java HotSpot(TM) Client VM (build 16.3-b01, mixed mode, sharing)

Adding a dedicated Hadoop system user

We will use a dedicated Hadoop user account for running Hadoop. While that’s not required, it is recommended because it helps to separate the Hadoop installation from other software applications and user accounts running on the same machine (think: security, permissions, backups, etc).

$ sudo addgroup hadoop
$ sudo adduser --ingroup hadoop hduser

This will add the user hduser and the group hadoop to your local machine.

Configuring SSH

Hadoop requires SSH access to manage its nodes, i.e. remote machines plus your local machine if you want to use Hadoop on it (which is what we want to do in this short tutorial). For our single-node setup of Hadoop, we therefore need to configure SSH access to localhost for the hduser user we created in the previous section.

I assume that you have SSH up and running on your machine and configured it to allow SSH public key authentication. If not, there are several guides available.

First, we have to generate an SSH key for the hduser user.

user@ubuntu:~$ su - hduser
hduser@ubuntu:~$ ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub.
The key fingerprint is:
9b:82:ea:58:b4:e0:35:d7:ff:19:66:a6:ef:ae:0e:d2 hduser@ubuntu
The key's randomart image is:
[...snipp...]
hduser@ubuntu:~$

The second line will create an RSA key pair with an empty password. Generally, using an empty password is not recommended, but in this case it is needed to unlock the key without your interaction (you don’t want to enter the passphrase every time Hadoop interacts with its nodes).

Second, you have to enable SSH access to your local machine with this newly created key.

hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

The final step is to test the SSH setup by connecting to your local machine with the hduser user. The step is also needed to save your local machine’s host key fingerprint to the hduser user’s known_hosts file. If you have any special SSH configuration for your local machine like a non-standard SSH port, you can define host-specific SSH options in $HOME/.ssh/config (see man ssh_config for more information).

hduser@ubuntu:~$ ssh localhost
The authenticity of host 'localhost (::1)' can't be established.
RSA key fingerprint is d7:87:25:47:ae:02:00:eb:1d:75:4f:bb:44:f9:36:26.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'localhost' (RSA) to the list of known hosts.
Linux ubuntu 2.6.32-22-generic #33-Ubuntu SMP Wed Apr 28 13:27:30 UTC 2010 i686 GNU/Linux
Ubuntu 10.04 LTS
[...snipp...]
hduser@ubuntu:~$

If the SSH connection fails, these general tips might help:

  • Enable debugging with ssh -vvv localhost and investigate the error in detail.
  • Check the SSH server configuration in /etc/ssh/sshd_config, in particular the options PubkeyAuthentication (which should be set to yes) and AllowUsers (if this option is active, add the hduser user to it). If you made any changes to the SSH server configuration file, you can force a configuration reload with sudo /etc/init.d/ssh reload.
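One more cause worth ruling out, assuming your sshd runs with its default StrictModes setting: the server ignores authorized_keys when that file or the .ssh directory is writable by other users. The sketch below restricts both to their owner. tighten_ssh_perms is a hypothetical helper name used for illustration only; it is not an OpenSSH command.

```shell
# tighten_ssh_perms is a hypothetical helper, not part of OpenSSH.
# It restricts an SSH directory and its authorized_keys file to the owner.
# Argument 1 (optional): the SSH directory, defaulting to $HOME/.ssh.
tighten_ssh_perms() {
    local ssh_dir="${1:-$HOME/.ssh}"
    # Only the owner may read, write, or enter the directory.
    chmod 700 "$ssh_dir" || return 1
    if [ -f "$ssh_dir/authorized_keys" ]; then
        # Only the owner may read or write the key list.
        chmod 600 "$ssh_dir/authorized_keys" || return 1
    fi
}
```

Run tighten_ssh_perms as the hduser user, then retry ssh localhost.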

Disabling IPv6

One problem with IPv6 on Ubuntu is that using 0.0.0.0 for the various networking-related Hadoop configuration options will result in Hadoop binding to the IPv6 addresses of my Ubuntu box.
In my case, I realized that there’s no practical point in enabling IPv6 on a box when you are not connected to any IPv6 network. Hence, I simply disabled IPv6 on my Ubuntu machine. Your mileage may vary.

To disable IPv6 on Ubuntu 10.04 LTS, open /etc/sysctl.conf in the editor of your choice and add the following lines to the end of the file:

#disable ipv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

You have to reboot your machine in order to make the changes take effect. You can check whether IPv6 is enabled on your machine with the following command:

$ cat /proc/sys/net/ipv6/conf/all/disable_ipv6

A return value of 0 means IPv6 is enabled, a value of 1 means disabled (that’s what we want).
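If you want to run this check from a setup script, the 0/1 value can be wrapped in a small function. This is only a sketch: ipv6_status is a made-up helper name, and the optional file argument exists purely so the function can be tried against a test file instead of the real kernel flag.

```shell
# ipv6_status is a hypothetical helper, not part of Ubuntu or Hadoop.
# It reads the kernel flag file and prints a human-readable status.
# Argument 1 (optional): path to the flag file.
ipv6_status() {
    local flag_file="${1:-/proc/sys/net/ipv6/conf/all/disable_ipv6}"
    local value
    value="$(cat "$flag_file")" || return 2
    case "$value" in
        1) echo "disabled" ;;   # what we want for this tutorial
        0) echo "enabled" ;;
        *) echo "unknown ($value)"; return 1 ;;
    esac
}
```

Calling ipv6_status without arguments after the reboot should print "disabled" if the sysctl settings took effect.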

Alternative

You can also disable IPv6 only for Hadoop as documented in HADOOP-3437. You can do so by adding the following line to conf/hadoop-env.sh:

export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true

Hadoop

Installation

You have to download Hadoop from the Apache Download Mirrors and extract the contents of the Hadoop package to a location of your choice. I picked /usr/local/hadoop. Make sure to change the owner of all the files to the hduser user and hadoop group, for example:

$ cd /usr/local
$ sudo tar xzf hadoop-1.0.3.tar.gz
$ sudo mv hadoop-1.0.3 hadoop
$ sudo chown -R hduser:hadoop hadoop

(Just to give you the idea, YMMV — personally, I create a symlink from hadoop-1.0.3 to hadoop.)

Update $HOME/.bashrc

Add the following lines to the end of the $HOME/.bashrc file of user hduser. If you use a shell other than bash, you should of course update its appropriate configuration files instead of .bashrc.

# Set Hadoop-related environment variables
export HADOOP_HOME=/usr/local/hadoop

# Set JAVA_HOME (we will also configure JAVA_HOME directly for Hadoop later on)
export JAVA_HOME=/usr/lib/jvm/java-6-sun

# Some convenient aliases and functions for running Hadoop-related commands
unalias fs &> /dev/null
alias fs="hadoop fs"
unalias hls &> /dev/null
alias hls="fs -ls"

# If you have LZO compression enabled in your Hadoop cluster and
# compress job outputs with LZOP (not covered in this tutorial):
# Conveniently inspect an LZOP compressed file from the command
# line; run via:
#
# $ lzohead /hdfs/path/to/lzop/compressed/file.lzo
#
# Requires installed 'lzop' command.
#
lzohead () {
    hadoop fs -cat "$1" | lzop -dc | head -1000 | less
}

# Add Hadoop bin/ directory to PATH
export PATH=$PATH:$HADOOP_HOME/bin

You can repeat this exercise also for other users who want to use Hadoop.

Excursus: Hadoop Distributed File System (HDFS)

From The Hadoop Distributed File System: Architecture and Design:

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets. HDFS relaxes a few POSIX requirements to enable streaming access to file system data. HDFS was originally built as infrastructure for the Apache Nutch web search engine project. HDFS is part of the Apache Hadoop project, which is part of the Apache Lucene project.

The following picture gives an overview of the most important HDFS components.

HDFS Architecture (source: http://hadoop.apache.org/core/docs/current/hdfs_design.html)

Configuration

Our goal in this tutorial is a single-node setup of Hadoop. More information about what we do in this section is available on the Hadoop Wiki.

hadoop-env.sh

The only required environment variable we have to configure for Hadoop in this tutorial is JAVA_HOME. Open conf/hadoop-env.sh under your Hadoop installation directory in the editor of your choice (if you used the installation path in this tutorial, the full path is /usr/local/hadoop/conf/hadoop-env.sh) and set the JAVA_HOME environment variable to the Sun JDK/JRE 6 directory.

Change

# The java implementation to use.  Required.
# export JAVA_HOME=/usr/lib/j2sdk1.5-sun

to

# The java implementation to use.  Required.
export JAVA_HOME=/usr/lib/jvm/java-6-sun

Note: If you are on a Mac with OS X 10.7 you can use the following line to set up JAVA_HOME in conf/hadoop-env.sh.

# for our Mac users
export JAVA_HOME=`/usr/libexec/java_home`

conf/*-site.xml

Note: As of Hadoop 0.20.x and 1.x, the configuration settings previously found in hadoop-site.xml were moved to core-site.xml (hadoop.tmp.dir, fs.default.name), mapred-site.xml (mapred.job.tracker) and hdfs-site.xml (dfs.replication).

In this section, we will configure the directory where Hadoop will store its data files, the network ports it listens to, etc. Our setup will use Hadoop’s Distributed File System, HDFS, even though our little “cluster” only contains our single local machine.

You can leave the settings below “as is” with the exception of the hadoop.tmp.dir variable which you have to change to the directory of your choice. We will use the directory /app/hadoop/tmp in this tutorial. Hadoop’s default configurations use hadoop.tmp.dir as the base temporary directory both for the local file system and HDFS, so don’t be surprised if you see Hadoop creating the specified directory automatically on HDFS at some later point.

Now we create the directory and set the required ownerships and permissions:

$ sudo mkdir -p /app/hadoop/tmp
$ sudo chown hduser:hadoop /app/hadoop/tmp
# ...and if you want to tighten up security, chmod from 755 to 750...
$ sudo chmod 750 /app/hadoop/tmp

If you forget to set the required ownerships and permissions, you will see a java.io.IOException when you try to format the name node in the next section.

Add the following snippets between the <configuration> … </configuration> tags in the respective configuration XML file.

In file conf/core-site.xml:

<!-- In: conf/core-site.xml -->
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:54310</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>

In file conf/mapred-site.xml:

<!-- In: conf/mapred-site.xml -->
<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
</property>

In file conf/hdfs-site.xml:

<!-- In: conf/hdfs-site.xml -->
<property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
  </description>
</property>

See Getting Started with Hadoop and the documentation in Hadoop’s API Overview if you have any questions about Hadoop’s configuration options.
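Before moving on, it can help to confirm that the values you pasted into the three files are the ones you expect. The function below is a rough sketch that prints the value of a named property. hadoop_property is a hypothetical helper, not a Hadoop command, and it assumes the layout used in the snippets above, with <name> and <value> each on a line of their own.

```shell
# hadoop_property is a hypothetical helper, not part of Hadoop.
# Usage: hadoop_property <config-file> <property-name>
# Assumes <name>...</name> and <value>...</value> sit on separate lines,
# as in the snippets of this tutorial.
hadoop_property() {
    local file="$1" name="$2"
    awk -v n="$name" '
        # Remember that we have seen the requested property name.
        index($0, "<name>" n "</name>") { in_prop = 1; next }
        # Print the text between <value> and </value>, then stop.
        in_prop && match($0, /<value>.*<\/value>/) {
            print substr($0, RSTART + 7, RLENGTH - 15)
            exit
        }
    ' "$file"
}
```

For example, hadoop_property /usr/local/hadoop/conf/core-site.xml fs.default.name should print hdfs://localhost:54310 if you used the settings of this tutorial unchanged.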

Formatting the HDFS filesystem via the NameNode

The first step to starting up your Hadoop installation is formatting the Hadoop filesystem which is implemented on top of the local filesystem of your “cluster” (which includes only your local machine if you followed this tutorial). You need to do this the first time you set up a Hadoop cluster.

Do not format a running Hadoop filesystem as you will lose all the data currently in the cluster (in HDFS).

To format the filesystem (which simply initializes the directory specified by the dfs.name.dir variable), run the command

hduser@ubuntu:~$ /usr/local/hadoop/bin/hadoop namenode -format

The output will look like this:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop namenode -format
10/05/08 16:59:56 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = ubuntu/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20 -r 911707; compiled by 'chrisdo' on Fri Feb 19 08:07:34 UTC 2010
************************************************************/
10/05/08 16:59:56 INFO namenode.FSNamesystem: fsOwner=hduser,hadoop
10/05/08 16:59:56 INFO namenode.FSNamesystem: supergroup=supergroup
10/05/08 16:59:56 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/05/08 16:59:56 INFO common.Storage: Image file of size 96 saved in 0 seconds.
10/05/08 16:59:57 INFO common.Storage: Storage directory .../hadoop-hduser/dfs/name has been successfully formatted.
10/05/08 16:59:57 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at ubuntu/127.0.1.1
************************************************************/
hduser@ubuntu:/usr/local/hadoop$

Starting your single-node cluster

Run the command:

hduser@ubuntu:~$ /usr/local/hadoop/bin/start-all.sh

This will start up a NameNode, DataNode, SecondaryNameNode, JobTracker and TaskTracker on your machine.

The output will look like this:

hduser@ubuntu:/usr/local/hadoop$ bin/start-all.sh
starting namenode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-namenode-ubuntu.out
localhost: starting datanode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-datanode-ubuntu.out
localhost: starting secondarynamenode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-secondarynamenode-ubuntu.out
starting jobtracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-jobtracker-ubuntu.out
localhost: starting tasktracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-tasktracker-ubuntu.out
hduser@ubuntu:/usr/local/hadoop$

A nifty tool for checking whether the expected Hadoop processes are running is jps (part of Sun’s Java since v1.5.0). See also How to debug MapReduce programs.

hduser@ubuntu:/usr/local/hadoop$ jps
2287 TaskTracker
2149 JobTracker
1938 DataNode
2085 SecondaryNameNode
2349 Jps
1788 NameNode
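
The same check can be scripted. The following is a minimal sketch, not part of Hadoop: the helper name missing_daemons and the EXPECTED_DAEMONS set are made up for illustration, and the sample text is simply the jps output shown above.

```python
# Hypothetical helper (not part of Hadoop): report which daemons are absent
# from the text printed by jps.
EXPECTED_DAEMONS = {"NameNode", "SecondaryNameNode", "DataNode",
                    "JobTracker", "TaskTracker"}

def missing_daemons(jps_output, expected=EXPECTED_DAEMONS):
    """Return the set of expected daemon names not found in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # each jps line looks like "<pid> <class name>"
        if len(parts) == 2 and parts[0].isdigit():
            running.add(parts[1])
    return set(expected) - running

sample = """2287 TaskTracker
2149 JobTracker
1938 DataNode
2085 SecondaryNameNode
2349 Jps
1788 NameNode"""

print(sorted(missing_daemons(sample)))  # prints []
```

On a live machine you could feed the function with the real output, for example via subprocess.check_output(["jps"]).decode().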

You can also check with netstat if Hadoop is listening on the configured ports.

hduser@ubuntu:~$ sudo netstat -plten | grep java
tcp   0  0 0.0.0.0:50070   0.0.0.0:*  LISTEN  1001  9236  2471/java
tcp   0  0 0.0.0.0:50010   0.0.0.0:*  LISTEN  1001  9998  2628/java
tcp   0  0 0.0.0.0:48159   0.0.0.0:*  LISTEN  1001  8496  2628/java
tcp   0  0 0.0.0.0:53121   0.0.0.0:*  LISTEN  1001  9228  2857/java
tcp   0  0 127.0.0.1:54310 0.0.0.0:*  LISTEN  1001  8143  2471/java
tcp   0  0 127.0.0.1:54311 0.0.0.0:*  LISTEN  1001  9230  2857/java
tcp   0  0 0.0.0.0:59305   0.0.0.0:*  LISTEN  1001  8141  2471/java
tcp   0  0 0.0.0.0:50060   0.0.0.0:*  LISTEN  1001  9857  3005/java
tcp   0  0 0.0.0.0:49900   0.0.0.0:*  LISTEN  1001  9037  2785/java
tcp   0  0 0.0.0.0:50030   0.0.0.0:*  LISTEN  1001  9773  2857/java
hduser@ubuntu:~$
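
If netstat is not at hand, a port check can be sketched in Python as well. This is only an illustration: is_listening is a made-up helper, the port numbers are copied from the netstat output above, and the labels for 54310 and 54311 are assumptions about how this cluster was configured.

```python
import socket

def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()

# Ports copied from the netstat output above; labels are assumptions.
HADOOP_PORTS = {
    50070: "NameNode web UI",
    50030: "JobTracker web UI",
    50060: "TaskTracker web UI",
    54310: "HDFS port (assumed)",
    54311: "JobTracker port (assumed)",
}

if __name__ == "__main__":
    for port, label in sorted(HADOOP_PORTS.items()):
        state = "listening" if is_listening("127.0.0.1", port) else "not reachable"
        print("%5d  %-28s %s" % (port, label, state))
```

Unlike netstat, this only tells you that something accepts connections on the port, not which process owns it.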

If there are any errors, examine the log files in the /usr/local/hadoop/logs/ directory.

Stopping your single-node cluster

Run the command

hduser@ubuntu:~$ /usr/local/hadoop/bin/stop-all.sh

to stop all the daemons running on your machine.

Example output:

hduser@ubuntu:/usr/local/hadoop$ bin/stop-all.sh
stopping jobtracker
localhost: stopping tasktracker
stopping namenode
localhost: stopping datanode
localhost: stopping secondarynamenode
hduser@ubuntu:/usr/local/hadoop$

Running a MapReduce job

We will now run your first Hadoop MapReduce job. We will use the WordCount example job which reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab. More information about what happens behind the scenes is available at the Hadoop Wiki.
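
To make the input and output format concrete, here is a minimal sketch of the same computation in plain Python, without Hadoop. It assumes words are split on whitespace; the function names are made up for illustration and are not taken from the WordCount example itself.

```python
from collections import Counter

def word_count(lines):
    """Count whitespace-separated tokens across all input lines."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts

def format_output(counts):
    """Render one 'word<TAB>count' line per word, sorted by word."""
    return ["%s\t%d" % (word, n) for word, n in sorted(counts.items())]

for row in format_output(word_count(["foo foo quux", "labs foo bar quux"])):
    print(row)
```

This prints four lines (bar 1, foo 3, labs 1, quux 2), each with a tab between word and count, which is the shape of the result files the Hadoop job writes.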

Download example input data

We will use three ebooks from Project Gutenberg for this example.

Download each ebook as a plain-text file in UTF-8 encoding and store the files in a temporary directory of your choice, for example /tmp/gutenberg.

hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Restart the Hadoop cluster

Restart your Hadoop cluster if it’s not running already.

hduser@ubuntu:~$ /usr/local/hadoop/bin/start-all.sh

Copy local example data to HDFS

Before we run the actual MapReduce job, we first have to copy the files from our local file system to Hadoop’s HDFS.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

Now, we actually run the WordCount example job.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount /user/hduser/gutenberg /user/hduser/gutenberg-output

This command will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the result in the HDFS directory /user/hduser/gutenberg-output.

Note: Some people run the command above and get the following error message:

Exception in thread "main" java.io.IOException: Error opening job jar: hadoop*examples*.jar
at org.apache.hadoop.util.RunJar.main (RunJar.java: 90)
Caused by: java.util.zip.ZipException: error in opening zip file

In this case, re-run the command with the full name of the Hadoop Examples JAR file, for example:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop-examples-1.0.3.jar wordcount /user/hduser/gutenberg /user/hduser/gutenberg-output
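
If you script the job submission, you can resolve the wildcard yourself before calling Hadoop. The sketch below is an illustration only: find_examples_jar is a made-up helper, and the installation directory /usr/local/hadoop is the one used in this tutorial.

```python
import glob
import os

def find_examples_jar(hadoop_home):
    """Return the single file matching hadoop*examples*.jar in hadoop_home."""
    pattern = os.path.join(hadoop_home, "hadoop*examples*.jar")
    matches = sorted(glob.glob(pattern))
    if len(matches) != 1:
        # zero or several matches: better to fail early than to hand
        # Hadoop an unexpanded or ambiguous name
        raise RuntimeError("expected exactly one examples JAR, found: %r" % matches)
    return matches[0]

# Example call, assuming the installation directory of this tutorial:
#   jar_path = find_examples_jar("/usr/local/hadoop")
```

The returned path can then be passed to bin/hadoop jar in place of the wildcard.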

Example output of the previous command in the console:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount /user/hduser/gutenberg /user/hduser/gutenberg-output
10/05/08 17:43:00 INFO input.FileInputFormat: Total input paths to process : 3
10/05/08 17:43:01 INFO mapred.JobClient: Running job: job_201005081732_0001
10/05/08 17:43:02 INFO mapred.JobClient:  map 0% reduce 0%
10/05/08 17:43:14 INFO mapred.JobClient:  map 66% reduce 0%
10/05/08 17:43:17 INFO mapred.JobClient:  map 100% reduce 0%
10/05/08 17:43:26 INFO mapred.JobClient:  map 100% reduce 100%
10/05/08 17:43:28 INFO mapred.JobClient: Job complete: job_201005081732_0001
10/05/08 17:43:28 INFO mapred.JobClient: Counters: 17
10/05/08 17:43:28 INFO mapred.JobClient:   Job Counters
10/05/08 17:43:28 INFO mapred.JobClient:     Launched reduce tasks=1
10/05/08 17:43:28 INFO mapred.JobClient:     Launched map tasks=3
10/05/08 17:43:28 INFO mapred.JobClient:     Data-local map tasks=3
10/05/08 17:43:28 INFO mapred.JobClient:   FileSystemCounters
10/05/08 17:43:28 INFO mapred.JobClient:     FILE_BYTES_READ=2214026
10/05/08 17:43:28 INFO mapred.JobClient:     HDFS_BYTES_READ=3639512
10/05/08 17:43:28 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=3687918
10/05/08 17:43:28 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=880330
10/05/08 17:43:28 INFO mapred.JobClient:   Map-Reduce Framework
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce input groups=82290
10/05/08 17:43:28 INFO mapred.JobClient:     Combine output records=102286
10/05/08 17:43:28 INFO mapred.JobClient:     Map input records=77934
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce shuffle bytes=1473796
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce output records=82290
10/05/08 17:43:28 INFO mapred.JobClient:     Spilled Records=255874
10/05/08 17:43:28 INFO mapred.JobClient:     Map output bytes=6076267
10/05/08 17:43:28 INFO mapred.JobClient:     Combine input records=629187
10/05/08 17:43:28 INFO mapred.JobClient:     Map output records=629187
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce input records=102286

Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser
Found 2 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:43 /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 2 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:43 /user/hduser/gutenberg-output/_logs
-rw-r--r--   1 hduser supergroup     880802 2010-05-08 17:43 /user/hduser/gutenberg-output/part-r-00000
hduser@ubuntu:/usr/local/hadoop$

If you want to modify some Hadoop settings on the fly like increasing the number of Reduce tasks, you can use the “-D” option:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount -D mapred.reduce.tasks=16 /user/hduser/gutenberg /user/hduser/gutenberg-output

An important note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user specified mapred.reduce.tasks and doesn’t manipulate that. You cannot force mapred.map.tasks but you can specify mapred.reduce.tasks.

Retrieve the job result from HDFS

To inspect the file, you can copy it from HDFS to the local file system. Alternatively, you can use the command

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-r-00000

to read the file directly from HDFS without copying it to the local file system. In this tutorial, we will copy the results to the local file system though.

hduser@ubuntu:/usr/local/hadoop$ mkdir /tmp/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -getmerge /user/hduser/gutenberg-output /tmp/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$ head /tmp/gutenberg-output/gutenberg-output
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        1
"A_     1
"Absoluti       1
"Alack! 1
hduser@ubuntu:/usr/local/hadoop$

Note that in this specific output the quote signs (") enclosing the words in the head output above have not been inserted by Hadoop. They are the result of the word tokenizer used in the WordCount example, and in this case they matched the beginning of a quote in the ebook texts. Just inspect the part-r-00000 file further to see it for yourself.

The command dfs -getmerge will simply concatenate any files it finds in the directory you specify. This means that the merged file might (and most likely will) not be sorted.
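
If you need the merged result in a particular order, you can sort it after the fact. The following is a minimal sketch that lists the most frequent words; top_words is a made-up helper and it assumes every valid line has the form word, tab, count.

```python
def top_words(lines, n=10):
    """Return the n most frequent (word, count) pairs from 'word<TAB>count' lines."""
    pairs = []
    for line in lines:
        word, sep, count = line.rstrip("\n").rpartition("\t")
        if not sep:
            continue  # no tab separator: skip the line
        try:
            pairs.append((word, int(count)))
        except ValueError:
            continue  # count is not a number: skip the line
    # highest count first; ties are broken alphabetically
    pairs.sort(key=lambda pair: (-pair[1], pair[0]))
    return pairs[:n]

# Example call, assuming the merged file created above:
#   with open("/tmp/gutenberg-output/gutenberg-output") as merged:
#       for word, count in top_words(merged):
#           print("%s\t%d" % (word, count))
```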

Hadoop Web Interfaces

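Hadoop comes with several web interfaces which are by default (see conf/hadoop-default.xml) available at these locations:

http://localhost:50070/ (web UI of the NameNode daemon)
http://localhost:50030/ (web UI of the JobTracker daemon)
http://localhost:50060/ (web UI of the TaskTracker daemon)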

These web interfaces provide concise information about what’s happening in your Hadoop cluster. You might want to give them a try.

NameNode Web Interface (HDFS layer)

The name node web UI shows you a cluster summary including information about total/remaining capacity and live and dead nodes. Additionally, it allows you to browse the HDFS namespace and view the contents of its files in the web browser. It also gives access to the local machine’s Hadoop log files.

By default, it’s available at http://localhost:50070/.

A screenshot of Hadoop’s Name Node web interface.

JobTracker Web Interface (MapReduce layer)

The job tracker web UI provides information about general job statistics of the Hadoop cluster, running/completed/failed jobs and a job history log file. It also gives access to the local machine’s Hadoop log files (the machine on which the web UI is running).

By default, it’s available at http://localhost:50030/.

A screenshot of Hadoop’s Job Tracker web interface.

TaskTracker Web Interface (MapReduce layer)

The task tracker web UI shows you running and non-running tasks. It also gives access to the local machine’s Hadoop log files.

By default, it’s available at http://localhost:50060/.

A screenshot of Hadoop’s Task Tracker web interface.

Writing An Hadoop MapReduce Program In Python

by Michael G. Noll on September 21, 2007 (last updated: June 17, 2012)

In this tutorial, I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.

Motivation

Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be developed in other languages like Python or C++ (the latter since version 0.14.1). However, the documentation and the most prominent Python example on the Hadoop home page could make you think that you must translate your Python code using Jython into a Java jar file. Obviously, this is not very convenient and can even be problematic if you depend on Python features not provided by Jython. Another issue of the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop: just have a look at the example in <HADOOP_INSTALL>/src/examples/python/WordCount.py and you will see what I mean. I still recommend having at least a look at the Jython approach and maybe even at the new C++ MapReduce API called Pipes; it’s really interesting.

That said, the ground is prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more Pythonic way, i.e. in a way you should be familiar with.

What we want to do

We will write a simple MapReduce program (see also Wikipedia) for Hadoop in Python but without using Jython to translate our code to Java jar files.

Our program will mimic the WordCount example, i.e. it reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.

Note: You can also use programming languages other than Python such as Perl or Ruby with the “technique” described in this tutorial. I wrote some words about what happens behind the scenes. Feel free to correct me if I’m wrong.

Prerequisites

You should have an Hadoop cluster up and running because we will get our hands dirty. If you don’t have a cluster yet, my following tutorials might help you to build one. The tutorials are tailored to Ubuntu Linux but the information does also apply to other Linux/Unix variants.

Python MapReduce Code

The “trick” behind the following Python code is that we will use HadoopStreaming (see also the wiki entry) to help us pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We will simply use Python’s sys.stdin to read input data and print our own output to sys.stdout. That’s all we need to do because HadoopStreaming will take care of everything else! Amazing, isn’t it? Well, at least I had a “wow” experience…

Map: mapper.py

Save the following code in the file /home/hduser/mapper.py. It will read data from STDIN (standard input), split it into words and output a list of lines mapping words to their (intermediate) counts to STDOUT (standard output). The Map script will not compute an (intermediate) sum of a word’s occurrences. Instead, it will output “<word> 1” immediately, even though the <word> might occur multiple times in the input, and just let the subsequent Reduce step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons 🙂

Make sure the file has execution permission (chmod +x /home/hduser/mapper.py should do the trick) or you will run into problems.

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

Reduce: reducer.py

Save the following code in the file /home/hduser/reducer.py. It will read the results of mapper.py from STDIN (standard input), and sum the occurrences of each word to a final count, and output its results to STDOUT (standard output).

Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.

#!/usr/bin/env python

import sys

current_word = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    try:
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # the line had no tab or count was not a number,
        # so silently ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word (if there was any valid input)!
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))
 

Test your code (cat data | map | sort | reduce)

I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise your jobs might complete successfully but produce no job result data at all, or not the results you would have expected. If that happens, most likely it was you (or me) who screwed up.

Here are some ideas on how to test the functionality of the Map and Reduce scripts.

 # very basic test
 hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
 foo     1
 foo     1
 quux    1
 labs    1
 foo     1
 bar     1
 quux    1
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
 bar     1
 foo     3
 labs    1
 quux    2
 # using one of the ebooks as example input
 # (see below on where to get the ebooks)
 hduser@ubuntu:~$ cat /tmp/gutenberg/pg20417.txt | /home/hduser/mapper.py
 The     1
 Project 1
 Gutenberg       1
 EBook   1
 of      1
 [...]
 (you get the idea)
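
The shell pipeline above can also be mirrored inside a single Python process, which is handy for quick unit tests. This is a minimal sketch under the assumption that the Map and Reduce logic is available as functions; map_phase, reduce_phase and run_local are made-up names, and sorted() stands in for the sort -k1,1 stage.

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    """Emit a (word, 1) pair for every whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield word, 1

def reduce_phase(sorted_pairs):
    """Sum the counts of each run of identical keys in sorted input."""
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)

def run_local(lines):
    # sorted() plays the role of "sort -k1,1" between the two scripts
    return list(reduce_phase(sorted(map_phase(lines))))

print(run_local(["foo foo quux labs foo bar quux"]))
# [('bar', 1), ('foo', 3), ('labs', 1), ('quux', 2)]
```

The result matches the output of the shell test, so a mismatch between the two would point to a bug in one of the scripts.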

Running the Python Code on Hadoop

Download example input data

We will use three ebooks from Project Gutenberg for this example.

Download each ebook as a plain-text file in UTF-8 encoding and store the files in a temporary directory of your choice, for example /tmp/gutenberg.

hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Copy local example data to HDFS

Before we run the actual MapReduce job, we first have to copy the files from our local file system to Hadoop’s HDFS.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above, we use HadoopStreaming to help us pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

If you want to modify some Hadoop settings on the fly like increasing the number of Reduce tasks, you can use the -D option:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...

An important note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user specified mapred.reduce.tasks and doesn’t manipulate that. You cannot force mapred.map.tasks but can specify mapred.reduce.tasks.

The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output. In general Hadoop will create one output file per reducer; in our case, however, it will create only a single file because the job runs with a single reduce task.
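
To see why the number of output files follows the number of reducers, it helps to picture how keys are spread across reduce tasks. The sketch below is a simplified illustration, not Hadoop's actual code: it uses zlib.crc32 as a stand-in hash function, and the helper names are made up.

```python
import zlib

def partition(key, num_reducers):
    """Pick a reducer index for a key (crc32 is a stand-in, not Hadoop's hash)."""
    return (zlib.crc32(key.encode("utf-8")) & 0x7fffffff) % num_reducers

def output_files(keys, num_reducers):
    """Group keys by the part file their reducer would write."""
    files = {}
    for key in keys:
        name = "part-%05d" % partition(key, num_reducers)
        files.setdefault(name, []).append(key)
    return files

words = ["foo", "bar", "quux", "labs"]
# with one reducer every key ends up in part-00000
print(sorted(output_files(words, 1)))
# with 16 reducers the keys may be spread over up to 16 files
print(sorted(output_files(words, 16)))
```

All occurrences of the same word always map to the same reducer, which is what allows each reducer to compute a complete count for its words.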

Example output of the previous command in the console:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
 additionalConfSpec_:null
 null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
 packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
 [] /tmp/streamjob54544.jar tmpDir=null
 [...] INFO mapred.FileInputFormat: Total input paths to process : 7
 [...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
 [...] INFO streaming.StreamJob: Running job: job_200803031615_0021
 [...]
 [...] INFO streaming.StreamJob:  map 0%  reduce 0%
 [...] INFO streaming.StreamJob:  map 43%  reduce 0%
 [...] INFO streaming.StreamJob:  map 86%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 33%
 [...] INFO streaming.StreamJob:  map 100%  reduce 70%
 [...] INFO streaming.StreamJob:  map 100%  reduce 77%
 [...] INFO streaming.StreamJob:  map 100%  reduce 100%
 [...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
 [...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$

As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, go to http://localhost:50030/ and browse around. Here’s a screenshot of the Hadoop web interface for the job we just ran.

A screenshot of Hadoop’s web interface, showing the details of the MapReduce job we just ran.

Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
 Found 1 items
 /user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
 hduser@ubuntu:/usr/local/hadoop$

You can then inspect the contents of the file with the dfs -cat command:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
 "(Lo)cra"       1
 "1490   1
 "1498," 1
 "35"    1
 "40,"   1
 "A      2
 "AS-IS".        2
 "A_     1
 "Absoluti       1
 [...]
 hduser@ubuntu:/usr/local/hadoop$

Note that in this specific output above the quote signs (") enclosing the words have not been inserted by Hadoop. They are the result of how our Python code splits words, and in this case they matched the beginning of a quote in the ebook texts. Just inspect the part-00000 file further to see it for yourself.

Improved Mapper and Reducer code: using Python iterators and generators

The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language. In a real-world application however, you might want to optimize your code by using Python iterators and generators (an even better introduction in PDF) as some readers have pointed out.

Generally speaking, iterators and generators (functions that create iterators, for example with Python’s yield statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help a lot in terms of computational cost or memory consumption depending on the task at hand.
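
A small experiment makes this laziness visible. In the following sketch (all names are made up for illustration), the tracked generator records which input lines have actually been pulled from the source so far.

```python
def read_words(lines):
    """Yield words one at a time instead of building a full list."""
    for line in lines:
        for word in line.split():
            yield word

consumed = []

def tracked(lines):
    # record which lines have actually been requested from the source
    for line in lines:
        consumed.append(line)
        yield line

words = read_words(tracked(["foo bar", "quux labs"]))
first = next(words)
print(first)     # foo
print(consumed)  # ['foo bar']
```

After asking for just one word, only the first line has been read; the second line is not touched until the loop needs it.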

Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.

More precisely, we compute the sum of a word’s occurrences, e.g. (“foo”, 4), only if by chance the same word (“foo”) appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs between the Map and the Reduce step because Hadoop is more efficient in this regard than our simple Python scripts.
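
The "in succession" condition comes from how itertools.groupby works: it only merges adjacent items with the same key. Here is a minimal sketch of that behavior on in-memory data; sum_runs is a made-up helper name.

```python
from itertools import groupby
from operator import itemgetter

def sum_runs(pairs):
    """Sum counts over runs of adjacent identical keys only."""
    return [(key, sum(count for _, count in group))
            for key, group in groupby(pairs, key=itemgetter(0))]

unsorted_pairs = [("foo", 1), ("foo", 1), ("bar", 1), ("foo", 1)]

print(sum_runs(unsorted_pairs))
# [('foo', 2), ('bar', 1), ('foo', 1)]

print(sum_runs(sorted(unsorted_pairs)))
# [('bar', 1), ('foo', 3)]
```

On unsorted input "foo" shows up twice in the result, while on input sorted by key each word is summed exactly once.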

mapper.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print('%s%s%d' % (word, separator, 1))

if __name__ == "__main__":
    main()
 

reducer.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for _, count in group)
            print("%s%s%d" % (current_word, separator, total_count))
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()