[toc]
# Hadoop Streaming Guide

Since Hadoop release 2.6.5, Hadoop Streaming has shipped as one of the Tools utility classes.
## Installing Hadoop Streaming

Download the latest version from Index of /Code/JarDownload/hadoop-streaming. Unless you have a special need, such as customizing its behavior, do not download the source version. Pay close attention to the version: if the hadoop-streaming jar you downloaded dispatches tasks and the job runs to completion but the result is reported as failed, download a version that matches your cluster.

After downloading hadoop-streaming.jar, place it in the $HADOOP_HOME directory. (It can live in any directory, but keeping it in the Hadoop installation directory makes file management easier.)
Test the installation:

```shell
hadoop jar $HADOOP_HOME/hadoop-streaming.jar
```

If the following message appears, Hadoop Streaming is set up correctly:
```
No Arguments Given!
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  dumptb <glob-pattern>  Dumps all files that match the given pattern to
                         standard output as typed bytes.
  loadtb <path>          Reads typed bytes from standard input and stores them
                         in a sequence file in the specified path
  [streamjob] <args>     Runs streaming job with given arguments
```
## A First Hadoop Streaming Job

Upload a few files to HDFS; for example, upload the .txt files from the Hadoop installation directory (LICENSE.txt, NOTICE.txt) to /input:

```shell
hdfs dfs -put *.txt /input
```

Check that the files were uploaded:

```shell
hdfs dfs -ls /input
```

Run a first streaming job built from shell commands:
```shell
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
    -input /input \
    -output /output \
    -mapper cat \
    -reducer wc
```

Wait for the job to finish; the results are written to the /output directory in HDFS.
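Because the mapper and reducer here are ordinary Unix commands, the job's logic can be sanity-checked on the local machine before submitting. A minimal sketch (the sample text is illustrative):

```shell
# Simulate the streaming pipeline locally:
# map (cat) -> shuffle (sort) -> reduce (wc)
printf 'hello world\nhello hadoop\n' | cat | sort | wc
# wc prints the line, word, and byte counts of the sorted map output
```

This is the same map-sort-reduce shape the cluster runs, just on one machine and one input split.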
## Writing Custom Streaming Programs

### WordCount in C++

- Mapper implementation (mapper.cpp)
```cpp
#include <iostream>
#include <string>
using namespace std;

int main() {
    string key;
    while (cin >> key) {
        cout << key << "\t" << "1" << endl;
        // Define a counter named counter_no in group counter_group
        cerr << "reporter:counter:counter_group,counter_no,1\n";
        // Display status
        cerr << "reporter:status:processing......\n";
        // Print a log line for testing
        cerr << "This is log, will be printed in stdout file\n";
    }
    return 0;
}
```
- Reducer implementation (reducer.cpp)
```cpp
#include <iostream>
#include <string>
using namespace std;

// The reducer is packaged as a standalone process, so it needs its own main()
int main() {
    string cur_key, last_key, value;
    if (!(cin >> cur_key >> value)) return 0;  // nothing to reduce
    last_key = cur_key;
    int n = 1;
    while (cin >> cur_key) {  // read map task output, sorted by key
        cin >> value;
        if (last_key != cur_key) {  // a new key begins
            cout << last_key << "\t" << n << endl;
            last_key = cur_key;
            n = 1;
        } else {
            n++;  // same key: accumulate the count
        }
    }
    cout << last_key << "\t" << n << endl;
    return 0;
}
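The reducer above only works because the framework delivers map output sorted by key, so all records with the same key arrive consecutively. The effect of that sort step can be seen with standard tools (a sketch, not part of the WordCount code):

```shell
# Without sorting, identical keys are split across runs and counted separately
printf 'b\na\nb\n' | uniq -c
# With sorting (what the shuffle phase guarantees), runs of equal keys merge
printf 'b\na\nb\n' | sort | uniq -c
```

This is why every local test below inserts `sort` between the mapper and the reducer.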
- Compile and run

Compile the two programs:

```shell
g++ -o mapper mapper.cpp
g++ -o reducer reducer.cpp
```

Test:

```shell
echo "hello world" | ./mapper | sort | ./reducer
```
Note: this test repeatedly prints the strings below on stderr; you can comment them out while testing locally. Hadoop itself recognizes these strings:

```
reporter:counter:counter_group,counter_no,1
reporter:status:processing......
This is log, will be printed in stdout file
```
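The key point is that streaming keeps data and control on separate streams: key/value records go to stdout, while reporter lines (counters, status) and logs go to stderr. A minimal shell sketch of that split (`emit_word` is an illustrative helper, not part of any API):

```shell
# Data records go to stdout; reporter/control lines go to stderr
emit_word() {
    printf '%s\t1\n' "$1"                                   # data -> stdout
    echo "reporter:counter:counter_group,counter_no,1" >&2  # counter -> stderr
}
emit_word hello 2>/dev/null   # discarding stderr leaves only the data record
```

Anything written to stdout becomes part of the job's data, which is why counters must never be printed there.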
Once the test passes, submit the job to the cluster with the following script (run_cpp_mr.sh):

```shell
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input
OUTPUT_PATH=/test/output

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
    -files mapper,reducer \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper mapper \
    -reducer reducer
```
### WordCount in Shell

- Mapper implementation (mapper.sh)
```shell
#!/bin/bash
while read LINE; do
    for word in $LINE; do
        echo "$word 1"
        # In streaming, a counter is defined with
        # reporter:counter:<group>,<counter>,<amount>
        # Here we increase counter counter_no in group counter_group by 1.
        # Counters should be emitted through stderr.
        echo "reporter:counter:counter_group,counter_no,1" >&2
        echo "reporter:status:processing......" >&2
        echo "This is log for testing, will be printed in stdout file" >&2
    done
done
```
- Reducer implementation (reducer.sh)
```shell
#!/bin/bash
count=0
started=0
word=""
while read LINE; do
    newword=`echo $LINE | cut -d ' ' -f 1`
    if [ "$word" != "$newword" ]; then
        # Emit the previous key's count (skipped before the first key)
        [ $started -ne 0 ] && echo -e "$word\t$count"
        word=$newword
        count=1
        started=1
    else
        count=$(( $count + 1 ))
    fi
done
echo -e "$word\t$count"
```
- Test and run

Test the two scripts:

```shell
echo "hello world" | sh mapper.sh | sort | sh reducer.sh
```
Note: this test repeatedly prints the strings below on stderr; you can comment them out while testing locally. Hadoop itself recognizes these strings:

```
reporter:counter:counter_group,counter_no,1
reporter:status:processing......
This is log, will be printed in stdout file
```
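The whole WordCount can also be cross-checked against standard tools: `tr` plays the mapper, `sort` the shuffle, and `uniq -c` the reducer (the sample input is illustrative):

```shell
# mapper: one word per line; shuffle: sort; reducer: count runs of equal words
printf 'hello world\nhello hadoop\n' | tr ' ' '\n' | sort | uniq -c
# counts: hadoop 1, hello 2, world 1
```

If mapper.sh and reducer.sh disagree with this pipeline on the same input, the bug is in the scripts, not the cluster.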
Once the test passes, submit the job to the cluster with the following script (run_shell_mr.sh):

```shell
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input
OUTPUT_PATH=/test/output

echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH

${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
    -files mapper.sh,reducer.sh \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper "sh mapper.sh" \
    -reducer "sh reducer.sh"
```
## References

References are ordered from top to bottom by how heavily they were drawn upon. Many thanks to their authors for all the hard work!