[Hadoop] MapReduce 튜닝

Hadoop 2015. 5. 15. 17:51





테스트 조건은 아래와 같다.


Hadoop 버전은 1.X

MapReduce 방식은 Streaming

데이터 용량은 50G

node는 12개

replication factor는 3




Hadoop은 정말정말 설정값이 많다.

그래서 모든 설정을 커버하진 못했고, 각종 서적과 구글링을 통해 MapReduce에 용하다는 몇 가지 property 튜닝에 대한 테스트만 했다.

그런데 테스트를 하면 할수록 MapReduce 실행시 각 노드별 CPU/RAM 등의 자원상황과 맞물리는 Task의 실행이나,

Disk, Network I/O가 어떤식으로 분리되고 물리느냐에 따라 수행시간이 달라질 수 밖에 없기 때문에 테스트도 상당히 애매했다.

클러스터의 각 노드별 task별 강력한 상황통제가 힘들다는 뜻.


그리고 각 property의 설정들은 서로 간의 trade-off 같은 영향을 미치는 게 많아서 어느 하나의 값으로 한 번씩 돌려보는 것도 의미없는 짓이라 테스트가 애매했다.

테스트 하는 내도록 랜덤하게 연결된 100마리짜리 두더지 잡기를 하는 기분이었다.


그 중에 확인된 두더지들을 아래에 적는다.




BlockSize


수치를 바꿨을 때 확실히 반응이 오는 것은 단연 BlockSize였다.

다른 모든 설정에 비해 가장 드라마틱하면서도 확신을 줄 수 있는 성능변화를 보였다.


기본값인 64M를 쓰고 있었는데, 128이나 256으로 올리면 실행시간이 줄어들었다.

dfs.blocksize를 바꾸고 적용했으면 좋았겠으나, 제정신을 유지한 상태에서 그동안 쌓인 데이터의 blocksize를 전부 변환시킬 자신이 없어서,

mapred.min.split.size 를 조절하는 방식으로 테스트를 진행했다.


64M일 경우 input split의 수가 많아져서 map task의 수도 늘어나는데,

실제 map task의 수행시간은 10초 안에 끝나므로 과도한 map task 재실행으로 인한 퍼포먼스의 손실이 있는 것 같다.

128M, 256M 일 경우 수행시간은 비슷했고, 512M 이상으로 증가시키면 오히려 수행시간이 늘어났다.


여기 있는 거의 모든 property가 그렇지만 job에 넘어오는 데이터의 사이즈나 map/reduce task가 처리하는 방식에 따라 테스트 결과는 달라진다는 것을 염두에 두자.

하지만 BlockSize는 무조건 128~256M 정도가 갑인 듯.





Map/Reduce Task


그 다음으로 성능에 큰 영향을 미친 건 map/reduce task 수였다.

실제로는 성능에 가장 큰 영향을 미치는 최고로 중요한 property라 생각하지만,

BlockSize에 이어 2번째로 적은 이유는,

상황에 따른 적절한 계산이 많이 다르고,

이 값은 잘 설정하면 성능이 나아진다기 보다는 잘못 설정하면 성능이 화끈하게 나빠지는 종류의 property라 생각하기 때문이다.


아래는 task의 수에 관련된 property에 대한 설명이다.


mapred.tasktracker.map.tasks.maximum

  - 하나의 TaskTracker에서 동시에 실행할 수 있는 map task의 최대 수

  - 기본값은 2


mapred.tasktracker.reduce.tasks.maximum

  - 하나의 TaskTracker에서 동시에 실행할 수 있는 reduce task의 최대 수

  - 기본값은 2


mapred.reduce.tasks

  - job 당 실행할 전체 reduce task의 수

  - 기본값은 1


mapred.tasktracker.map/reduce.tasks.maximum 은 노드 하나에서 동시에 실행될 map/reduce task의 수다.

해당 노드의 CPU, RAM 등에 맞춰 노드마다 mapred-site.xml에서 따로 설정해야 한다.

주의할 점은 final을 적용시켜 job 실행시 넘어온 옵션에 의해 덮어쓰여지지 않도록 하는 것이다.

<property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>7</value>
    <final>true</final>
</property>


그리고 이 값들은 수정하고 나면 TaskTracker를 재시작해야 적용된다.


노드별 map/reduce task의 수의 적정값을 계산하는 단 하나의 공식같은 건 없다.

숱한 테스트 후에 내린 결론이다.


CPU 코어수 - 1 이라든가, 0.95 ~ 1.75 * (CPU 코어수 - 1) 같은 대략적인 공식은 있지만,

각 노드의 CPU/RAM 같은 가용자원 상황과 job에서 map/reduce가 수행하는 작업의 성격에 따라 달라질 수 밖에 없다.

(더구나 Streaming 이라면 TaskTracker의 Child에서 실행하는 내가 만든 프로세스까지 감안해야 한다..;)


예를 들어,


CPU 코어 수에 맞춰 task 수를 지정했는데 RAM이 엄청나게 작다면,

작업 중에 swap이 자주 일어나서 RAM쪽이 병목이 될 것이고,

(머리는 좋은데 책가방이 작아서 공부할 책을 계속 집에 가서 바꿔 가져와야 하는 상황)


RAM에 맞춰 task 수를 지정했는데 CPU 코어수가 너무 작다면,

ContextSwitching이 병목의 원인이 될 것이다.


물론 위에 든 예의 경우는 그냥 극단적인 상황에 대한 것이고,

JVM의 heap 사이즈 설정 등을 통해 어느정도 조절도 가능하다.


하지만 결국 이건 권장수치를 제시할 공식이 없다는 건 사실을 반증한다.


일반적으로 나오는 예제들처럼 일단,

mapred.tasktracker.map.tasks.maximum 을 (CPU 코어수 - 1) * 0.95 ~ 1.75 사이로 정하고,

mapred.tasktracker.reduce.tasks.maximummapred.reduce.tasks 값을 바꿔 가면서 테스트 해서 최적의 값을 찾는 수 밖에 없다.


mapred.reduce.tasks 는 job이 실행되는 동안 실행될 전체 reduce task의 수를 지정하는 property이다.

한 방에 끝나는 게 좋으므로 Reducer Capacity(전체 노드의 mapred.tasktracker.reduce.tasks.maximum의 합계)만큼 설정하는게 이상적이지만,

실패나 장애를 감안하여 Reducer Capacity * 0.99 로 설정하는 것이 권장된다.


내가 테스트 해 본 결과에 의하면 나의 경우 아래의 설정이 가장 나은 성능을 보였다.

(다시 한 번 강조하지만 내 클러스터 상황과 테스트 한 job의 특성에 따른 결과일 뿐이다. 게다가 Streaming...)


mapred.tasktracker.map.tasks.maximum = (CPU 코어수 - 1) * 1.25

mapred.tasktracker.reduce.tasks.maximum = mapred.tasktracker.map.tasks.maximum / 2

mapred.reduce.tasks = Reducer Capacity * 0.99


하지만 성능이 나아졌다고 해봤자 몇 초 정도...







Shuffle


MapReduce 튜닝에 관한 문서를 보면 가장 많은 부분에 중요한 느낌으로 등장하는 것이 Shuffle 단계에서의 튜닝이다.

그도 그럴것이 이론적으로 보자면,

Spill 단계에서 Disk I/O가 발생하고, 각 단계별 결과데이터가 Network를 통해서 와리가리 하므로,

병목계의 양대산맥이 모두 포진하고 있는 단계가 바로 Shuffle 인 것이다.


그런데 결론부터 이야기하자면 개인적으로 이 값을 튜닝해서 그닥 재미를 볼 수는 없었다.

Spill 단계에서 Disk I/O를 줄이기 위해 이런저런 값을 튜닝해봤고,

Map output을 Reducer로 전달하는 Network I/O를 높이기 위해 thread 수를 조절해봤지만 결과는 그저 그랬다.


아마 job에서 sort가 어떤 성격이고 어느 정도 비중을 차지하느냐에 따라 달라지는 것 같은데,

일부러 Spilled Records를 Map output Records 보다 높게 나오게 만드는 설정을 해서 테스트를 해봐도 그렇게 성능이 처지지도 않았다.

Map output의 결과가 엄청 더 커야하는지도 모르겠다만.


하지만 이론적으론 굉장히 중요한 부분이고 언젠가 필요한 상황이 올 것에 대비해서 Shuffle 단계를 튜닝할 수 있는 property에 대해 기록한다.


io.sort.mb

  - sort 단계에서 사용할 버퍼 메모리 사이즈. 단위는 MB

  - 기본값은 100

  - 이론적으로는 이 수치가 임계점에 도달하면 Spill이 일어나고 Disk I/O가 발생하여 성능이 떨어지므로 높이길 권장한다.


io.sort.record.percent

  - sort 단계에서 record의 포인터를 저장하기 위한 인덱스 공간의 사이즈. io.sort.mb 설정값 대비 백분율

  - 기본값은 0.05 (이게 권장사항)

  - 하지만 이 부분은 예전부터 존재하던 바보같은 부분으로 MR2에선 가변 사이즈가 도입되어 이 값이 무용지물이 됨.


io.sort.factor

  - 한 번에 병합할 stream의 수

  - 기본값은 10

  - 10은 너무 적은 듯 싶으니 좀 늘이는게 기분이 나아짐.

  - io.sort.mb / 10 혹은 그냥 100 으로 지정하는 게 대세인 듯.


io.sort.spill.percent

  - 버퍼나 레코드 버퍼에서 Disk로 Spill하는 임계치. 0.5 이하는 하지마라고 함.

  - 기본값은 0.80


tasktracker.http.threads

  - map output을 reducer에 전달하는 thread의 수

  - 기본값은 40

  - 개별 job마다 설정하는 것은 불가능.


mapred.reduce.parallel.copies

  - copy 단계에서 데이터를 병렬로 전송할 thread의 수

  - 기본값은 5

  - 내가 테스트 해 본 경우 의외로 이 값을 높이면 성능이 나아지는 느낌이었다. 15로 지정해 놓고 쓴다.


mapred.job.shuffle.input.buffer.percent

  - Shuffle 단계에서 map output 보관에 필요한 메모리를 전체 heap 사이즈의 비율로 지정

  - 기본값은 0.7


mapred.job.shuffle.merge.percent

  - Reducer가 Shuffle 결과를 받아 버퍼에 저장하다가 파일로 저장하게 되는 임계값 (백분율)

  - 기본값은 0.66


mapred.inmem.merge.threshold

  - Reducer가 Shuffle 결과를 받아 버퍼에 저장하다가 파일로 저장하게 되는 임계값 (파일 수)

  - 기본값은 1000


mapred.job.reduce.input.buffer.percent

  - reduce 단계에서 map output을 보관하기 위해 사용하는 메모리 크기를 최대 heap 사이즈의 백분율로 지정

  - 기본값은 0.0








Memory


mapred.child.java.opts property를 통해 TaskTracker에서 실행하는 Child 프로세스에 적용할 Java 옵션을 지정할 수 있다.

일반적으로는 최대 heap 사이즈를 지정하는데 사용한다.

기본값은 -Xmx200m 으로 좀 작은 감이 있다.

이 수치를 계산하는 방법은 대략 아래와 같다.


노드의 RAM이 8G이고 노드에는 DataNode, TaskTracker 프로세스가 각각 실행중이라면,

DataNode와 TaskTracker는 각각 -Xmx1000m 으로 1G씩 총 2G를 잡고 시작하므로 가용 RAM은 6G이다. (8 -2)

이제 이 6G를 지정된 map/reduce task 수로 나눈다.

만약 각각 7씩 지정되어 있다면 총 14개의 task이므로 6G / 14 = 0.4285G 가 된다.

OS나 다른 곳에서 사용할 RAM까지 생각해서 깔끔하게 task당 400M씩 잡으면 적정 수치라고 간주할 수 있다.

(Streaming 이라면 내가 작성한 프로세스까지 생각해야 하므로 상황이 달라짐..)

<property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx400m</value>
</property>


번외로 이 property는 말 그대로 Java 옵션을 지정하는 것이므로 다른 옵션도 지정할 수 있다.

예를 들어 Garbage Collection 로그를 /tmp/TASK_ID.gc 라는 파일에 저장하고 싶다면 아래처럼 설정하면 된다.

@taskid@ 라고 지정하면 실제 TaskID로 치환된다.

<property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx400m -verbose:gc -Xloggc:/tmp/@taskid@.gc</value>
</property>








기타


io.file.buffer.size

  - Read/Write에 적용할 버퍼 사이즈를 지정.

  - 기본값은 4096 (4k)

  - 4k는 너무 옛스러운 수치이므로 65536(64k) 혹은 131072(128k)로 지정하는 것을 권장. 

  - 근데 성능향상 여부는 솔직히 잘 모르겠더라...


mapred.job.reuse.jvm.num.tasks

  - 하나의 jvm에서 실행시킬 task의 수. (jvm 재사용)

  - 기본값은 1

  - 이 값을 -1 로 지정하면 제한없음으로 설정하게 된다.

  - 설명하고자 하면 구구절절 길어질 것 같은데 성능향상이 그닥인 것 같아 그냥 가볍게 퉁친다.


dfs.namenode.handler.count

  - namenode의 서버 thread 수

  - 기본값은 10

  - 이 수치는 늘이면 성능이 좋아지는 것 같은데 Network I/O 관련이라 좀 긴가민가 하다.

  - 테스트 결과 수치를 40으로 늘였을 때 수행시간이 줄어들길래 40으로 해놓고 쓴다.

  - namenode 서버의 자원이 넉넉하다면 그 이상 늘이는 것도 나쁘지 않을 것 같다. 64로 해놓고 쓴다는 문서가 많이 보였다.


dfs.datanode.handler.count

  - datanode의 서버 thread 수

  - 기본값은 10 (예전엔 3)

  - 이것도 수치를 10으로 했을 때 수행시간이 줄어들길래 10으로 맞춰놓고 쓴다.

  - 하지만 이론적으로 이건 RPC handler의 수를 의미하는데 실제 read/write는 DataTransferProtocol을 이용하지 RPC를 이용하지는 않기 때문에 별 의미가 없다는 의견도 있었다.







결론


각 property의 이론적 의미와 서로 간의 관계에 대해 머리 속에 확실히 집어 넣고,

클러스터/노드의 상황 및 데이터 사이즈/종류, map/reduce task에 따라 적절한 수치를 찾아 적용하는 수 밖에 없음.

테스트를 하려면 데이터 사이즈가 어느 정도 되어야 하는데,

그러면 수행시간이 오래 걸리므로 굉장히 빡친다.

진짜 치명적인 상황이 아니라면 적당히 타협할 것을 권장...;








Posted by bloodguy
,