[toc]

Hadoop Streaming Guide

Since Hadoop 2.6.5, Hadoop Streaming has shipped as part of the Hadoop Tools library.

Installing Hadoop Streaming

Download the latest version from Index of /Code/JarDownload/hadoop-streaming. Unless you have special needs, such as building custom features, do not download the source version. Pay close attention to version compatibility: if jobs launched with the downloaded hadoop-streaming jar complete but are reported as failed, download a version that matches your cluster.

After downloading hadoop-streaming.jar, place it under $HADOOP_HOME (any directory works, but keeping it in the Hadoop installation directory makes file management easier).

Test the installation:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar

If the following message appears, Hadoop Streaming is configured correctly:

No Arguments Given!
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  dumptb <glob-pattern> Dumps all files that match the given pattern to
                        standard output as typed bytes.
  loadtb <path> Reads typed bytes from standard input and stores them in
                a sequence file in the specified path
  [streamjob] <args> Runs streaming job with given arguments

The First Hadoop Streaming Job

Upload a few files to HDFS. For example, upload the .txt files from the Hadoop installation directory (LICENSE.txt, NOTICE.txt) to the /input directory on HDFS:

hdfs dfs -put *.txt /input

Verify that the files were uploaded:

hdfs dfs -ls /input

Run the first streaming job, using bash commands as mapper and reducer:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input /input \
-output /output \
-mapper cat \
-reducer wc

Wait for the job to finish; the results are written to the /output directory on HDFS.
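Before moving on to custom programs, it helps to see what this job actually computes. The following local pipeline mimics it, with sort standing in for Hadoop's shuffle/sort phase (the sample file and its path are illustrative):

```shell
# Local equivalent of the streaming job above: mapper=cat, reducer=wc.
# Hadoop sorts mapper output before the reducer sees it, hence the sort.
printf 'hello world\nfoo bar\n' > /tmp/sample.txt
cat /tmp/sample.txt | sort | wc
```

wc then reports the line, word, and byte counts of the sorted input, which is the kind of record each reducer writes into its /output part file.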

Writing Custom Streaming Programs

WordCount in C++

  1. Mapper implementation (mapper.cpp)
    #include <iostream>
    #include <string>
    using namespace std;
    int main() {
      string key;
      while(cin >> key) {
        cout << key << "\t" << "1" << endl;
        // Define counter named counter_no in group counter_group
        cerr << "reporter:counter:counter_group,counter_no,1\n";
        // Display status
        cerr << "reporter:status:processing......\n";
        // Print logs for testing
        cerr << "This is log, will be printed in stdout file\n";
      }
      return 0;
    }
    
  2. Reducer implementation (reducer.cpp)
    #include <iostream>
    #include <string>
    using namespace std;
    int main() { // the reducer runs as a standalone process, so it needs a main function
      string cur_key, last_key, value;
      cin >> cur_key >> value;
      last_key = cur_key;
      int n = 1;
      while(cin >> cur_key) { // read the map task output
        cin >> value;
        if(last_key != cur_key) { // a new key begins
          cout << last_key << "\t" << n << endl;
          last_key = cur_key;
          n = 1;
        } else { // same key as before
          n++; // accumulate the count for this key
        }
      }
      cout << last_key << "\t" << n << endl;
      return 0;
    }
    
  3. Compile and run

    Compile the two programs:

    g++ -o mapper mapper.cpp
    g++ -o reducer reducer.cpp
    

    Test locally:

    echo "hello world" | ./mapper | sort | ./reducer
    

    Note: this local test will repeatedly print the strings below, which Hadoop recognizes as reporter directives; you can comment them out while testing:

    reporter:counter:counter_group,counter_no,1
    reporter:status:processing......
    This is log, will be printed in stdout file
    

    Once the local test passes, submit the job to the cluster with the following script (run_cpp_mr.sh):

    #!/bin/bash
    HADOOP_HOME=/opt/yarn-client
    INPUT_PATH=/test/input
    OUTPUT_PATH=/test/output
    echo "Clearing output path: $OUTPUT_PATH"
    $HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH
    
    ${HADOOP_HOME}/bin/hadoop jar \
    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
    -files mapper,reducer \
    -input $INPUT_PATH \
    -output $OUTPUT_PATH \
    -mapper mapper \
    -reducer reducer
    
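An alternative to commenting the reporter lines out during local tests is to redirect stderr. A minimal sketch, using a hypothetical `mapper` shell function that mimics mapper.cpp's output:

```shell
# Stand-in mapper mimicking mapper.cpp: "<word>\t1" pairs on stdout,
# reporter directives on stderr.
mapper() {
  tr -s ' ' '\n' | while read -r w; do
    printf '%s\t1\n' "$w"
    printf 'reporter:counter:counter_group,counter_no,1\n' >&2
  done
}

# 2>/dev/null silences the reporter noise without editing the source:
echo "hello world" | mapper 2>/dev/null | sort
```

The same redirection works for the compiled binaries, e.g. `./mapper 2>/dev/null`.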

WordCount in Shell

  1. Mapper implementation (mapper.sh)
    #! /bin/bash
    while read LINE; do
        for word in $LINE
        do
        echo "$word 1"
        # in streaming, we define counter by
        # [reporter:counter:<group>,<counter>,<amount>]
        # define a counter named counter_no, in group counter_group
        # increase this counter by 1
        # counters should be written to stderr
        echo "reporter:counter:counter_group,counter_no,1" >&2
        echo "reporter:status:processing......" >&2
        echo "This is log for testing, will be printed in stdout file" >&2
        done
    done
    
  2. Reducer implementation (reducer.sh)
    #! /bin/bash
    count=0
    started=0
    word=""
    while read LINE;do
        newword=`echo $LINE | cut -d ' '  -f 1`
        if [ "$word" != "$newword" ];then
        [ $started -ne 0 ] && echo -e "$word\t$count"
        word=$newword
        count=1
        started=1
        else
        count=$(( $count + 1 ))
        fi
    done
    echo -e "$word\t$count"
    
  3. Test and run

    Test the two scripts:

    echo "hello world" | sh mapper.sh | sort | sh reducer.sh
    

    Note: this local test will repeatedly print the strings below, which Hadoop recognizes as reporter directives; you can comment them out while testing:

    reporter:counter:counter_group,counter_no,1
    reporter:status:processing......
    This is log, will be printed in stdout file
    

    Once the test passes, submit the job to the cluster with the following script (run_shell_mr.sh):

    #!/bin/bash
    HADOOP_HOME=/opt/yarn-client
    INPUT_PATH=/test/input
    OUTPUT_PATH=/test/output
    echo "Clearing output path: $OUTPUT_PATH"
    $HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH
    
    ${HADOOP_HOME}/bin/hadoop jar \
        ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
        -files mapper.sh,reducer.sh \
        -input $INPUT_PATH \
        -output $OUTPUT_PATH \
        -mapper "sh mapper.sh" \
        -reducer "sh reducer.sh"
    
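The whole shell word count can also be exercised end to end without a cluster. A sketch with logic equivalent to mapper.sh and reducer.sh inlined as functions (the function names are illustrative):

```shell
# Mapper: one "<word> 1" pair per word (reporter lines omitted for brevity).
map_words() {
  while read -r LINE; do
    for word in $LINE; do echo "$word 1"; done
  done
}

# Reducer: sums consecutive counts per key, as in reducer.sh; sorted input
# guarantees equal keys are adjacent.
sum_counts() {
  count=0; started=0; word=""
  while read -r LINE; do
    newword=$(echo "$LINE" | cut -d ' ' -f 1)
    if [ "$word" != "$newword" ]; then
      [ $started -ne 0 ] && printf '%s\t%s\n' "$word" "$count"
      word=$newword; count=1; started=1
    else
      count=$((count + 1))
    fi
  done
  printf '%s\t%s\n' "$word" "$count"
}

echo "hello world hello" | map_words | sort | sum_counts
```

As in the cluster job, sort between the two functions plays the role of Hadoop's shuffle phase.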
