collection, database 혹은 전체 클러스터에 스트림을 생성해서 변경사항을 실시간으로 받아오는 방법.
과거 oplog tailing 같은 것보다 훨씬 안정적이고 편리한 방법.
예제는 mongosh 기준으로 작성하는데 당연히 각 언어별 드라이버로도 가능함.
// changeStream cursor 생성
[direct: mongos] db> c = db.col.watch()
ChageStreamCursor on col
// 커서 iteration은 이렇게. hasNext()는 blocking.
while (!c.isClosed()) {
if (c.hasNext()) {
printjson(c.next());
}
}
// 이 상태에서 mongosh 하나 더 열어서 아래처럼 뭔가 insert 하면
[direct: mongos] db> db.col.insertOne({user_id: 'xxx', age: 99})
// 이렇게 변경사항(ChangeEvent)이 출력되는 걸 확인할 수 있음
{
_id: {
_data: '8264A4F20E000000012B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F20E052A0428342594750004'
},
operationType: 'insert',
clusterTime: Timestamp({ t: 1688531470, i: 1 }),
wallTime: ISODate("2023-07-05T04:31:10.145Z"),
fullDocument: {
_id: ObjectId("64a4f20e052a042834259475"),
user_id: 'xxx',
age: 11
},
ns: {
db: 'db',
coll: 'col'
},
documentKey: {
_id: ObjectId("64a4f20e052a042834259475")
}
}
// 만약 non-blocking으로 하고 싶으면 hasNext() 대신 tryNext()
while (!c.isClosed()) {
let changeData = c.tryNext();
while (changeData !== null) {
printjson(changeData);
changeData = c.tryNext();
}
}
// update 테스트
[direct: mongos] db> db.col.updateOne({user_id:'xxx'}, {$set:{age: 100}})
// 그럼 이렇게 변경사항이 출력되는 걸 확인할 수 있음
{
_id: {
_data: '8264A4F3F8000000022B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F14930F9C2245148A3E30004'
},
operationType: 'update',
clusterTime: Timestamp({ t: 1688531960, i: 2 }),
wallTime: ISODate("2023-07-05T04:39:20.465Z"),
ns: {
db: 'tdb',
coll: 'col'
},
documentKey: {
_id: ObjectId("64a4f14930f9c2245148a3e3")
},
updateDescription: {
updatedFields: {
age: 100
},
removedFields: [],
truncatedArrays: []
}
}
changeStream이 aggregation framework를 사용하기 때문에 간단한 필터링을 건다던가 하는 것들이 가능함.
// aggregation pipeline 세팅
[direct: mongos] db> filter = [];
[]
[direct: mongos] db> filter.push({$match:{
// insert 타입이면서 user_id가 yyy인 것만
operationType: 'insert',
'fullDocument.user_id': 'yyy'
}});
1
// changeStream cursor 생성 및 iteration
[direct: mongos] db> c = db.col.watch(filter);
ChangeStreamCursor on col
while (!c.isClosed()) {
let changeData = c.tryNext();
while (changeData !== null) {
printjson(changeData);
changeData = c.tryNext();
}
}
// 그리고 이것저것 섞어서 insert 몇개 해봄
[direct: mongos] db> c.col.insertMany([
{user_id:'xxx', age:1},
{user_id:'yyy', age:2},
{user_id:'zzz', age:3},
{user_id:'yyy', age:4}
])
// 그럼 $match에 지정한대로 user_id가 yyy인것만 출력됨
{
_id: {
_data: '8264A4F9A2000000022B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F9A2052A0428342594810004'
},
operationType: 'insert',
clusterTime: Timestamp({ t: 1688533410, i: 2 }),
wallTime: ISODate("2023-07-05T05:03:30.235Z"),
fullDocument: {
_id: ObjectId("64a4f9a2052a042834259481"),
user_id: 'yyy',
age: 2
},
ns: {
db: 'db',
coll: 'col'
},
documentKey: {
_id: ObjectId("64a4f9a2052a042834259481")
}
}
{
_id: {
_data: '8264A4F9A2000000042B022C0100296E5A10044DA5B3C1425D4E71B857432EB3E0C7F046645F6964006464A4F9A2052A0428342594830004'
},
operationType: 'insert',
clusterTime: Timestamp({ t: 1688533410, i: 4 }),
wallTime: ISODate("2023-07-05T05:03:30.235Z"),
fullDocument: {
_id: ObjectId("64a4f9a2052a042834259483"),
user_id: 'yyy',
age: 4
},
ns: {
db: 'db',
coll: 'col'
},
documentKey: {
_id: ObjectId("64a4f9a2052a042834259483")
}
}
중단된 부분부터 재개할 수도 있음.
changeStream cursor 반환값(ChangeEvent)의 _id를 resume token으로 사용하는 방법.
// 일단 스트림을 만들고
let c = db.col.watch();
let firstChange;
// 변경사항 하나만 받아서 firstChange에 저장
while (!c.isClose()) {
if (c.hasNext()) {
firstChange = c.next();
break;
}
}
// 이 상태에서 뭔가 하나 insert()
db.col.insertOne({user_id:'xxx', age:10})
// 그럼 앞서 지정한 iteration이 조용히 종료되는 것을 확인 가능.
// 그리고 firstChange를 찍어보면 변경정보가 들어있는 것을 확인 가능.
// 이 상태에서 몇 개 더 insert()
db.col.insertOne({user_id:'xxx', age:11})
db.col.insertOne({user_id:'xxx', age:12})
db.col.insertOne({user_id:'xxx', age:13})
// 이 상태에서 아무것도 지정하지 않은 상태로 changeStream을 시작하면 위에 insert한 3개의 변경사항은 잡아낼 수 없음.
// 하지만 resumeToken을 지정하는 방식으로 시작하면
let resumeToken = firstChange._id;
let resumeCursor = db.col.watch([], {
resumeAfter: resumeToken
});
// 이 루프를 실행하는 순간 이전에 insert한 3개의 변경정보가 출력됨을 확인 가능
while (!resumeCursor.isClosed()) {
let changeData = resumeCursor.tryNext();
while (changeData !== null) {
printjson(changeData);
changeData = resumeCursor.tryNext();
}
}
기타 ChangeStream을 사용하기 위해 필요한 정보나 주의사항, 매개변수를 통한 자잘한 기능들이 있으므로 아래 페이지 참고.
https://www.mongodb.com/docs/manual/changeStreams/
'DataBase' 카테고리의 다른 글
[MongoDB] collection에 해당하는 chunk 리스트 가져오기 (get chunks list from config.chunks) (0) | 2023.07.13 |
---|---|
[MongoDB] Time Series Collection (0) | 2023.07.11 |
[MongoDB] shard key 변경 (refineCollectionShardKey, reshard) (0) | 2023.06.30 |
[MongoDB] 트랜잭션 (Transaction) (0) | 2023.06.01 |
[MongoDB] 인덱스 사용량 확인 (check index usage) (0) | 2023.05.30 |