1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
| const Redis = require('ioredis');
class OrderProcessor {
constructor(group, consumer) {
this.redis = new Redis({ host: 'localhost', port: 6379 });
this.stream = 'orders';
this.group = group;
this.consumer = consumer;
this.running = true;
}
async initialize() {
// 建立消費者群組
try {
await this.redis.xgroup('CREATE', this.stream, this.group, '0', 'MKSTREAM');
} catch (err) {
if (!err.message.includes('BUSYGROUP')) {
throw err;
}
}
}
async processMessage(messageId, data) {
try {
console.log(`Processing order ${messageId}:`, data);
// 模擬處理邏輯
await new Promise(resolve => setTimeout(resolve, 100));
return true;
} catch (err) {
console.error(`Error processing ${messageId}:`, err);
return false;
}
}
parseMessage(fields) {
const data = {};
for (let i = 0; i < fields.length; i += 2) {
data[fields[i]] = fields[i + 1];
}
return data;
}
async run() {
console.log(`Consumer ${this.consumer} started`);
while (this.running) {
try {
const result = await this.redis.xreadgroup(
'GROUP', this.group, this.consumer,
'COUNT', '1',
'BLOCK', '5000',
'STREAMS', this.stream, '>'
);
if (!result) continue;
for (const [streamName, messages] of result) {
for (const [messageId, fields] of messages) {
const data = this.parseMessage(fields);
if (await this.processMessage(messageId, data)) {
await this.redis.xack(this.stream, this.group, messageId);
console.log(`Acknowledged: ${messageId}`);
} else {
console.log(`Failed to process: ${messageId}`);
}
}
}
} catch (err) {
if (err.message.includes('NOGROUP')) {
await this.initialize();
} else {
console.error('Error:', err);
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
}
}
async claimPending(minIdleTime = 60000) {
const result = await this.redis.xautoclaim(
this.stream,
this.group,
this.consumer,
minIdleTime,
'0-0',
'COUNT', '10'
);
return result;
}
stop() {
this.running = false;
}
async close() {
await this.redis.quit();
}
}
// 使用範例
async function main() {
const processor = new OrderProcessor('order-processors', 'consumer-1');
process.on('SIGINT', async () => {
console.log('\nShutting down...');
processor.stop();
await processor.close();
process.exit(0);
});
await processor.initialize();
await processor.run();
}
main().catch(console.error);
|