[node.js] cluster worker와 동기식(sync) 통신하기 (synchronous communication with cluster workers)
node.js 2019. 6. 26. 14:18
node.js의 cluster 모듈을 이용해서 worker를 생성한 후 worker와 통신이 필요한 경우,
worker.send(), process.send()를 이용해 메시지를 보내고 process.on('message')를 이용해 넘어온 메시지를 처리한다.
일반적으론 이 방식으로 충분하지만 만약 MASTER에서 모든 WORKER에 broadcasting으로 메시지를 보내고 모든 WORKER로부터 동기식으로 한번에 결과값을 받고 싶은 경우엔 저런 방식이 곤란하므로 다른 방법이 필요하다.
아래는 이벤트와 Promise를 조합하여 WORKER에 메시지를 보내고 결과값을 동기식으로 받을 수 있도록 만든 모듈.
const cluster = require('cluster');
// worker 통신 클래스
class WorkerCommunicator
{
constructor(workerCount)
{
this.workerCount = workerCount;
this.createdTime = Date.now();
this.dataList = [];
}
setResponse(data)
{
this.dataList.push(data);
}
getResponseAll()
{
return this.dataList;
}
readyToReport()
{
return this.dataList.length >= this.workerCount;
}
isTimeout()
{
// TTL 10초
return Date.now() - this.createdTime > 10000;
}
}
module.exports = {
// WorkerCommunicator instance array
workerCommunicatorList: {},
isInited: false,
COMMAND_NAME: 'worker_communicator_command',
MSG: {
REQ: 'req_communicator_data',
RES: 'res_communicator_data'
},
// Request from MASTER to WORKER
req: async function(opts={}){
this._init();
const workerCount = opts.worker_count || Object.keys(cluster.workers).length;
const msgFromMaster = opts.msg || null;
const self = this;
return new Promise((resolved)=>{
const communicatorId = self._getCommunicatorId();
self.workerCommunicatorList[communicatorId] = new WorkerCommunicator(workerCount);
process.once(self._getCommunicatorEventId(communicatorId), (res)=>{
return resolved(res);
});
for (const id in cluster.workers) {
const reqMsg = {};
reqMsg[self.COMMAND_NAME] = self.MSG.REQ;
reqMsg.id = communicatorId;
reqMsg.msg_from_master = msgFromMaster;
cluster.workers[id].send(reqMsg);
}
});
},
// Response from WORKER to MASTER
setResponse: function(fnResponse){
if (typeof fnResponse !== 'function') return;
const self = this;
const listener = async(msg)=>{
if (!(self.COMMAND_NAME in msg) || msg[self.COMMAND_NAME] !== self.MSG.REQ) return;
const resMsg = await fnResponse(msg);
resMsg[self.COMMAND_NAME] = self.MSG.RES;
resMsg.id = msg.id;
process.send(resMsg);
};
const onMessageListeners = process.listeners('message');
let listenersExists = false;
for (let i=0; i<onMessageListeners.length; i++) {
if (listener === onMessageListener[i]) {
listenerExists = true;
break;
}
}
if (listenerExists === false) {
process.addEventListener('message', listener);
}
},
_init: function(){
if (this.isInited === true) return;
this.isInited = true;
const self = this;
const listener = (worker, message, handler) => {
if (!(self.COMMAND_NAME in message) || message[self.COMMAND_NAME] !== self.MSG.RES) return;
self._res(message);
};
const onMessageListeners = cluster.listeners('message');
let listenerExists = false;
for (let i=0; i<onMessageListeners.length; i++) {
if (listener === onMessageListeners[i]) {
listenerExists = true;
break;
}
}
if (listenerExists === false) {
cluster.addListener('message', listener);
}
setInterval(()=>{
for (const communicatorId in self.workerCommunicatorList) {
if (self.workerCommunicatorList[communicatorId].isTimeout()) {
process.emit(self._getCommunicatorEventId(communicatorId), self.workerCommunicatorList[communicatorId].getResponseAll());
delete self.workerCommunicatorList[communicatorId];
}
}
}, 1000);
},
_res: function(msg){
const communicatorId = msg.id;
if (!(communicatorId in this.workerCommunicatorList)) return;
delete msg[this.COMMAND_NAME];
delete msg.id;
this.workerCommunicatorList[communicatorId].setResponse(msg);
if (this.workerCommunicatorList[communicatorId].readyToReport()) {
process.emit(this._getCommunicatorEventId(communicatorId), this.workerCommunicatorList[communicatorId].getResponseAll());
delete this.workerCommunicatorList[communicatorId];
}
},
_getCommunicatorId: function(){
// 그냥 unique id 아무거나
const x = parseInt(Math.random().toString().substr(2), 10).toString(36).substr(0,10).padStart(10, 'x');
const y = Date.now().toString(36).substr(0, 10).padStart(10, 'y');
return x+y;
},
_getCommunicatorEventId: function(communicatorId){
return `communicator_event_${communicatorId}`;
}
};
위 모듈을 worker_communicator.js 라는 파일에 저장했다고 가정하고 아래처럼 사용하면 됨.
const cluster = require('cluster');
const workerCommunicator = require('./worker_communicator.js');
// MASTER - 1초에 한번씩 'hello'와 'what time is it now?'를 번갈아 가며 WORKER 전체에 보내고 전체의 응답이 모일 경우 출력
if (cluster.isMaster) {
const cpuCount = require('os').cpus().length;
for (let i=0; i<cpuCount; i++) {
cluster.fork();
}
let i = 0;
setInterval(async()=>{
let msg = {};
if (++i % 2 === 0) msg.cmd = 'hello';
else msg.cmd = 'what time is it now?';
console.log(`[MASTER] cmd = ${msg.cmd}`);
const r = await workerCommunicator.req({msg: msg});
console.log('[WORKERS ANSWER]');
console.log(r);
console.log("\n");
}, 1000);
}
// WORKER - 'hello'일 경우 'hi'를, 'what time is it now?'일 경우 현재시각을 반환
else {
workerCommunicator.setResponse(async(msg)=>{
if (msg.msg_from_master === 'hello') {
return {
pid: process.pid,
greeting: 'hi'
};
}
else {
return {
pid: process.pid,
date: new Date()
};
}
});
}
위 코드를 실행시키면 1초에 한번씩 아래와 같은 결과를 얻을 수 있음.
[MASTER] cmd = what time is it now?
[WORKERS ANSWER]
[ { pid: 1831, date: '2019-06-26T04:28:54.559Z' },
{ pid: 1833, date: '2019-06-26T04:28:54.560Z' },
{ pid: 1834, date: '2019-06-26T04:28:54.561Z' },
{ pid: 1832, date: '2019-06-26T04:28:54.562Z' } ]
[MASTER] cmd = hello
[WORKERS ANSWER]
[ { pid: 1831, greeting: 'hi' },
{ pid: 1833, greeting: 'hi' },
{ pid: 1832, greeting: 'hi' },
{ pid: 1834, greeting: 'hi' } ]
[MASTER] cmd = what time is it now?
[WORKERS ANSWER]
[ { pid: 1831, date: '2019-06-26T04:28:56.554Z' },
{ pid: 1833, date: '2019-06-26T04:28:56.554Z' },
{ pid: 1832, date: '2019-06-26T04:28:56.554Z' },
{ pid: 1834, date: '2019-06-26T04:28:56.554Z' } ]
[MASTER] cmd = hello
[WORKERS ANSWER]
[ { pid: 1832, greeting: 'hi' },
{ pid: 1833, greeting: 'hi' },
{ pid: 1834, greeting: 'hi' },
{ pid: 1831, greeting: 'hi' } ]
'node.js' 카테고리의 다른 글
[node.js] load average throttle (0) | 2019.09.26 |
---|---|
[node.js] 동시에 실행되는 수를 조절하며 비동기 함수 전체 실행 (Promise.all with limited concurrency) (0) | 2019.09.26 |
[node.js] self-signed certificate (0) | 2017.12.11 |
[node.js] FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory 에러 원인, 해결방법 (2) | 2017.05.19 |
[node.js] 라즈베리파이에서 node.js로 mp3 재생하기 (0) | 2016.01.15 |