简单的Streaming和Pipes示例

一、Hadoop Streaming

Streaming是Hadoop提供的一个可以使用其他编程语言来进行MapReduce来的API,因为Hadoop是基于Java(由于作者比较擅长Java,Lucene和Nutch都是出于Hadoop的作者)。Hadoop Streaming并不复杂,其只是使用了Unix的标准输入输出作为Hadoop和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。

在标准的输入输出中,key和value是以tab作为分隔符,并且在reduce的标准输入中,hadoop框架保证了输入的数据是经过了按key排序的。

下面的示例是用Python重写了上一个示例:

# max_temperature_map.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
 val = line.strip().split()
 # 分隔年份和温度值,输出到标准输出
 print "%s\t%s"%(val[0], val[1])

# max_temperature_reduce.py
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, 0)
for line in sys.stdin:
 (key, temp) = line.strip().split('\t')
 if last_key and last_key != key:
 print "%s\t%s" % (last_key, max_val)
 (last_key, max_val) = (key, int(temp))
 else:
 (last_key, max_val) = (key, max(max_val, int(temp)))

if last_key:
 print "%s\t%s" % (last_key, max_val)

“if last_key and last_key != key”这一行命令主要因为当已经完成了某(key, value[])对的处理后直接输出,然后再重置(last_key, max_val)进行新的(key, value[])处理。因为reduce的输入是经过了排序的,因此”if last_key and last_key != key”是处理某key对应的value列表。

执行命令:

hadoop jar /home/stephenchan/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-streaming.jar \
-input sample.txt \
-output pyoutput \
-mapper max_temperature_map.py \
-reducer max_temperature_reduce.py
# 不用MapReduce其实也相当于下面的命令:
# cat sample.txt | python max_temperature_map.py | sort | python max_temperature_reduce.py > pyoutput.txt

二、Hadoop Pipes

Hadoop Pipes是Hadoop MapReduce的C++接口。与使用标准输入输出的Hadoop Streaming不同(当然Streaming也可以用于C++),Hadoop Pipes在tasktacker和map/reduce进行通信时使用的socket作为管道,不是标准输入输出,而不是JNI。

Hadoop Pipes不能运行在standalone模式下,所以要先配置成pseudo-distributed模式,因为Hadoop Pipes依赖于Hadoop的分布式缓存技术,而分布式缓存只会在HDFS运行的时候才会支持。

与Java的接口不一样,Hadoop Pipes的key和value都是基于STL的string,因此在处理时开发人员需要手动地进行数据类型的转换。

C++示例代码:

/* max_temperature.cpp */
#include <algorithm>
#include <limits>
#include <string>

#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

class MaxTemperatureMapper : public HadoopPipes::Mapper {
 public:
 MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
 }

 void map(HadoopPipes::MapContext& context) {
 std::string line = context.getInputValue();
 std::string year = line.substr(0, 4);
 std::string temp = line.substr(5, 3);
 context.emit(year, temp);
 }
};

class MaxTemperatureReducer : public HadoopPipes::Reducer {
 public:
 MaxTemperatureReducer(HadoopPipes::TaskContext& context) {
 }

 void reduce(HadoopPipes::ReduceContext& context) {
 int maxValue = 0;
 while (context.nextValue()) {
 maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue()));
 }
 context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));
 }
};

int main(int argc, char *argv[]) {
 return HadoopPipes::runTask(HadoopPipes::TemplateFactory<MaxTemperatureMapper, MaxTemperatureReducer>());
}

makefile代码:

PLATFORM=Linux-i386-32
CC = g++
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

max_temperature: max_temperature.cpp
 $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
 -lhadooputils -lpthread -g -O2 -o $@

在使用make命令生成了max_temperature可执行文件之后,就要使用”hadoop fs”将可执行文件和示例数据文件拷贝到伪dfs中去。

hadoop fs -put max_temperature bin/max_temperature
hadoop fs -put sample.txt temp_sample.txt
hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input temp_sample.txt \
-output cppoutput \
-program bin/max_temperature
# 运行的结果也是保存在伪dfs中,使用"hadoop fs"来查看
hadoop fs -cat cppoutput/part*
This entry was posted in Hadoop. Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>