Ubuntu 22.04 Redis Streams Message Queue

Redis Streams is a powerful data structure introduced in Redis 5.0, designed for message queues and event streaming. This article covers building and using Redis Streams on Ubuntu 22.04, from core concepts through advanced usage.

Redis Streams Concepts

Redis Streams is an append-only data structure, similar in concept to a log file. Each stream consists of multiple entries, and each entry has a unique ID and a set of field-value pairs.

Core Stream Features

  • Persistent storage: messages stay in the stream until explicitly deleted
  • Unique message IDs: every message gets a unique identifier built from a timestamp plus a sequence number
  • Consumer groups: multiple consumers can cooperatively process messages
  • Acknowledgement mechanism: guarantees messages are processed correctly
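The append-only model can be sketched as a tiny in-memory toy in pure Python. `MiniStream` and its methods are invented for illustration only; this models the data shape (ordered entries with `<ms>-<seq>` IDs), not Redis itself:

```python
import time

class MiniStream:
    """In-memory toy of a Redis Stream: append-only entries with
    <ms>-<seq> IDs. Illustrates the data model only, not Redis."""
    def __init__(self):
        self.entries = []            # (id, fields) in insertion order
        self._last_ms = 0
        self._seq = 0

    def xadd(self, fields, now_ms=None):
        ms = int(time.time() * 1000) if now_ms is None else now_ms
        if ms == self._last_ms:
            self._seq += 1           # same millisecond: bump the sequence
        else:
            self._last_ms, self._seq = ms, 0
        entry_id = f"{ms}-{self._seq}"
        self.entries.append((entry_id, dict(fields)))
        return entry_id

    def xrange(self):
        return list(self.entries)    # history is retained, not consumed
```

Reading the stream never removes entries, which is the key difference from a List-based queue.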

Installing Redis

First, install Redis on Ubuntu 22.04:

# Update the package list
sudo apt update

# Install the Redis server
sudo apt install redis-server -y

# Start Redis and enable it at boot
sudo systemctl start redis-server
sudo systemctl enable redis-server

# Confirm the Redis version (5.0+ is required for Streams)
redis-cli INFO server | grep redis_version

Stream Message ID Structure

<millisecondsTime>-<sequenceNumber>

For example, 1690710000000-0 is the first message at timestamp 1690710000000 milliseconds.
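Because IDs have the form `<ms>-<seq>`, they compare naturally as (timestamp, sequence) pairs. A small helper (hypothetical, for illustration) makes the ordering explicit:

```python
def parse_id(stream_id: str) -> tuple:
    """Split '<millisecondsTime>-<sequenceNumber>' into integers."""
    ms, seq = stream_id.split('-')
    return int(ms), int(seq)
```

Python tuple comparison then orders IDs the same way Redis does: first by timestamp, then by sequence number.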

# Inspect stream metadata
redis-cli XINFO STREAM mystream

Comparison with Pub/Sub and Lists

Feature Comparison

Feature                     Streams   Pub/Sub   Lists
Message persistence         Yes       No        Yes
Consumer groups             Yes       No        No
Message acknowledgement     Yes       No        No
Reading message history     Yes       No        No
Repeated consumption        Yes       No        No
Blocking reads              Yes       Yes       Yes

Limitations of Pub/Sub

# Pub/Sub example - messages are not persisted
# Subscriber
redis-cli SUBSCRIBE notifications

# Publisher (in another terminal)
redis-cli PUBLISH notifications "Hello World"

Problems with Pub/Sub:

  • If no subscriber is connected when a message is published, the message is lost
  • Historical messages cannot be replayed
  • No consumer-group style load balancing

Limitations of Lists

# A List as a simple queue
redis-cli LPUSH queue "task1"
redis-cli RPOP queue

Problems with Lists:

  • Once popped, a message is removed from the list
  • Multiple consumers cannot share the same queue
  • No way to track message processing state

Advantages of Streams

# Streams retain message history
redis-cli XADD events * action "login" user "alice"
redis-cli XADD events * action "purchase" user "bob"

# Read from any position
redis-cli XRANGE events - +

Basic Operations: XADD and XREAD

XADD - Adding Messages

# Syntax: XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold] *|id field value [field value ...]

# Auto-generate the ID (recommended)
redis-cli XADD orders * product "iPhone" quantity 1 price 999

# Explicit ID (rarely used)
redis-cli XADD orders 1690710000000-0 product "MacBook" quantity 1 price 1999

# Cap the stream length (exact)
redis-cli XADD orders MAXLEN 1000 * product "iPad" quantity 2 price 599

# Cap the stream length (approximate, better performance)
redis-cli XADD orders MAXLEN ~ 1000 * product "AirPods" quantity 1 price 199

XREAD - Reading Messages

# Read from the beginning
redis-cli XREAD STREAMS orders 0

# Read only new messages
redis-cli XREAD STREAMS orders $

# Blocking read (wait for new messages, up to 5 seconds)
redis-cli XREAD BLOCK 5000 STREAMS orders $

# Read from multiple streams
redis-cli XREAD STREAMS orders payments 0 0

# Limit the number of returned entries
redis-cli XREAD COUNT 10 STREAMS orders 0

XRANGE and XREVRANGE - Range Queries

# Read all messages
redis-cli XRANGE orders - +

# Read messages within a time range
redis-cli XRANGE orders 1690710000000 1690720000000

# Limit the number of returned entries
redis-cli XRANGE orders - + COUNT 5

# Read in reverse (newest first)
redis-cli XREVRANGE orders + - COUNT 5

XLEN and XINFO - Inspecting Streams

# Get the stream length
redis-cli XLEN orders

# Get detailed stream information
redis-cli XINFO STREAM orders

# Get consumer group information
redis-cli XINFO GROUPS orders

# Get information about consumers in a group
redis-cli XINFO CONSUMERS orders mygroup

Consumer Groups

Consumer groups are one of the most powerful features of Redis Streams: they let multiple consumers process messages cooperatively, providing load balancing.

Creating a Consumer Group

# Syntax: XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries-read]

# Start consuming from the beginning of the stream (process all history)
redis-cli XGROUP CREATE orders order-processors 0

# Process only new messages
redis-cli XGROUP CREATE orders new-order-processors $ MKSTREAM

# Creating an existing group raises an error; use XGROUP DESTROY to remove it first
redis-cli XGROUP DESTROY orders order-processors

XREADGROUP - Reading as a Group

# Syntax: XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

# consumer-1 reads a message
redis-cli XREADGROUP GROUP order-processors consumer-1 COUNT 1 STREAMS orders >

# consumer-2 reads a message
redis-cli XREADGROUP GROUP order-processors consumer-2 COUNT 1 STREAMS orders >

# Blocking read
redis-cli XREADGROUP GROUP order-processors consumer-1 BLOCK 5000 STREAMS orders >

# The > symbol means: only messages never delivered to this consumer group
# Using a concrete ID re-reads this consumer's pending messages instead
redis-cli XREADGROUP GROUP order-processors consumer-1 STREAMS orders 0

How Consumer Groups Work

Stream: orders
    |
    v
+-------------------+
| Entry: 1-0        |-----> Consumer Group: order-processors
| Entry: 1-1        |       |
| Entry: 1-2        |       +-- consumer-1 (processing 1-0)
| Entry: 1-3        |       +-- consumer-2 (processing 1-1)
| Entry: 1-4        |       +-- consumer-3 (processing 1-2)
+-------------------+

Each message is delivered to exactly one consumer in the group, giving automatic load balancing.
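These delivery semantics (each entry goes to exactly one consumer and stays in that consumer's pending entries list until acknowledged) can be modeled in a few lines of pure Python. `ConsumerGroup` is a toy model for illustration, not a Redis client:

```python
from collections import deque

class ConsumerGroup:
    """Toy model: each entry is delivered to exactly one consumer,
    and stays in the pending entries list (PEL) until acknowledged."""
    def __init__(self):
        self.undelivered = deque()
        self.pending = {}            # entry id -> consumer name

    def xadd(self, entry_id):
        self.undelivered.append(entry_id)

    def xreadgroup(self, consumer):
        """Deliver the next never-delivered entry (like reading with '>')."""
        if not self.undelivered:
            return None
        entry_id = self.undelivered.popleft()
        self.pending[entry_id] = consumer
        return entry_id

    def xack(self, entry_id):
        """Acknowledge an entry, removing it from the pending list."""
        return self.pending.pop(entry_id, None) is not None
```

Two consumers calling `xreadgroup` receive different entries, which is exactly the load-balancing behavior shown in the diagram above.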

Message Acknowledgement and the Pending List

XACK - Acknowledging Messages

After a consumer successfully processes a message, it must acknowledge it with XACK:

# Syntax: XACK key group id [id ...]

# Acknowledge a single message
redis-cli XACK orders order-processors 1690710000000-0

# Acknowledge multiple messages
redis-cli XACK orders order-processors 1690710000000-0 1690710000001-0 1690710000002-0

XPENDING - Inspecting Pending Messages

# Pending messages summary
redis-cli XPENDING orders order-processors

# Example output:
# 1) (integer) 3                    # number of pending messages
# 2) "1690710000000-0"              # smallest ID
# 3) "1690710000002-0"              # largest ID
# 4) 1) 1) "consumer-1"             # pending count per consumer
#       2) "2"
#    2) 1) "consumer-2"
#       2) "1"

# Pending messages in detail
redis-cli XPENDING orders order-processors - + 10

# Pending messages for a specific consumer
redis-cli XPENDING orders order-processors - + 10 consumer-1

# Messages idle for more than 60 seconds
redis-cli XPENDING orders order-processors IDLE 60000 - + 10
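The summary form returns nested arrays, as in the example output above. A small helper (hypothetical, shaped after that raw reply) flattens it into a dict:

```python
def parse_xpending_summary(reply):
    """Flatten an XPENDING summary reply:
    [count, min_id, max_id, [[consumer, count], ...]] -> dict."""
    count, min_id, max_id, per_consumer = reply
    consumers = {name: int(n) for name, n in (per_consumer or [])}
    return {'pending': int(count), 'min': min_id, 'max': max_id,
            'consumers': consumers}
```

High-level clients such as redis-py already do this unpacking for you; the helper is only meant to show the reply's shape.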

XCLAIM - Transferring Message Ownership

When a consumer fails, its pending messages can be transferred to another consumer:

# Syntax: XCLAIM key group consumer min-idle-time id [id ...]

# Transfer messages idle for more than 60 seconds to consumer-2
redis-cli XCLAIM orders order-processors consumer-2 60000 1690710000000-0

# XCLAIM with options
redis-cli XCLAIM orders order-processors consumer-2 60000 1690710000000-0 RETRYCOUNT 3

# JUSTID returns only the IDs (better performance)
redis-cli XCLAIM orders order-processors consumer-2 60000 1690710000000-0 JUSTID

XAUTOCLAIM - Automatic Claiming

Redis 6.2+ offers a more convenient automatic claim command:

# Syntax: XAUTOCLAIM key group consumer min-idle-time start [COUNT count]

# Automatically claim messages idle for more than 60 seconds
redis-cli XAUTOCLAIM orders order-processors consumer-2 60000 0-0 COUNT 10

Message Retries and Dead Letter Handling

Retry Counter

# Check how many times a message has been delivered
redis-cli XPENDING orders order-processors - + 10

# The output includes the delivery count (fourth field)
# 1) 1) "1690710000000-0"    # message ID
#    2) "consumer-1"         # consumer
#    3) (integer) 120000     # idle time (milliseconds)
#    4) (integer) 3          # delivery count
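Redis only counts deliveries; any backoff policy is up to the application. One common pattern (an application-level assumption, not something Redis provides) is to derive a retry delay from the delivery count:

```python
def backoff_ms(times_delivered: int, base_ms: int = 1000,
               cap_ms: int = 60000) -> int:
    """Exponential backoff derived from the XPENDING delivery count:
    1s, 2s, 4s, ... capped at 60s."""
    return min(base_ms * 2 ** (times_delivered - 1), cap_ms)
```

A recovery worker could wait `backoff_ms(times_delivered)` before re-claiming a pending message, so persistently failing messages are retried less and less often.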

Implementing a Dead Letter Queue

When a message has been retried too many times, it should be moved to a dead letter queue (DLQ):

# Find messages whose retry count exceeds the threshold
redis-cli XPENDING orders order-processors - + 100

# Move the failed message to the dead letter queue
redis-cli XADD orders:dlq * original_id "1690710000000-0" error "Processing failed after 5 retries"

# Acknowledge it on the original queue
redis-cli XACK orders order-processors 1690710000000-0
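The decision logic for a single pending entry can be isolated as a pure function. This is a sketch; the thresholds and action names are assumptions chosen to match the steps above:

```python
def triage_pending(idle_ms: int, times_delivered: int,
                   max_retries: int = 5, claim_idle_ms: int = 60000) -> str:
    """Decide what to do with one XPENDING entry:
    'dlq'   -> XADD to the dead letter queue, then XACK the original
    'claim' -> owner looks dead: XCLAIM / XAUTOCLAIM it
    'wait'  -> still within its processing window
    """
    if times_delivered > max_retries:
        return 'dlq'
    if idle_ms >= claim_idle_ms:
        return 'claim'
    return 'wait'
```

Keeping this as a pure function makes the retry policy trivially testable, separate from the Redis calls that act on its result.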

Complete Retry Logic (Shell Script Example)

#!/bin/bash

STREAM="orders"
GROUP="order-processors"
DLQ="${STREAM}:dlq"
MAX_RETRIES=5
CLAIM_IDLE_TIME=60000  # 60 seconds

# Claim idle messages
claim_idle_messages() {
    redis-cli XAUTOCLAIM $STREAM $GROUP recovery-consumer $CLAIM_IDLE_TIME 0-0 COUNT 10
}

# Detect and handle dead letters
process_dead_letters() {
    # Get pending message details
    pending=$(redis-cli XPENDING $STREAM $GROUP - + 100)

    # Parse and handle messages that exceeded the retry limit
    echo "$pending" | while read -r line; do
        id=$(echo "$line" | awk '{print $1}')
        retries=$(echo "$line" | awk '{print $4}')

        if [ "$retries" -gt "$MAX_RETRIES" ]; then
            # Fetch the original message
            msg=$(redis-cli XRANGE $STREAM $id $id)

            # Move it to the dead letter queue
            redis-cli XADD $DLQ '*' original_id "$id" original_data "$msg" retries "$retries"

            # Acknowledge the original message
            redis-cli XACK $STREAM $GROUP $id

            echo "Moved $id to DLQ after $retries retries"
        fi
    done
}

Python and Node.js Integration

Python Integration

Install redis-py:

pip install redis

Basic Operations

import redis
from datetime import datetime

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Add a message
def add_order(product: str, quantity: int, price: float) -> str:
    """Add an order to the stream."""
    message_id = r.xadd('orders', {
        'product': product,
        'quantity': str(quantity),
        'price': str(price),
        'timestamp': datetime.now().isoformat()
    })
    print(f"Added order: {message_id}")
    return message_id

# Read messages
def read_orders(count: int = 10) -> list:
    """Read orders from the stream."""
    messages = r.xrange('orders', '-', '+', count=count)
    return messages

# Usage
add_order('iPhone 15', 1, 999.99)
add_order('MacBook Pro', 1, 2499.99)
orders = read_orders()
for order_id, data in orders:
    print(f"Order {order_id}: {data}")

Consumer Group Processing

import redis
import signal
import sys
import time

class OrderProcessor:
    def __init__(self, group: str, consumer: str):
        self.r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
        self.stream = 'orders'
        self.group = group
        self.consumer = consumer
        self.running = True

        # Create the consumer group (ignore the error if it already exists)
        try:
            self.r.xgroup_create(self.stream, self.group, id='0', mkstream=True)
        except redis.ResponseError as e:
            if 'BUSYGROUP' not in str(e):
                raise

    def process_message(self, message_id: str, data: dict) -> bool:
        """Business logic for processing one message."""
        try:
            print(f"Processing order {message_id}: {data}")
            # Simulate processing work
            time.sleep(0.1)
            return True
        except Exception as e:
            print(f"Error processing {message_id}: {e}")
            return False

    def run(self):
        """Main consume loop."""
        print(f"Consumer {self.consumer} started")

        while self.running:
            try:
                # Block waiting for new messages
                messages = self.r.xreadgroup(
                    self.group,
                    self.consumer,
                    {self.stream: '>'},
                    count=1,
                    block=5000
                )

                if not messages:
                    continue

                for stream_name, stream_messages in messages:
                    for message_id, data in stream_messages:
                        if self.process_message(message_id, data):
                            # Success: acknowledge the message
                            self.r.xack(self.stream, self.group, message_id)
                            print(f"Acknowledged: {message_id}")
                        else:
                            print(f"Failed to process: {message_id}")

            except redis.ConnectionError:
                print("Connection lost, reconnecting...")
                time.sleep(1)

    def claim_pending(self, min_idle_time: int = 60000):
        """Claim idle messages."""
        result = self.r.xautoclaim(
            self.stream,
            self.group,
            self.consumer,
            min_idle_time,
            start_id='0-0',
            count=10
        )
        return result

    def stop(self):
        self.running = False

# Usage
if __name__ == '__main__':
    processor = OrderProcessor('order-processors', 'consumer-1')

    def signal_handler(sig, frame):
        print("\nShutting down...")
        processor.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    processor.run()

Dead Letter Handling

class DeadLetterHandler:
    def __init__(self):
        self.r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
        self.stream = 'orders'
        self.group = 'order-processors'
        self.dlq = 'orders:dlq'
        self.max_retries = 5

    def check_and_move_to_dlq(self):
        """Find failed messages and move them to the dead letter queue."""
        pending = self.r.xpending_range(
            self.stream,
            self.group,
            min='-',
            max='+',
            count=100
        )

        for item in pending:
            message_id = item['message_id']
            times_delivered = item['times_delivered']

            if times_delivered > self.max_retries:
                # Fetch the original message
                messages = self.r.xrange(self.stream, message_id, message_id)

                if messages:
                    original_data = messages[0][1]

                    # Move it to the dead letter queue
                    self.r.xadd(self.dlq, {
                        'original_id': message_id,
                        'original_data': str(original_data),
                        'retries': str(times_delivered),
                        'moved_at': datetime.now().isoformat()
                    })

                    # Acknowledge the original message
                    self.r.xack(self.stream, self.group, message_id)
                    print(f"Moved {message_id} to DLQ after {times_delivered} retries")

    def reprocess_dlq(self, handler_func):
        """Reprocess messages from the dead letter queue."""
        messages = self.r.xrange(self.dlq, '-', '+')

        for message_id, data in messages:
            try:
                if handler_func(data):
                    self.r.xdel(self.dlq, message_id)
                    print(f"Successfully reprocessed: {message_id}")
            except Exception as e:
                print(f"Still failing: {message_id}, error: {e}")

Node.js Integration

Install ioredis:

npm install ioredis

Basic Operations

const Redis = require('ioredis');

const redis = new Redis({
  host: 'localhost',
  port: 6379
});

// Add a message
async function addOrder(product, quantity, price) {
  const messageId = await redis.xadd(
    'orders',
    '*',
    'product', product,
    'quantity', quantity.toString(),
    'price', price.toString(),
    'timestamp', new Date().toISOString()
  );
  console.log(`Added order: ${messageId}`);
  return messageId;
}

// Read messages
async function readOrders(count = 10) {
  const messages = await redis.xrange('orders', '-', '+', 'COUNT', count);
  return messages;
}

// Usage
async function main() {
  await addOrder('iPhone 15', 1, 999.99);
  await addOrder('MacBook Pro', 1, 2499.99);

  const orders = await readOrders();
  orders.forEach(([id, fields]) => {
    const data = {};
    for (let i = 0; i < fields.length; i += 2) {
      data[fields[i]] = fields[i + 1];
    }
    console.log(`Order ${id}:`, data);
  });

  await redis.quit();
}

main().catch(console.error);

Consumer Group Processing

const Redis = require('ioredis');

class OrderProcessor {
  constructor(group, consumer) {
    this.redis = new Redis({ host: 'localhost', port: 6379 });
    this.stream = 'orders';
    this.group = group;
    this.consumer = consumer;
    this.running = true;
  }

  async initialize() {
    // Create the consumer group (ignore the error if it already exists)
    try {
      await this.redis.xgroup('CREATE', this.stream, this.group, '0', 'MKSTREAM');
    } catch (err) {
      if (!err.message.includes('BUSYGROUP')) {
        throw err;
      }
    }
  }

  async processMessage(messageId, data) {
    try {
      console.log(`Processing order ${messageId}:`, data);
      // Simulate processing work
      await new Promise(resolve => setTimeout(resolve, 100));
      return true;
    } catch (err) {
      console.error(`Error processing ${messageId}:`, err);
      return false;
    }
  }

  parseMessage(fields) {
    const data = {};
    for (let i = 0; i < fields.length; i += 2) {
      data[fields[i]] = fields[i + 1];
    }
    return data;
  }

  async run() {
    console.log(`Consumer ${this.consumer} started`);

    while (this.running) {
      try {
        const result = await this.redis.xreadgroup(
          'GROUP', this.group, this.consumer,
          'COUNT', '1',
          'BLOCK', '5000',
          'STREAMS', this.stream, '>'
        );

        if (!result) continue;

        for (const [streamName, messages] of result) {
          for (const [messageId, fields] of messages) {
            const data = this.parseMessage(fields);

            if (await this.processMessage(messageId, data)) {
              await this.redis.xack(this.stream, this.group, messageId);
              console.log(`Acknowledged: ${messageId}`);
            } else {
              console.log(`Failed to process: ${messageId}`);
            }
          }
        }
      } catch (err) {
        if (err.message.includes('NOGROUP')) {
          await this.initialize();
        } else {
          console.error('Error:', err);
          await new Promise(resolve => setTimeout(resolve, 1000));
        }
      }
    }
  }

  async claimPending(minIdleTime = 60000) {
    const result = await this.redis.xautoclaim(
      this.stream,
      this.group,
      this.consumer,
      minIdleTime,
      '0-0',
      'COUNT', '10'
    );
    return result;
  }

  stop() {
    this.running = false;
  }

  async close() {
    await this.redis.quit();
  }
}

// Usage
async function main() {
  const processor = new OrderProcessor('order-processors', 'consumer-1');

  process.on('SIGINT', async () => {
    console.log('\nShutting down...');
    processor.stop();
    await processor.close();
    process.exit(0);
  });

  await processor.initialize();
  await processor.run();
}

main().catch(console.error);

Dead Letter Handling

class DeadLetterHandler {
  constructor() {
    this.redis = new Redis({ host: 'localhost', port: 6379 });
    this.stream = 'orders';
    this.group = 'order-processors';
    this.dlq = 'orders:dlq';
    this.maxRetries = 5;
  }

  async checkAndMoveToDLQ() {
    const pending = await this.redis.xpending(
      this.stream,
      this.group,
      '-',
      '+',
      100
    );

    for (const item of pending) {
      const [messageId, consumer, idleTime, timesDelivered] = item;

      if (timesDelivered > this.maxRetries) {
        // Fetch the original message
        const messages = await this.redis.xrange(
          this.stream,
          messageId,
          messageId
        );

        if (messages.length > 0) {
          const [, fields] = messages[0];

          // Move it to the dead letter queue
          await this.redis.xadd(
            this.dlq,
            '*',
            'original_id', messageId,
            'original_data', JSON.stringify(fields),
            'retries', timesDelivered.toString(),
            'moved_at', new Date().toISOString()
          );

          // Acknowledge the original message
          await this.redis.xack(this.stream, this.group, messageId);
          console.log(`Moved ${messageId} to DLQ after ${timesDelivered} retries`);
        }
      }
    }
  }

  async reprocessDLQ(handlerFunc) {
    const messages = await this.redis.xrange(this.dlq, '-', '+');

    for (const [messageId, fields] of messages) {
      const data = {};
      for (let i = 0; i < fields.length; i += 2) {
        data[fields[i]] = fields[i + 1];
      }

      try {
        if (await handlerFunc(data)) {
          await this.redis.xdel(this.dlq, messageId);
          console.log(`Successfully reprocessed: ${messageId}`);
        }
      } catch (err) {
        console.log(`Still failing: ${messageId}, error: ${err.message}`);
      }
    }
  }

  async close() {
    await this.redis.quit();
  }
}

Monitoring and Performance Tuning

Redis CLI Monitoring Commands

# Monitor all commands in real time
redis-cli MONITOR

# Stream memory usage
redis-cli MEMORY USAGE orders

# Detailed stream information
redis-cli XINFO STREAM orders FULL

# All consumer groups
redis-cli XINFO GROUPS orders

# Consumers within a group
redis-cli XINFO CONSUMERS orders order-processors

# Overall Redis statistics
redis-cli INFO stats

# Memory statistics
redis-cli INFO memory

Performance Metrics Monitoring Script

#!/bin/bash

STREAM="orders"
GROUP="order-processors"

while true; do
    echo "=== $(date) ==="

    # Stream length
    length=$(redis-cli XLEN $STREAM)
    echo "Stream length: $length"

    # Number of pending messages
    pending=$(redis-cli XPENDING $STREAM $GROUP | head -1)
    echo "Pending messages: $pending"

    # Memory usage
    memory=$(redis-cli MEMORY USAGE $STREAM)
    echo "Memory usage: $memory bytes"

    # Consumer status
    echo "Consumers:"
    redis-cli XINFO CONSUMERS $STREAM $GROUP

    echo ""
    sleep 5
done

Stream Trimming Strategies

# Trim by message count (exact)
redis-cli XTRIM orders MAXLEN 10000

# Trim by message count (approximate, better performance)
redis-cli XTRIM orders MAXLEN ~ 10000

# Trim by minimum ID (delete messages older than the given ID)
redis-cli XTRIM orders MINID 1690710000000-0

# Trim automatically on XADD
redis-cli XADD orders MAXLEN ~ 10000 * key value

Tuning the Redis Configuration

Edit /etc/redis/redis.conf:

# Memory limit
maxmemory 2gb

# Eviction policy (when the limit is reached)
# noeviction: evict nothing; writes fail with an error
# allkeys-lru: evict the least recently used keys
maxmemory-policy noeviction

# Persistence settings
# RDB snapshots
save 900 1
save 300 10
save 60 10000

# AOF persistence
appendonly yes
appendfsync everysec

# Stream-specific settings
stream-node-max-bytes 4096
stream-node-max-entries 100

Performance Tuning Tips

  1. Batch reads: use the COUNT parameter to fetch multiple messages at once

# Read 100 messages at a time
messages = r.xreadgroup(group, consumer, {stream: '>'}, count=100, block=5000)

  2. Set a sensible MAXLEN: keep the stream from growing without bound

# Approximate trimming (~) performs better
r.xadd('orders', {'key': 'value'}, maxlen=10000, approximate=True)

  3. Choose a reasonable BLOCK time: balance responsiveness against resource use

# 5 seconds is a common sweet spot
messages = r.xreadgroup(group, consumer, {stream: '>'}, block=5000)

  4. Monitor pending messages: catch backlogs early

# Periodically check the pending count
pending_info = r.xpending(stream, group)
if pending_info['pending'] > 1000:
    alert("Too many pending messages!")

  5. Use pipelines: batch operations for better throughput

pipe = r.pipeline()
for i in range(100):
    pipe.xadd('orders', {'order': str(i)})
pipe.execute()

Prometheus Monitoring Integration

from prometheus_client import Gauge, start_http_server
import redis
import time

# Define metrics
stream_length = Gauge('redis_stream_length', 'Stream length', ['stream'])
pending_messages = Gauge('redis_pending_messages', 'Pending messages', ['stream', 'group'])
consumer_lag = Gauge('redis_consumer_lag', 'Consumer lag', ['stream', 'group', 'consumer'])

def collect_metrics():
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)

    while True:
        # Stream length
        length = r.xlen('orders')
        stream_length.labels(stream='orders').set(length)

        # Pending messages
        pending = r.xpending('orders', 'order-processors')
        pending_messages.labels(stream='orders', group='order-processors').set(pending['pending'])

        # Consumer lag
        consumers = r.xinfo_consumers('orders', 'order-processors')
        for consumer in consumers:
            consumer_lag.labels(
                stream='orders',
                group='order-processors',
                consumer=consumer['name']
            ).set(consumer['pending'])

        time.sleep(15)

if __name__ == '__main__':
    start_http_server(8000)
    collect_metrics()

Summary

Redis Streams provides a powerful and flexible message queue solution, particularly well suited to:

  • Event-driven architectures: recording and processing system events
  • Microservice communication: asynchronous messaging between services
  • Log collection: centralized log processing
  • Real-time data processing: streaming data analysis

Best Practices Recap

  1. Use consumer groups for horizontal scaling and load balancing
  2. Implement message acknowledgement to guarantee reliability
  3. Set up a dead letter queue for messages that cannot be processed
  4. Trim streams regularly to control memory usage
  5. Monitor the pending message count to spot problems early
  6. Use batch operations to improve performance

With the material above, you should be able to build a Redis Streams message queue on Ubuntu 22.04 and integrate it into your applications.
