[MongoDB] ChangeStream

DataBase 2023. 7. 5. 15:02

 

collection, database 혹은 전체 클러스터에 스트림을 생성해서 변경사항을 실시간으로 받아오는 방법.
과거 oplog tailing 같은 것보다 훨씬 안정적이고 편리한 방법.

예제는 mongosh 기준으로 작성하는데 당연히 각 언어별 드라이버로도 가능함.

 

// changeStream cursor 생성
[direct: mongos] db> c = db.col.watch()
ChageStreamCursor on col
 
// 커서 iteration은 이렇게. hasNext()는 blocking.
while (!c.isClosed()) {
    if (c.hasNext()) {
        printjson(c.next());
    }
}
 
// 이 상태에서 mongosh 하나 더 열어서 아래처럼 뭔가 insert 하면
[direct: mongos] db> db.col.insertOne({user_id: 'xxx', age: 99})
 
// 이렇게 변경사항(ChangeEvent)이 출력되는 걸 확인할 수 있음
{
  _id: {
    _data: '8264A4F20E000000012B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F20E052A0428342594750004'
  },
  operationType: 'insert',
  clusterTime: Timestamp({ t: 1688531470, i: 1 }),
  wallTime: ISODate("2023-07-05T04:31:10.145Z"),
  fullDocument: {
    _id: ObjectId("64a4f20e052a042834259475"),
    user_id: 'xxx',
    age: 11
  },
  ns: {
    db: 'db',
    coll: 'col'
  },
  documentKey: {
    _id: ObjectId("64a4f20e052a042834259475")
  }
}
 
 
// 만약 non-blocking으로 하고 싶으면 hasNext() 대신 tryNext()
while (!c.isClosed()) {
    let changeData = c.tryNext();
    while (changeData !== null) {
        printjson(changeData);
        changeData = c.tryNext();
    }
}
 
 
// update 테스트
[direct: mongos] db> db.col.updateOne({user_id:'xxx'}, {$set:{age: 100}})
 
 
 
// 그럼 이렇게 변경사항이 출력되는 걸 확인할 수 있음
{
  _id: {
    _data: '8264A4F3F8000000022B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F14930F9C2245148A3E30004'
  },
  operationType: 'update',
  clusterTime: Timestamp({ t: 1688531960, i: 2 }),
  wallTime: ISODate("2023-07-05T04:39:20.465Z"),
  ns: {
    db: 'tdb',
    coll: 'col'
  },
  documentKey: {
    _id: ObjectId("64a4f14930f9c2245148a3e3")
  },
  updateDescription: {
    updatedFields: {
      age: 100
    },
    removedFields: [],
    truncatedArrays: []
  }
}

 

 

 

 

changeStream이 aggregation framework를 사용하기 때문에 간단한 필터링을 건다던가 하는 것들이 가능함.

// aggregation pipeline 세팅
[direct: mongos] db> filter = [];
[]
[direct: mongos] db> filter.push({$match:{
    // insert 타입이면서 user_id가 yyy인 것만
    operationType: 'insert',
    'fullDocument.user_id': 'yyy'
}});
1
// changeStream cursor 생성 및 iteration
[direct: mongos] db> c = db.col.watch(filter);
ChangeStreamCursor on col
 
while (!c.isClosed()) {
    let changeData = c.tryNext();
    while (changeData !== null) {
        printjson(changeData);
        changeData = c.tryNext();
    }
}
 
 
 
// 그리고 이것저것 섞어서 insert 몇개 해봄
[direct: mongos] db> c.col.insertMany([
{user_id:'xxx', age:1},
{user_id:'yyy', age:2},
{user_id:'zzz', age:3},
{user_id:'yyy', age:4}
])
 
 
 
// 그럼 $match에 지정한대로 user_id가 yyy인것만 출력됨
{
  _id: {
    _data: '8264A4F9A2000000022B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F9A2052A0428342594810004'
  },
  operationType: 'insert',
  clusterTime: Timestamp({ t: 1688533410, i: 2 }),
  wallTime: ISODate("2023-07-05T05:03:30.235Z"),
  fullDocument: {
    _id: ObjectId("64a4f9a2052a042834259481"),
    user_id: 'yyy',
    age: 2
  },
  ns: {
    db: 'db',
    coll: 'col'
  },
  documentKey: {
    _id: ObjectId("64a4f9a2052a042834259481")
  }
}
{
  _id: {
    _data: '8264A4F9A2000000042B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F9A2052A0428342594830004'
  },
  operationType: 'insert',
  clusterTime: Timestamp({ t: 1688533410, i: 4 }),
  wallTime: ISODate("2023-07-05T05:03:30.235Z"),
  fullDocument: {
    _id: ObjectId("64a4f9a2052a042834259483"),
    user_id: 'yyy',
    age: 4
  },
  ns: {
    db: 'db',
    coll: 'col'
  },
  documentKey: {
    _id: ObjectId("64a4f9a2052a042834259483")
  }
}

 

 

 

 

 

중단된 부분부터 재개할 수도 있음.

changeStream cursor 반환값(ChangeEvent)의 _id를 resume token으로 사용하는 방법.

// 일단 스트림을 만들고
let c = db.col.watch();
let firstChange;
 
// 변경사항 하나만 받아서 firstChange에 저장
while (!c.isClose()) {
    if (c.hasNext()) {
        firstChange = c.next();
        break;
    }
}
 
 
 
// 이 상태에서 뭔가 하나 insert()
db.col.insertOne({user_id:'xxx', age:10})
 
 
// 그럼 앞서 지정한 iteration이 조용히 종료되는 것을 확인 가능.
// 그리고 firstChange를 찍어보면 변경정보가 들어있는 것을 확인 가능.
 
// 이 상태에서 몇 개 더 insert()
db.col.insertOne({user_id:'xxx', age:11})
db.col.insertOne({user_id:'xxx', age:12})
db.col.insertOne({user_id:'xxx', age:13})
 
 
// 이 상태에서 아무것도 지정하지 않은 상태로 changeStream을 시작하면 위에 insert한 3개의 변경사항은 잡아낼 수 없음.
// 하지만 resumeToken을 지정하는 방식으로 시작하면
let resumeToken = firstChange._id;
let resumeCursor = db.col.watch([], {
    resumeAfter: resumeToken
});
 
// 이 루프를 실행하는 순간 이전에 insert한 3개의 변경정보가 출력됨을 확인 가능
while (!resumeCursor.isClosed()) {
    let changeData = resumeCursor.tryNext();
    while (changeData !== null) {
        printjson(changeData);
        changeData = resumeCursor.tryNext();
    }
}

 

 

 

 

기타 ChangeStream을 사용하기 위해 필요한 정보나 주의사항, 매개변수를 통한 자잘한 기능들이 있으므로 아래 페이지 참고.

https://www.mongodb.com/docs/manual/changeStreams/

 

 

Posted by bloodguy
,