Pipes Tutorial


PIPES TUTORIAL by Hitesh Maidasani

Another helpful tutorial: http://cs.smith.edu/dftwiki/index.php/Hadoop_Tutorial_2.2_--_Running_C%2B%2B_Programs_on_Hadoop

Adapted from http://developer.yahoo.com/hadoop/tutorial/module4.html#pipes


Pipes is a library that allows C++ source code to be used for Mapper and Reducer code. Applications that require high numerical performance may see better throughput if written in C++ and run through Pipes. The include files and static libraries are in the c++/Linux-i386-32/ directory under your Hadoop installation. Your application should include include/hadoop/Pipes.hh and TemplateFactory.hh.

Both key and value inputs to Pipes programs are provided as STL strings (std::string). A program must still define a Mapper and a Reducer; these names have not changed. (They, like all other classes defined in Pipes, are in the HadoopPipes namespace.) Unlike the classes of the same names in Hadoop itself, the map() and reduce() functions take a single argument, which is a reference to an object of type MapContext and ReduceContext respectively. The most important methods of these context objects are:

 const std::string& getInputKey();
 const std::string& getInputValue();
 void emit(const std::string& key, const std::string& value);
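To make the role of these three methods concrete, here is a minimal sketch that can be compiled without Hadoop. FakeMapContext and lengthMap are hypothetical names invented for this illustration; the fake class only mimics the three methods listed above and is not part of the Pipes library.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for HadoopPipes::MapContext (assumption: only the
// three methods listed above are modelled; the real class has more).
class FakeMapContext {
public:
  FakeMapContext(std::string key, std::string value)
      : key_(std::move(key)), value_(std::move(value)) {}

  const std::string& getInputKey() const { return key_; }
  const std::string& getInputValue() const { return value_; }

  // Records the pair instead of sending it to the framework.
  void emit(const std::string& key, const std::string& value) {
    emitted_.emplace_back(key, value);
  }

  const std::vector<std::pair<std::string, std::string>>& emitted() const {
    return emitted_;
  }

private:
  std::string key_;
  std::string value_;
  std::vector<std::pair<std::string, std::string>> emitted_;
};

// Example map logic: emits the input key together with the size of the
// input value, written as a decimal string (everything is a std::string).
void lengthMap(FakeMapContext& context) {
  const std::string& value = context.getInputValue();
  context.emit(context.getInputKey(), std::to_string(value.size()));
}
```

Calling lengthMap on a context built from a key and a five-character value records one pair: the same key and the string "5". A real mapper would follow the same read, process, emit pattern on a HadoopPipes::MapContext.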

Defining a Pipes Program

A program to use with Pipes is defined by writing classes that extend Mapper and Reducer. Hadoop must then be informed which classes to use to run the job. The Pipes framework starts an instance of your C++ program, beginning in main(), on each machine. main() should do any (hopefully brief) configuration required for your task. It should then define a Factory to create Mapper and Reducer instances as necessary, and then run the job by calling the runTask() method. The simplest way to define a factory is with the following code:

 int main(int argc, char *argv[]) {
 	// classes are indicated to the factory via templates (defined in TemplateFactory)
 	return HadoopPipes::runTask(HadoopPipes::TemplateFactory<MyMapperClass, MyReducerClass>());
 }
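The template-based factory idea can be sketched independently of Hadoop. Everything below (SketchContext, SketchMapper, SketchReducer, SketchTemplateFactory, DemoMapper, DemoReducer) is hypothetical and only mirrors the pattern; it is not the actual TemplateFactory source. It assumes, as the class skeletons in this tutorial suggest, that the factory constructs each task object by passing it a context.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for HadoopPipes::TaskContext.
struct SketchContext {
  std::string jobName;
};

// Hypothetical task interfaces standing in for Mapper and Reducer.
struct SketchMapper {
  virtual ~SketchMapper() = default;
  virtual std::string describe() const = 0;
};
struct SketchReducer {
  virtual ~SketchReducer() = default;
  virtual std::string describe() const = 0;
};

// The concrete classes are chosen at compile time through the template
// parameters; each must offer a constructor that accepts the context.
template <class MapperT, class ReducerT>
class SketchTemplateFactory {
public:
  std::unique_ptr<SketchMapper> createMapper(SketchContext& context) const {
    return std::unique_ptr<SketchMapper>(new MapperT(context));
  }
  std::unique_ptr<SketchReducer> createReducer(SketchContext& context) const {
    return std::unique_ptr<SketchReducer>(new ReducerT(context));
  }
};

// Two trivial task classes used to exercise the factory.
class DemoMapper : public SketchMapper {
public:
  explicit DemoMapper(SketchContext& context) : job_(context.jobName) {}
  std::string describe() const override { return "mapper for " + job_; }
private:
  std::string job_;
};

class DemoReducer : public SketchReducer {
public:
  explicit DemoReducer(SketchContext& context) : job_(context.jobName) {}
  std::string describe() const override { return "reducer for " + job_; }
private:
  std::string job_;
};
```

Under this assumption, a Mapper or Reducer class needs a constructor taking a context reference even when that constructor does nothing, because the factory calls it to create each task object.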

The program also needs a Mapper class and a Reducer class:

General structure of mapperClass:

 class MyMapperClass : public HadoopPipes::Mapper {
 public:
 	// constructor: does nothing
 	MyMapperClass( HadoopPipes::TaskContext& context ) {
 	}
 	// map function does all processing and output
 	void map( HadoopPipes::MapContext& context ) {
 		// returns the key from the key-value input given to the mapper (usually the key is a file name)
 		std::string name = context.getInputKey();
 		// returns the value from the key-value input given to the mapper (usually the value is the file content)
 		std::string data = context.getInputValue();
 		// do all processing using the input
 		// output a key-value pair, which will be sent to the reducer
 		context.emit( <key>, <value> );
 	}
 };

General structure of reducerClass:

 class MyReducerClass : public HadoopPipes::Reducer {
 public:
 	// constructor: does nothing
 	MyReducerClass( HadoopPipes::TaskContext& context ) {
 	}
 	// reduce function
 	void reduce( HadoopPipes::ReduceContext& context ) {
 		/* Can have the same input and output structure as the mapper, but if no further
 		processing is needed after the mapper output, the reducer can be left empty. In that
 		case you need an IdentityReducer, which performs no reduction and writes all input
 		values directly to the output. How to run the IdentityReducer is explained in the
 		section on running the pipes program. This general reducer structure (even though it
 		is empty and does nothing) is still required in your program. */
 	}
 };
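For comparison, this is roughly what an identity-style reduce would look like if it were written in C++ rather than delegated to Hadoop's Java IdentityReducer. The sketch assumes that ReduceContext exposes nextValue() to step through the values of a key, as the Hadoop word-count Pipes example does. FakeReduceContext and identityReduce are hypothetical names used only so the code compiles without Hadoop.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for HadoopPipes::ReduceContext: one key plus the
// list of values grouped under it.
class FakeReduceContext {
public:
  FakeReduceContext(std::string key, std::vector<std::string> values)
      : key_(std::move(key)), values_(std::move(values)), next_(0) {}

  const std::string& getInputKey() const { return key_; }

  // Valid only after nextValue() has returned true.
  const std::string& getInputValue() const { return values_[next_ - 1]; }

  // Advances to the next value; returns false when none remain.
  bool nextValue() {
    if (next_ >= values_.size()) return false;
    ++next_;
    return true;
  }

  // Records the pair instead of sending it to the framework.
  void emit(const std::string& key, const std::string& value) {
    emitted_.emplace_back(key, value);
  }

  const std::vector<std::pair<std::string, std::string>>& emitted() const {
    return emitted_;
  }

private:
  std::string key_;
  std::vector<std::string> values_;
  std::size_t next_;
  std::vector<std::pair<std::string, std::string>> emitted_;
};

// Identity-style reduce: every value is written out unchanged under its key.
void identityReduce(FakeReduceContext& context) {
  while (context.nextValue()) {
    context.emit(context.getInputKey(), context.getInputValue());
  }
}
```

This tutorial leaves the C++ reduce body empty and selects the Java IdentityReducer on the command line instead, so the loop above is only meant to show what "performs no reduction" means.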

Example. Converting ClutterRemoval Driver to ClutterRemoval for pipes

Here is the original driver for ClutterRemoval to be used via command line:

 //#include "stdafx.h"
 #include "clutterRemoval.h"
 int main(int argc, char* argv[])
 {
 	CClutterRemoval cr;
 	DLImage origDLImg;						//Original image
 	string fileName( argv[ 1 ] );
 	string outFileName( argv[ 2 ] );
 	/*	string outFileName( argv[1] );			//Output File
 	string path( argv[2] );
 	int index;
 	index = outFileName.find_last_of("/\\");
 	outFileName = outFileName.substr( index + 1 );
 	if( path.find_last_of( "/\\" ) == path.size() - 1 )
 	{
 		outFileName = path + outFileName;
 	}
 	else
 	{
 		//Add "/"
 		outFileName = path + "/" + outFileName;
 	}
 	*/
 	//cout << "Output File " << outFileName << endl;
 	//Load the input file
 	origDLImg.dlLoadImage( fileName.c_str() );
 	//Remove clutter
 	cr.removeClutter( origDLImg );
 	//Save into an output file
 	origDLImg.dlSaveImage( outFileName.c_str() );
 	return 0;
 }

The pipes program is very similar, but with a mapper and reducer in the structure described above. There is also additional input and output processing, instead of getting the input from command line. Notice the hadoop files included.

Here is the pipes version of ClutterRemoval to be used in Hadoop

 //#include "stdafx.h"
 #include "clutterRemoval.h"
 #include "base64.h"
 #include <iostream>
 #include "DLBitsPerPixelConverter.h"
 #include "DLTIFFImage.h"
 #include "/chomes/hitmai/hadoop-0.20.1+152/c++/Linux-i386-32/include/hadoop/Pipes.hh"
 #include "/chomes/hitmai/hadoop-0.20.1+152/c++/Linux-i386-32/include/hadoop/TemplateFactory.hh"
 class ClutterRemovalMapper : public HadoopPipes::Mapper {
 public:
 	// constructor: does nothing
 	ClutterRemovalMapper( HadoopPipes::TaskContext& context ) {
 	}
 	// map function
 	void map( HadoopPipes::MapContext& context ) {
 		CClutterRemoval cr;
 		DLImage origDLImg;		//Original image
 		string name = context.getInputKey();
 		cerr << name << endl;
 		unsigned char* buf;
 		//-----Handling base64 input data----------------
 		string data;
 		data = context.getInputValue();		//read the entire base64 string from the input value
 		string str64 = base64_decode(data);	//decode the base64 string
 		buf = (unsigned char*) str64.c_str();	//get a const char* to the decoded bytes and cast it to buf
 		//----------regular processing----------
 		// pass buf to Doclib to create an image (the call is not shown in this listing)
 		// ----- Begin processing ----------------------------
 		// if grayscale, convert to BW (the call is not shown in this listing)
 		//Remove clutter
 		cr.removeClutter( origDLImg );
 		// ----- End processing ----------------------------
 		int imgSize;
 		unsigned char *newBuff;
 		newBuff = (unsigned char *) origDLImg.dlSaveImageToMem( imgSize, DLTIFFImage::createImageReader() );
 		string s = base64_encode( newBuff, imgSize );
 		context.emit( name, s );
 	}
 };
 class ClutterRemovalReducer : public HadoopPipes::Reducer {
 public:
 	// constructor: does nothing
 	ClutterRemovalReducer( HadoopPipes::TaskContext& context ) {
 	}
 	// reduce function
 	void reduce( HadoopPipes::ReduceContext& context ) {
 		/* Notice that the reducer does no processing, since the mapper does all processing
 		and output, but we will need the IdentityReducer to have all output in one file. */
 	}
 };
 int main(int argc, char *argv[]) {
 	return HadoopPipes::runTask(HadoopPipes::TemplateFactory<ClutterRemovalMapper, ClutterRemovalReducer>());
 }
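The mapper moves binary image data through Pipes as std::string values by wrapping it in base64. The contents of base64.h are not shown in this tutorial, so the following is only a minimal sketch of what such a pair of helpers might look like. The names sketch::base64Encode and sketch::base64Decode are hypothetical and are not the functions from base64.h.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

namespace sketch {

const char kAlphabet[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Encodes len raw bytes as padded base64 text.
std::string base64Encode(const unsigned char* data, std::size_t len) {
  std::string out;
  out.reserve(((len + 2) / 3) * 4);
  for (std::size_t i = 0; i < len; i += 3) {
    // Pack up to three bytes into a 24-bit group, zero-filling missing bytes.
    unsigned long group = static_cast<unsigned long>(data[i]) << 16;
    if (i + 1 < len) group |= static_cast<unsigned long>(data[i + 1]) << 8;
    if (i + 2 < len) group |= static_cast<unsigned long>(data[i + 2]);
    out += kAlphabet[(group >> 18) & 0x3F];
    out += kAlphabet[(group >> 12) & 0x3F];
    out += (i + 1 < len) ? kAlphabet[(group >> 6) & 0x3F] : '=';
    out += (i + 2 < len) ? kAlphabet[group & 0x3F] : '=';
  }
  return out;
}

// Decodes base64 text; stops at the first padding or non-alphabet character.
std::string base64Decode(const std::string& text) {
  std::string out;
  unsigned long buffer = 0;
  int bits = 0;
  for (char c : text) {
    // strchr would also match the terminating NUL, so exclude it explicitly.
    const char* pos = (c == '\0') ? nullptr : std::strchr(kAlphabet, c);
    if (pos == nullptr) break;
    buffer = (buffer << 6) | static_cast<unsigned long>(pos - kAlphabet);
    bits += 6;
    if (bits >= 8) {
      bits -= 8;
      out += static_cast<char>((buffer >> bits) & 0xFF);
    }
  }
  return out;
}

}  // namespace sketch
```

Because the decoded bytes can contain zero bytes, the decoded string's size() is the reliable length of the data; treating the c_str() pointer as a NUL-terminated C string would cut a binary image short.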

Compiling the pipes program.

Description using the ClutterRemoval

This is the Makefile used to compile ClutterRemoval for the command line only, without pipes:

 DOCLIB_PATH = /chomes/hitmai/doclib
 CFLAGS = -O3 -I$(DOCLIB_PATH)/include -I$(INCLUDE) -W -L$(DOCLIB_PATH)/src/DOCLIB -L/usr/bin/ld -lDoclib -ltiff -lpng -ljpeg -lz
 all: commandline
 commandline:
 	$(CC) $(CFLAGS) -o clutterRemovalCommandLine ../src/clutterRemoval.cpp ../src/clutterRemovalDriver.cpp \
 	../src/featureExtraction.cpp ../utils/libsvm-2.91/svm-predict.cpp ../utils/libsvm-2.91/svm.cpp -I../include/ \
 	-I../utils/libsvm-2.91/ -I$(DOCLIB_PATH)/include/ -I../utils/dlDT/linux/ -L$(DOCLIB_PATH)/src/DOCLIB -L../lib/ \
 	-lDoclib -ldlDT -ltiff -lpng -ljpeg

This is the Makefile used to compile ClutterRemoval for both the command line and pipes (the additions are HADOOP_INSTALL, PLATFORM, CPPFLAGS, LDFLAGS and the pipes target):

 # location of the Hadoop install directory
 HADOOP_INSTALL = /chomes/hitmai/hadoop-0.20.1+152
 # Linux platform: Linux-amd64-64 if 64 bit or Linux-i386-32 if 32 bit
 # (how to find this out is described below)
 PLATFORM = Linux-amd64-64
 DOCLIB_PATH = /chomes/hitmai/doclib
 CFLAGS = -O3 -I$(DOCLIB_PATH)/include -I$(INCLUDE) -W -L$(DOCLIB_PATH)/src/DOCLIB -L/usr/bin/ld -lDoclib -ltiff -lpng -ljpeg -lz
 # Assumption: the first line of CPPFLAGS and of LDFLAGS is reconstructed from the
 # c++/$(PLATFORM) layout of the Hadoop installation described at the top of this page
 CPPFLAGS = -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include \
 	-I$(DOCLIB_PATH)/include
 LDFLAGS = -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib \
 	-lhadooppipes -lhadooputils -lpthread \
 	-L$(DOCLIB_PATH)/src/DOCLIB/.libs \
 	-lDoclib -ltiff -lpng -ljpeg -lz
 all: commandline pipes
 commandline:
 	$(CC) $(CFLAGS) -o clutterRemovalCommandLine ../src/clutterRemoval.cpp ../src/clutterRemovalDriver.cpp \
 	../src/featureExtraction.cpp ../utils/libsvm-2.91/svm-predict.cpp ../utils/libsvm-2.91/svm.cpp -I../include/ \
 	-I../utils/libsvm-2.91/ -I$(DOCLIB_PATH)/include/ -I../utils/dlDT/linux/ -L$(DOCLIB_PATH)/src/DOCLIB -L../lib/ -lDoclib -ldlDT -ltiff -lpng -ljpeg
 pipes:
 	$(CC) $(CPPFLAGS) -o clutterRemovalPipes ../src/clutterRemoval.cpp ../src/clutterRemovalPipes.cpp ../src/featureExtraction.cpp \
 	../utils/libsvm-2.91/svm-predict.cpp ../utils/libsvm-2.91/svm.cpp ../include/base64.cpp -I../include/ -I../utils/libsvm-2.91/ \
 	-I$(DOCLIB_PATH)/include/ -I../utils/dlDT/linux/ -L$(DOCLIB_PATH)/src/DOCLIB -L../lib/ -lDoclib -ldlDT -ltiff -lpng -ljpeg $(LDFLAGS)

Note: pipes target is very similar to the commandline target but uses the additional CPPFLAGS and LDFLAGS


Before you create the Makefile, you need to figure out whether your computer hosts a 32-bit processor or a 64-bit processor, and pick the right library. To find this out, run the following command:

 uname -a

To which the OS responds:

 Linux hadoop6 2.6.31-20-generic #58-Ubuntu SMP Fri Mar 12 05:23:09 UTC 2010 i686 GNU/Linux

The i686 indicates a 32-bit machine, for which you need to use the Linux-i386-32 library. Anything with 64 indicates the other type, for which you use the Linux-amd64-64 library.

Running the pipes program on hadoop

Copy the executable file (clutterRemovalPipes) to the bin directory in HDFS:

 hadoop fs -mkdir bin                    # (Note: it should already exist!)
 hadoop fs -put clutterRemovalPipes bin/clutterRemovalPipes

Run the program with a command similar to the following general structure:

 $ bin/hadoop pipes -input inputPath -output outputPath -program path/to/pipes/program/executable

The command for the clutterRemovalPipes:

 hadoop pipes -D mapred.job.name=ClutterRemoval -D mapred.map.tasks=88 \
 -D hadoop.pipes.java.recordreader=false -D hadoop.pipes.java.recordwriter=false -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat \
 -writer org.apache.hadoop.mapred.SequenceFileOutputFormat -input LineSegInput -output ClutterOutputPipes_Feb12 -program bin/clutterRemovalPipes \
 -reduces 1 -reduce org.apache.hadoop.mapred.lib.IdentityReducer


 -D mapred.map.tasks=88 	//sets the number of mappers
 -reduces 1 	//sets the number of reducers
 -reduce org.apache.hadoop.mapred.lib.IdentityReducer
 //sets the type of reducer. Here we use the IdentityReducer described above
 -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat
 -writer org.apache.hadoop.mapred.SequenceFileOutputFormat
 //these two set the format of the input and output

Now you should have your pipes program working successfully!
