一、Hadoop Streaming
Streaming是Hadoop提供的一个可以使用其他编程语言来进行MapReduce来的API,因为Hadoop是基于Java(由于作者比较擅长Java,Lucene和Nutch都是出于Hadoop的作者)。Hadoop Streaming并不复杂,其只是使用了Unix的标准输入输出作为Hadoop和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。
在标准的输入输出中,key和value是以tab作为分隔符,并且在reduce的标准输入中,hadoop框架保证了输入的数据是经过了按key排序的。
下面的示例是用Python重写了上一个示例:
# max_temperature_map.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
val = line.strip().split()
# 分隔年份和温度值,输出到标准输出
print "%s\t%s"%(val[0], val[1])
# max_temperature_reduce.py
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, 0)
for line in sys.stdin:
(key, temp) = line.strip().split('\t')
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(temp))
else:
(last_key, max_val) = (key, max(max_val, int(temp)))
if last_key:
print "%s\t%s" % (last_key, max_val)
“if last_key and last_key != key”这一行命令主要因为当已经完成了某(key, value[])对的处理后直接输出,然后再重置(last_key, max_val)进行新的(key, value[])处理。因为reduce的输入是经过了排序的,因此”if last_key and last_key != key”是处理某key对应的value列表。
执行命令:
hadoop jar /home/stephenchan/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-streaming.jar \ -input sample.txt \ -output pyoutput \ -mapper max_temperature_map.py \ -reducer max_temperature_reduce.py # 不用MapReduce其实也相当于下面的命令: # cat sample.txt | python max_temperature_map.py | sort | python max_temperature_reduce.py > pyoutput.txt
二、Hadoop Pipes
Hadoop Pipes是Hadoop MapReduce的C++接口。与使用标准输入输出的Hadoop Streaming不同(当然Streaming也可以用于C++),Hadoop Pipes在tasktacker和map/reduce进行通信时使用的socket作为管道,不是标准输入输出,而不是JNI。
Hadoop Pipes不能运行在standalone模式下,所以要先配置成pseudo-distributed模式,因为Hadoop Pipes依赖于Hadoop的分布式缓存技术,而分布式缓存只会在HDFS运行的时候才会支持。
与Java的接口不一样,Hadoop Pipes的key和value都是基于STL的string,因此在处理时开发人员需要手动地进行数据类型的转换。
C++示例代码:
/* max_temperature.cpp */
#include <algorithm>
#include <limits>
#include <string>
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
class MaxTemperatureMapper : public HadoopPipes::Mapper {
public:
MaxTemperatureMapper(HadoopPipes::TaskContext& context) {
}
void map(HadoopPipes::MapContext& context) {
std::string line = context.getInputValue();
std::string year = line.substr(0, 4);
std::string temp = line.substr(5, 3);
context.emit(year, temp);
}
};
class MaxTemperatureReducer : public HadoopPipes::Reducer {
public:
MaxTemperatureReducer(HadoopPipes::TaskContext& context) {
}
void reduce(HadoopPipes::ReduceContext& context) {
int maxValue = 0;
while (context.nextValue()) {
maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue()));
}
context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));
}
};
int main(int argc, char *argv[]) {
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<MaxTemperatureMapper, MaxTemperatureReducer>());
}
makefile代码:
PLATFORM=Linux-i386-32 CC = g++ CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include max_temperature: max_temperature.cpp $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \ -lhadooputils -lpthread -g -O2 -o $@
在使用make命令生成了max_temperature可执行文件之后,就要使用”hadoop fs”将可执行文件和示例数据文件拷贝到伪dfs中去。
hadoop fs -put max_temperature bin/max_temperature hadoop fs -put sample.txt temp_sample.txt hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -input temp_sample.txt \ -output cppoutput \ -program bin/max_temperature # 运行的结果也是保存在伪dfs中,使用"hadoop fs"来查看 hadoop fs -cat cppoutput/part*