Hadoop은 Java로 Mapper, Reducer를 만들어서 실행하는 MapReduce 외에,

STDIN/STDOUT을 통해 데이터를 전달받아 처리하는 실행파일을 이용하여 MapReduce를 할 수 있도록 Streaming 방식을 제공한다.

사용방법은 Hadoop에 기본으로 포함되어 있는 hadoop-streaming-VERSION.jar 파일에 여러 옵션을 붙여 실행하는 방식이다.


텍스트 형태 밖에 다룰 수 없어 Input/Output의 선택의 폭이 좁으며,

세밀한 제어가 힘들고,

퍼포먼스가 딸리는 단점이 있지만,

Java를 모르는 상태에서 텍스트 데이터만 간단히 MapReduce 하기에는 상당히 편리하다.


최소한의 실행규칙은 아래와 같다.

$HADOOP_HOME/bin/hadoop jar $STREAMING_JAR_HOME/hadoop-streaming-*.jar \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper MAPPER_SCRIPT \

  -reducer REDUCER_SCRIPT


$STREAMING_JAR_HOME은 Hadoop 버전에 따라 아래처럼 나뉜다.

1.X = $HADOOP_HOME/contrib/streaming

2.X = $HADOOP_HOME/share/hadoop/tools/lib


mapper, reducer는 굳이 직접 만든 스크립트로 하지 않고 UNIX 명령어를 그대로 이용해도 된다.

하지만 만약 UNIX 파이프를 이용할 경우 에러가 발생하므로 파이프를 포함한 명령어 전체를 스크립트로 만들어서 실행해야 한다.


아래는 UNIX 명령어를 이용하는 예제.

$HADOOP_HOME/bin/hadoop jar $STREAMING_JAR_HOME/hadoop-streaming-*.jar \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /usr/bin/wc







예제

PHP를 이용하여 mapper, reducer 스크립트를 만들어서 apache log에서 IP별 카운트를 추출하는 MapReduce 예제를 기록해둔다.


mapper.php

#!/usr/bin/php
<?PHP
ini_set('memory_limit', -1);

$arrIP = array();

// apache log가 STDIN으로 쭉쭉 들어옴
while (($line = fgets(STDIN)) !== false) {
    $line = rtrim($line);
    if (!$line) continue;

    // apache log에서 IP 추출
    $ip = getIpAddrFromApacheLog($line);

    // 카운트 증가
    if (!isset($arrIP[$ip])) $arrIP[$i] = 0;
    ++$arrIP[$ip];
}

// reducer로 보내기
foreach ($arrIP as $ip=>$cnt) echo $ip."\t".$cnt."\n";

// apache log에서 ip를 추출
function getIpAddrFromApacheLog($apacheLog)
{
    preg_match("/^[0-9\.]+/", $apacheLog, $match);

    return $match[0];
}


reducer.php

#!/usr/bin/php
<?PHP
ini_set('memory_limit', -1);

$arrIP = array();

// mapper에서 출력한 IP\tCOUNT 형태의 문자열이 STDIN으로 쭉쭉 들어옴
while (($line = fgets(STDIN)) !== false) {
    $line = rtrim($line);
    if (!$line) continue;

    list($ip, $cnt) = explode("\t", $line);

    // reducing...
    if (!isset($arrIP[$ip])) $arrIP[$ip] = 0;
    $arrIP[$ip] += $cnt;
}

// 최종 출력
foreach ($arrIP as $ip=>$cnt) {
    echo $ip."\t".$cnt."\n";
}


mapper.php, reducer.php 에 실행권한을 부여해야 함.

[root@localhost]# chmod 755 mapper.php reducer.php


HDFS의 분석할 apache log 데이터는 /log_data/apache_log/201505 디렉토리에 2015-05-01.log, 2015-05-02.log 같은 형태로 저장되어 있다고 가정하고,

2015-05-10 부터 2015-05-19 까지의 로그데이터를 분석한다고 가정하고,

최종결과물은 /log_data/apache_log_result 디렉토리에 저장된다고 가정할 때 아래처럼 실행.

($HADOOP_HOME/bin 과 $STREAMING_JAR_HOME은 PATH에 등록되어 있다고 가정함)

// 혹시 이전 결과 데이터가 있으면 지워야 job 실행이 가능함.

[root@localhost]# hadoop dfs -rmr /log_data/apache_log_result

// streaming mapreduce job 실행

[root@localhost]# hadoop jar hadoop-streaming-*.jar \

  -input /log_data/apache_log/201505/2015-05-1*.log \

  -output /log_data/apache_log_result \

  -mapper mapper.php \

  -reducer reducer.php \

  -file mapper.php \

  -file reducer.php


위 실행 후 HDFS의 /log_data/apache_log_result 디렉토리에 보면 최종 결과물로 part-00000~N 까지의 파일이 있고,

각 파일은 아래처럼 IP\t카운트 형태의 결과물을 가진다.

211.34.104.199    99

211.34.104.200    107

211.34.104.201    370

...

...









Options


Hadoop Streaming에는 Generic, Streaming 이렇게 2가지 타입의 옵션이 있다.


Generic Options

Streaming에만 국한되지 않는 일반적인 옵션이다.

Hadoop Streaming 실행 명령어에서 반드시 Streaming Option보다 앞부분에 위치해야 한다.



 -conf 설정파일 

 설정파일 지정 

 -D property=value

 property 값 지정 

 -fs HOST:PORT 혹은 local

 namenode 지정 

 -files

 분산캐시의 파일 경로 지정 

 -libjars

 분산캐시의 jar 경로 지정 

 -archives

 분산캐시의 압축파일 경로 지정 


-D

실행하는 job에 한하여 각종 property의 값을 임시로 지정할 수 있다.

아래는 몇몇 옵션 설정의 예.

(옵션이름은 Hadoop 1.X 기준)

hadoop jar hadoop-streaming-*.jar \

  // reducer는 실행하지 않고 mapper만 실행

  -D mapred.reduce.tasks=0 \

  // input split의 사이즈를 128M로 지정

  -D mapred.min.split.size=134217728



-fs

namenode를 지정하는 옵션이다.

HDFS Federation을 사용한다면 간과할 수 없는 옵션.

hadoop jar hadoop-streaming-*.jar \

  -fs hdfs://namenode.bloodguy.com:9000



-files

이 옵션에 HDFS에 업로드한 파일 경로를 지정하고 job을 실행하면, 

각 노드의 local에 지정한 파일을 복사하고 파일명으로 symlink를 만든다.


mapper, reducer를 직접 작성한 파일로 하고자 한다면 전체 노드에 해당 스크립트들이 복사되어야 한다.

보통은 Streaming Option 인 -file 옵션으로 전체 노드에 복사할 파일을 하나씩 지정하는데,

현재 -file 옵션은 deprecated 상태이고 -files 옵션 사용을 권장하고 있다.


mapper, reducer로 사용할 스크립트 파일이나 옵션, dictionary 파일이 있을 경우 HDFS에 업로드하고,

-files 옵션을 이용해서 각 노드의 로컬에 가져와 사용할 수 있다.


만약 mapper, reducer로 각각 mapper.php, reducer.php 라는 파일을 사용한다면,

우선 HDFS의 특정경로에 mapper.php, reducer.php를 업로드 해놓고,

Streaming MapReduce 실행시 아래처럼 -files 옵션으로 지정하면 된다.

여러 파일일 경우 콤마로 나눠서 줄줄줄...

hadoop jar hadoop-streaming-*.jar \

  -files hdfs://NAMENODE:9000/script_file/mapper.php,hdfs://NAMENODE:9000/script_file/reducer.php \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper mapper.php \

  -reducer reducer.php


# 을 이용해서 symlink 이름을 재지정 할 수도 있다.

-files hdfs://namenode.bloodguy.com:9000/dictionary/dic.txt#mydic



-libjars

지정한 jar 파일을 각 노드별 CLASSPATH에서 이용할 수 있도록 해준다.



-archives

-files 옵션과 유사한데,

-files 옵션은 파일을 하나씩 콤마로 연결하여 줄줄줄 지정해야 하는 반면,

-archives 옵션은 HDFS의 압축파일 경로를 지정하고 job을 실행하면, 

각 노드별로 압축파일명을 디렉토리 명으로 해서 압축이 풀린 상태의 파일을 사용할 수 있게 해준다.

symlink의 이름을 마음대로 바꾸는 것도 가능하다.


아래처럼 하면 mapper/reducer 스크립트에서 dict/dict1.txt, dict/dict2.txt 라는 압축이 풀린 상태의 dictionary 파일을 이용할 수 있다.

// 압축

[root@localhost]# jar cvf dict.jar dict1.txt dict2.txt

// HDFS 업로드

[root@localhost]# hadoop dfs -put ./dict.jar /myArchives

// Streaming MapReduce 실행

[root@localhost]# hadoop jar hadoop-streaming-*.jar \

-archives hdfs://namenode.bloodguy.com:9000/myArchives/dict.jar#dict \

-input INPUT_PATH \

-output OUTPUT_PATH \

-mapper mapper.php \

-reducer reducer.php \

-file mapper.php \

-file reducer.php




Streaming Options

Generic Option과 다른 Streaming 전용옵션이다.

실행명령어에서 반드시 Generic Option 다음에 위치해야 한다.



 -input (필수옵션) 

 MapReduce를 수행하고자 하는 대상 데이터의 경로.

여러개를 지정하려면 각 대상 경로마다 -input 옵션을 추가해서 지정해주면 된다.

ex)

-input INPUT_PATH1 \

-input INPUT_PATH2 \

 -output (필수옵션)

 job의 최종 출력 결과가 저장될 디렉토리 경로이다. 

 -mapper (필수옵션)

 mapper로 사용할 실행파일명 혹은 Java Class 이름 

 -reducer (필수옵션)

 reducer로 사용할 실행파일명 혹은 Java Class 이름 

 -file

 mapper, reducer, combiner 실행파일이나 dictionary 파일 등 MapReduce 실행에 필요한 파일은 이 옵션으로 지정하여 전체 노드에 복사되어야 한다.

하지만 이제 deprecated 되었으므로 대신 -files 옵션을 이용하는 것을 권장. 

 -inputformat

 input format을 처리할 Java Class 이름. 지정하지 않을 경우 기본값은 TextInputFormat 

 -outputformat 

 output format을 처리할 Java Class 이름. 지정하지 않을 경우 기본값은 TextOutputFormat 

 -partitioner 

 mapper의 output이 어떻게 나눠져서 reducer로 전달될 지 결정하는 Java Class 이름. 

 -combiner

 combiner로 지정할 실행파일이나 Java Class 이름. 

 -cmdenv

 job에서 사용할 수 있는 환경변수 지정 

 -inputreader

 record reader class 지정. 

 -verbose

 지정할 경우 더 자세한 출력결과를 볼 수 있다. 

 -lazyOutput

 output을 지연해서 생성함. FileOutputFormat 클래스를 이용할 경우, 최초의 Context.write()가 호출되었을 때에만 output file이 생성됨.

내용없는 빈 파일이 생성되길 원하지 않을 경우 이 옵션을 사용하면 된다. 

 -numReduceTasks

 reduce task의 수를 지정. 

 Generic Option에서 -D mapred.reduce.tasks 와 동시에 지정하고 실행하면 numReduceTasks로 지정한 수만큼 reducer가 생성된다.

 -mapdebug

 -reducedebug

 map/reduce가 실패했을 경우 이 옵션으로 지정한 스크립트 파일이 아래와 같은 arguments와 함께 실행된다.

 $SCRIPT  $STDOUT  $STDERR  $SYSLOG  $JOBCONF








Compress

Streaming에서도 Generic Option의 -D를 이용한 property 설정을 통해 map/reduce 출력의 압축이 가능하다.

압축옵션이 설정되었다 해도 기존 mapper, reducer 스크립트의 STDIN으로 넘어오는 데이터는 Hadoop이 알아서 압축을 해제해서 넘겨주기 때문에,

스크립트에 압축을 해제한다거나 하는 코드를 따로 추가할 필요없이 그대로 사용하면 된다.


설정가능한 property는 아래와 같다.

(괄호 안은 버전2의 property 이름)


 mapred.compress.map.output

(mapreduce.map.output.compress

 mapper에서 reducer로 넘어가는 shuffle 단계에서 데이터를 압축한다. 

 mapred.map.output.compression.codec

(mapreduce.map.output.compress.codec)

 mapper에서 reducer로 넘어가는 shuffle 단계의 압축코덱 선택. 

 mapred.output.compress

(mapreduce.output.fileoutputformat.compress)

 job 최종 output 압축.

 mapred.output.compression.type

(mapreduce.output.fileoutputformat.compress.type)

 압축타입 지정. 여러 레코드를 한 번에 압축하여 압축률이 좋은 BLOCK으로 지정하는 것을 권장. 

 mapred.output.compression.codec

(mapreduce.output.fileoutputformat.compress.codec)

 job 최종 output 압축코덱 선택. 


shuffle 단계의 압축만 하려면 아래처럼 실행.

hadoop jar hadoop-streaming-*.jar \

  -D mapred.output.compression.type=BLOCK \

  -D mapred.compress.map.output=true \

  -D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /bin/cat


위처럼 실행하고 job counter를 보면 확실히 FILE_BYTES_READ/WRITE와 reduce shuffle bytes가 줄어든 것을 확인할 수 있다.


job 모든 단계의 output을 압축하려면 아래처럼 실행.

OUTPUT_PATH의 최종결과물도 압축파일이며 part-0000X.gz 같은 형태의 이름을 가진다.

hadoop jar hadoop-streaming-*.jar \

  -D mapred.output.compression.type=BLOCK \

  -D mapred.compress.map.output=true \

  -D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \

  -D mapred.output.compress=true \

  -D mapred.output.compression.codec=org.apache.io.compress.GzipCodec \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /bin/cat 


이래저래 테스트를 해봤는데 GzipCodec은 압축효율 좋지만 CPU를 많이 잡아 먹어서,

I/O에선 이득일지 모르겠으나 수행시간이 느려져서 쓸만한 물건이 아니라는 결론을 얻었다.

진짜 진짜 Network I/O가 병목이라 반드시 전송 데이터 크기를 줄여야 하지 않는 이상은 안 쓸테다.

압축을 하려면 효율은 떨어지지만 속도는 빠른 Snappy가 갑인 듯.








Aggregate

Hadoop은 mapper만 작성하면 간단한 통계수치 정도는 뽑을 수 있게 해주는 Aggregate라는 라이브러리 패키지를 제공한다.

mapper에서 AGG_NAME:LABEL\tVALUE 형태로 출력하고 reducer로 aggregate를 지정하는 방식으로 사용한다.


아래 예제는 mapper만 작성하고 reduce는 aggregate에 맡겨서 apache log에서 ip별 접속 카운트 합계를 구하는 방법이다.

우선 mapper.php

#!/usr/bin/php
<?PHP
// apache log가 한 줄씩 STDIN으로 넘어옴
while (($line = fgets(STDIN)) !== false) {
    // apache log에서 ip만 뽑는 함수가 있다고 가정하자
    $ip = getIpAddrFromApacheLog($line);

    // aggregate에 보내서 카운트를 1씩 증가시키는 출력
    echo "LongValueSum:".$ip."\t1\n";
}


위와 같이 작성된 mapper를 이용하여 아래처럼 MapReduce를 실행시키면 원하는 결과를 얻을 수 있다.

hadoop jar hadoop-streaming-*.jar \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper mapper.php \

  -reducer aggregate \

  -file mapper.php


사용할 수 있는 aggregate 타입은 아래와 같다.


DoubleValueSum

  - 출력형식: DoubleValueSum:LABEL\tVALUE\n

  - LABEL에 해당하는 수치를 각각 합산한다. 증가시킬 VALUE의 데이터 타입이 Double 형이다.

LongValueMax

  - 출력형식: LongValueMax:LABEL\tVALUE\n

  - LABEL에 해당하는 VALUE 중 가장 높은 값을 추출한다. VALUE의 데이터 타입이 Long 형이다.

LongValueMin

  - 출력형식: LongValueMin:LABEL\tVALUE\n

  - LABEL에 해당하는 VALUE 중 가장 낮은 값을 추출한다. VALUE의 데이터 타입이 Long 형이다.

LongValueSum

  - 출력형식: LongValueSum:LABEL\tVALUE\n

  - LABEL에 해당하는 VALUE를 합산한다. VALUE의 데이터 타입이 Long 형이다.

StringValueMax

  - 출력형식: StringValueMax:LABEL\tVALUE\n

  - LABEL에 해당하는 VALUE 중 가장 큰 값을 추출한다. VALUE의 데이터 타입이 String 형이다.

StringValueMin

  - 출력형식: StringValueMin:LABEL\tVALUE\n

  - LABEL에 해당하는 VALUE 중 가장 작은 값을 추출한다. VALUE의 데이터 타입이 String 형이다.

UniqValueCount

  - 출력형식: UniqValueCount:LABEL\n

  - LABEL을 Unique하게 추출한다. 최종 결과값에 각 LABEL은 하나씩만 존재한다.

ValueHistogram

  - 출력형식: ValueHistogram:LABEL\tVALUE\n

  - LABEL별 히스토그램을 계산한다. 최종출력 결과는 다음과 같은 순서다.

LABEL1  카운트  최소값  중앙값(Median)  최대값  평균값  표준편차

LABEL2  카운트  최소값  중앙값(Median)  최대값  평균값  표준편차

... ...


테스트 해 본 결과 동일한 일을 하는 reducer 스크립트를 만들어서 돌리는 게 빠르다.

1G 데이터를 대상으로 아래와 같은 실행시간이 걸렸다.

aggregate = 58초

reducer    = 49초







Counter, Status

mapper, reducer 스크립트에서 자신만의 Counter, Status를 남기는 방법은 지정된 포맷을 만족하는 문자열을 STDERR로 출력하는 것이다.

Counter의 경우 reporter:counter:그룹명,카운터명,카운트 형태이고,

Status의 경우 reporter:status:상태메세지 형태이다.


아래는 php로 된 mapper.php, reducer.php 에서 host별 처리 레코드 수를 카운팅하는 예제이다.

#!/usr/bin/php
<?PHP
$JobCounter = new JobCounter;
$counterName = sprintf("Input Records (%s)", php_uname('n'));

$totalCount = 0;
$hashTable = array();
while (($input = fgets(STDIN)) !== false) {
    // 처리 레코드 수 증가
    ++$totalCount;

    // 레코드 카운트를 실시간으로 확인하고 싶으면 여기서 1씩 증가시키면 됨.
    //$JobCounter->setCounter('HostCounters', $counterName, 1);

    list($key, $value) = explode("\t", $input);
    if (!isset($hashTable[$key])) $hashTable[$key] = 0;

    $hashTable[$key] += $value;
}

// 실시간이 중요하지 않고 완료시 카운트만 확인하고 싶으면
// 증가시킨 카운트를 한 번에 처리하는게 퍼포먼스가 좋다.
$jobCounter->setCounter('HostCounters', $counterName, $totalCount);

foreach ($hashTable as $key=>$value) {
    echo $key."\t".$value."\n";
}


// Job Counter 관련 클래스
class JobCounter
{
    private $fp = null;

    public function __construct()
    {
        $this->fp = fopen('php://stderr', 'w+');
    }

    public function __destruct()
    {
        fclose($this->fp);
    }

    // 카운터 증가
    public function setCounter($groupName, $counterName, $cnt)
    {
        $output = sprintf("reporter:counter:%s,%s,%d\n", $groupName, $counterName, $cnt);
        fwrite($this->fp, $output);
    }

    // 상태메세지 설정
    public function setStatus($statusMsg)
    {
        $output = sprintf("reporter:status:%s\n",$statusMsg );
        fwrite($this->fp, $output);
    }
}


위 스크립트를 mapper, reducer로 지정하여 job을 실행하고 나면,

아래 이미지처럼 job counter에서 노드당 얼만큼의 레코드를 처리했는지 한 눈에 알 수 있게 된다.









Key, Value

MapReduce framework은 mapper의 stdout을 기본적으로 TAB(\t)을 separator로 하여 key, value로 나눠 읽어 들인다.

하지만 옵션을 이용해서 이 부분을 수정할 수 있다.


숫자와 . 으로 이루어진 아래와 같은 원본 데이터가 있다고 가정하자.

523.307.365.845.344.761.967.768.677.391

165.91.908.827.165.167.26.176.10.303

934.827.97.659.144.857.441.665.494.864

471.16.171.837.862.516.597.828.283.273

219.449.365.126.275.531.294.301.707.304

605.641.131.702.299.276.559.740.941.52

604.412.69.776.248.931.291.846.759.574

119.978.23.484.103.298.14.397.600.722

702.205.362.833.908.662.109.466.402.50

519.5.462.588.781.711.519.71.556.277

...

...


아래와 같은 옵션으로 MapReduce를 실행하면 4번째 . 을 separator로 하여 앞부분은 key, 나머지를 value로 지정할 수 있다.

hadoop jar hadoop-streaming-.jar \

  -D stream.map.output.field.separator=. \

  -D stream.num.map.output.key.fields=4 \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /bin/cat


위 실행 결과물은 아래와 같은 형태가 된다. (separator는 사라짐)

0.30.361.224    451.385.66.305.210.932

0.30.768.557    441.815.491.373.296.363

0.311.183.415   392.274.775.981.633.999

0.314.352.492   873.695.43.527.230.574

0.317.890.792   289.926.146.306.884.876

0.319.336.558   663.876.80.645.1.268

0.322.900.183   396.364.972.104.698.731


reducer의 출력도 쪼갤 수 있다.

hadoop jar hadoop-streaming-*.jar \

  -D stream.map.output.field.separator=. \

  -D stream.num.map.output.key.fields=4 \

  -D stream.reduce.output.field.separator=. \

  -D stream.num.reduce.output.key.fields=2 \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /bin/cat


위 옵션으로 MapReduce를 실행하면,

4번째 . 에서 나뉘었던 mapper의 출력이 reducer에서 다시 2번째 . 으로 한 번 더 쪼개진 아래와 같은 형태의 결과물을 얻을 수 있다.

0.30    361.224    451.385.66.305.210.932

0.30    768.557    441.815.491.373.296.363

0.311   183.415    392.274.775.981.633.999

0.314   352.492    873.695.43.527.230.574

0.317   890.792    289.926.146.306.884.876

0.319   336.558    663.876.80.645.1.268

0.322   900.183    396.364.972.104.698.731








Partitioner

Hadoop에서 제공되는 KeyFieldBasedPartitioner 클래스를 이용하여 mapper의 출력결과의 특정 키를 기준으로 파티셔닝을 할 수 있다.

우선 위에서 사용한 stream.map.output.field.separatorstream.num.map.output.key.fields를 지정해서 mapper의 출력결과에서 키를 지정하고,

map.output.key.field.separatormapred.text.key.partitioner.options(버전2는 mapreduce.partition.keypartitioner.options)를 이용하여 파티셔닝 기준으로 사용할 키를 지정하는 방식이다.

키는 -k{from},{to} 형태로 지정한다.

예를 들어 2,3번째 필드를 기준으로 파티셔닝 하고 싶다면 -k2,3 같은 식이다.

그리고 -partitioner 옵션으로 KeyFieldBasedPartitioner 클래스를 지정한다.


아래와 같은 설정으로 실행하면 mapper의 출력결과의 키를 다시 . 으로 나누고 그 중 1,2번째 필드를 기준으로 파티셔닝을 하게 된다.

hadoop jar hadoop-streaming-*.jar \

  -D stream.map.output.field.separator=. \

  -D stream.num.map.output.key.fields=4 \

  -D map.output.key.field.separator=. \

  -D mapred.text.key.partitioner.options=-k1,2 \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /bin/cat \

  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


mapper의 출력결과가 아래와 같을 경우,

1.2.1.1

1.1.1.1

1.2.1.1

1.3.1.1

2.1.1.1

2.2.1.1

2.3.1.1

3.1.1.1

3.2.1.1

3.3.1.1


reducer를 3개로 지정하고 -k1,1 옵션으로 수행하면 part-00000~2 까지 3개의 결과 파일과 함께,

각 파일에는 아래와 같은 식으로 데이터가 나뉘어진 것을 확인할 수 있다.

(아래는 그냥 예제고, 실제로는 파일명과 데이터가 일치하지 않을 가능성이 크다)

// part-00000

1.1.1.1

1.2.1.1

1.3.1.1

// part-00001

2.1.1.1

2.2.1.1

2.3.1.1

// part-00002

3.1.1.1

3.2.1.1

3.3.1.1


-k2,2 옵션으로 수행하면 3개의 파일에 아래와 같은 식으로 데이터가 나뉘어진다.

// part-00000

1.1.1.1

2.1.1.1

3.1.1.1

// part-00001

1.2.1.1

2.2.1.1

3.2.1.1

// part-00002

1.3.1.1

2.3.1.1

3.3.1.1








Comparator

KeyFieldBasedComparator 클래스를 이용하여 정렬을 할 수 있다.

mapred.output.key.comparator.class(버전2는 mapreduce.job.output.key.comparator.class) 옵션에 KeyFieldBasedComparator를 지정하고,

(버전1은 org.apache.hadoop.mapred.lib.KeyFieldBasedComparator, 버전2는 org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator)

mapred.text.key.comparator.options(버전2는 mapreduce.partition.keycomparator.options) 옵션으로 정렬 기준을 정할 수 있다.

sort와 같은 느낌으로 사용할 수 있는데 -k{form},{to}{opts} 형태로 지정해야 한다.

예를 들어, 키1은 numeric reverse sort, 키2는 numeric sort, 키3은 numeric reverse sort 라면 아래와 같은 형태로 지정한다.

-D mapred.text.key.comparator.options="-k1,1nr -k2,2n -k3,3nr"


아래와 같은 설정으로 실행하면 . 을 기준으로 나뉜 키1을 numeric sort, 키2는 numeric reverse sort 할 수 있다.

결과 확인을 위해 mapred.reduce.tasks(버전2는 mapreduce.job.reduces) 옵션을 이용하여 reducer는 1로 세팅하는 게 좋다.

hadoop jar hadoop-streaming-*.jar \

  -D stream.map.output.field.separator=. \

  -D stream.num.map.output.key.fields=4 \

  -D map.output.key.field.separator=. \

  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \

  -D mapred.text.key.comparator.options="-k1,1n -k2,2nr" \

  -D mapred.reduce.tasks=1 \

  -input INPUT_PATH \

  -output OUTPUT_PATH \

  -mapper /bin/cat \

  -reducer /bin/cat


mapper의 출력결과가 아래와 같을 경우,

31.21.5.4

21.31.4.5

31.22.4.5

21.32.5.4

31.23.5.4


정렬 이후는 아래와 같다.

21.32.5.4

21.31.4.5

31.23.5.4

31.22.4.5

31.21.5.4







Job Variables

job 실행 중에 mapper, reducer에서 job의 property 값을 알고 싶다면,

원래 설정값의 . 을 _ 로 바꿔서 참조하면 된다.

예를 들어 mapper, reducer가 PHP 스크립트일 경우,

mapred.job.local.dir 을 알고 싶다면 $_SERVER['mapred_job_local_dir'] 값을 참조하면 된다.








[참고]

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html




Posted by bloodguy
,