Redis

Redis 5.0 新特性 Stream 嚐鮮

Google+ Pinterest LinkedIn Tumblr
Redis 5.0 新特性 Stream 嚐鮮

Redis5.0最近被作者突然放出來了,增加了很多新的特色功能。而Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支援多播的可持久化的訊息佇列,作者坦言Redis Stream狠狠地借鑑了Kafka的設計。

Redis Stream的結構如上圖所示,它有一個訊息連結串列,將所有加入的訊息都串起來,每個訊息都有一個唯一的ID和對應的內容。訊息是持久化的,Redis重啟後,內容還在。

每個Stream都有唯一的名稱,它就是Redis的key,在我們首次使用 xadd 指令追加訊息時自動建立。

每個Stream都可以掛多個消費組,每個消費組會有個遊標 last_delivered_id 在Stream陣列之上往前移動,表示當前消費組已經消費到哪條訊息了。每個消費組都有一個Stream內唯一的名稱,消費組不會自動建立,它需要單獨的指令 xgroup create 進行建立,需要指定從Stream的某個訊息ID開始消費,這個ID用來初始化 last_delivered_id 變數。

每個消費組(Consumer Group)的狀態都是獨立的,相互不受影響。也就是說同一份Stream內部的訊息會被每個消費組都消費到。

同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關係,任意一個消費者讀取了訊息都會使遊標 last_delivered_id 往前移動。每個消費者者有一個組內唯一名稱。

消費者(Consumer)內部會有個狀態變數 pending_ids ,它記錄了當前已經被客戶端讀取的訊息,但是還沒有ack。如果客戶端沒有ack,這個變數裏面的訊息ID會越來越多,一旦某個訊息被ack,它就開始減少。這個pending_ids變數在Redis官方被稱之為 PEL ,也就是 Pending Entries List ,這是一個很核心的數據結構,它用來確保客戶端至少消費了訊息一次,而不會在網路傳輸的中途丟失了沒處理。

訊息ID

訊息ID的形式是 timestampInMillis-sequence ,例如 1527846880572-5 ,它表示當前的訊息在毫米時間戳 1527846880572 時產生,並且是該毫秒內產生的第5條訊息。訊息ID可以由伺服器自動生成,也可以由客戶端自己指定,但是形式必須是 整數-整數 ,而且必須是後面加入的訊息的ID要大於前面的訊息ID。

訊息內容

訊息內容就是鍵值對,形如hash結構的鍵值對,這沒什麼特別之處。

增刪改查

  1. xadd 追加訊息

  2. xdel 刪除訊息,這裏的刪除僅僅是設定了標誌位,不影響訊息總長度

  3. xrange 獲取訊息列表,會自動過濾已經刪除的訊息

  4. xlen 訊息長度

  5. del 刪除Stream

# *號表示伺服器自動生成ID,後面順序跟著一堆key/value
127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫laoqian,年齡30歲
1527849609889-0 # 生成的訊息ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
127.0.0.1:6379> xrange codehole - + # -表示最小值, +表示最大值
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849609889-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
3) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小訊息ID的列表
1) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
2) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大訊息ID的列表
1) 1) 1527849609889-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
127.0.0.1:6379> xlen codehole # 長度不受影響
(integer) 3
127.0.0.1:6379> xrange codehole - + # 被刪除的訊息沒了
1) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
2) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> del codehole # 刪除整個Stream
(integer) 1

獨立消費

我們可以在不定義消費組的情況下進行Stream訊息的獨立消費,當Stream沒有新訊息時,甚至可以阻塞等待。Redis設計了一個單獨的消費指令 xread ,可以將Stream當成普通的訊息佇列(list)來使用。使用xread時,我們可以完全忽略消費組(Consumer Group)的存在,就好比Stream就是一個普通的列表(list)。

# 從Stream頭部讀取兩條訊息
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
2) 1) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527851493405-0
2) 1) "name"
2) "yurui"
3) "age"
4) "29"
# 從Stream尾部讀取一條訊息,毫無疑問,這裏不會返回任何訊息
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# 從尾部阻塞等待新訊息到來,下面的指令會堵住,直到新訊息到來
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# 我們從新開啟一個視窗,在這個視窗往Stream裡塞訊息
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# 再切換到前面的視窗,我們可以看到阻塞解除了,返回了新的訊息內容
# 而且還顯示了一個等待時間,這裏我們等待了93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
2) 1) 1) 1527852774092-0
2) 1) "name"
2) "youming"
3) "age"
4) "60"
(93.11s)

客戶端如果想要使用xread進行順序消費,一定要記住當前消費到哪裏了,也就是返回的訊息ID。下次繼續呼叫xread時,將上次返回的最後一個訊息ID作為引數傳遞進去,就可以繼續消費後續的訊息。

block 0表示永遠阻塞,直到訊息到來,block 1000表示阻塞1s,如果1s內沒有任何訊息到來,就返回nil

127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)
Redis 5.0 新特性 Stream 嚐鮮

建立消費組

Stream通過 xgroup create 指令建立消費組(Consumer Group),需要傳遞起始訊息ID引數用來初始化 last_delivered_id 變數。

127.0.0.1:6379> xgroup create codehole cg1 0-0  #  表示從頭開始消費
OK
# $表示從尾部開始消費,只接受新訊息,當前Stream訊息會全部忽略
127.0.0.1:6379> xgroup create codehole cg2 $
OK
127.0.0.1:6379> xinfo codehole # 獲取Stream資訊
1) length
2) (integer) 3 # 共3個訊息
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2 # 兩個消費組
9) first-entry # 第一個訊息
10) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
11) last-entry # 最後一個訊息
12) 1) 1527851498956-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費組資訊

Write A Comment