主題的對象,當這個主題對象自身狀態變化時,會通知所有訂閱者,使它們能夠做出相應的處理。
Zookeeper中引入了Watcher機制來實現這種分布式通知的功能。Zookeeper允許客戶端向服務端注冊一個Watcher監聽。當服務端的一些事件出發了Watcher監聽機制,就會向指定得客戶端發送一個事件通知來實現分布式的通知功能。
觸發事件種類: 節點創建、節點刪除、節點改變、子節點改變等。
總的來說可以概括Watcher分為以下三個過程: 客戶端向服務端注冊Watcher、服務端事件觸發Watcher、客戶端回調Watcher得到觸發事件情況。
一次性觸發
事件觸發監聽,一個Watcher Event 就會被發送到設置監聽的客戶端,這種效果是一次性的,后續再發生同樣的事件,不會觸發機制。
事件封裝
Zookeeper使用WatchedEvent對象來封裝服務端事件并傳遞。
WatchedEvent包含每一個事件的三個基本屬性:
通知狀態(keeperState) 、事件類型(EventType)、節點路徑(path)
Event 異步發送
Watcher的通知事件是從服務端異步發送到客戶端的。
先注冊再觸發
Zookeeper的Watch機制,必須客戶端在服務端注冊監聽,服務器端觸發監聽機制,才會通知客戶端。
同一個事件類型在不同的通知狀態中代表的含義有所不同,下表列舉了常見的通知狀態和事件類型。
事件封裝:Watcher得到的事件是被封裝過的, 包括三個內容keeperState, eventType, path
其中連接狀態事件(type=None, path=null)不需要客戶端注冊,客戶端只要有需要直接處理就行了。
4.Shell 客戶端設置Watcher
設置節點數據變動監聽:
通過另一個客戶端更改節點數據:
此時設置監聽的節點收到通知:
5.ZooKeeperJava API操作
這里操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架 Curator ,解決了很多Zookeeper客戶端非常底層的細節開發工作 。
Curator包含了幾個包:
curator-framework :對zookeeper的底層api的一些封裝
curator-recipes :封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器等
Maven依賴(使用curator的版本:2.12.0,對應Zookeeper的版本為:3.4.x,如果跨版本會有兼容性問題,很有可能導致節點操作失敗):
1. 引入maven坐標
2.節點的操作
發布訂單系統是日常開發中經常會用到的功能。簡單來說,就是發布者發布消息,訂閱者就會接受到消息并進行相應的處理,如下圖所示。
Redis為我們提供了發布/訂閱的功能模塊PubSub,可以用于消息傳遞。
其中發布者publisher、訂閱者subscriber都是redis客戶端,channel則是redis服務器。
發布者publisher向channel發送消息,訂閱該channel的subscriber就會接收到消息。
發布消息publish
訂閱test1、test2的客戶端會收到消息
按照上述這種方式,如果 訂閱者subscriber想要訂閱多個channel 則需要同時指定多個channel的名稱,redis為了解決這個問題提供 psubscribe模式匹配 這種訂閱方式,可以通過通配符的方式匹配頻道。
發布消息
之前訂閱ch*的客戶端就會收到cha頻道和買粉絲頻道的消息,這樣就一次性訂閱多個頻道
redis服務端存儲了訂閱頻道/模式的客戶端列表
相當于如果客戶端訂閱一個頻道 ,那么服務端的 pubsub_channels 就會存儲一條數據, pubsub_channels 其實是一個鏈表,key對應channel,value對應客戶端列表,根據key訂閱的頻道,就可以找到訂閱該頻道的所有客戶端。
同時如果客戶端訂閱一個模式 , pubsub_patterns 也會新增一條數據,記錄當前客戶端訂閱的模式, pubsub_patterns 也有自己的數據結構,其中就包含了客戶端以及模式。
當發布者向某個頻道發布消息時,就會遍歷 pubsub_channels 找到訂閱該頻道的客戶端列表,依次向這些客戶端發送消息。
然后遍歷 pubsub_patterns 找到符合當前頻道的模式,同時找到模式對應的客戶端,然后向客戶端發送消息。
雖然Redis提供了發布/訂閱的功能,但是并不完善,導致基本沒有合適的場景能夠使用。
PubSub缺點:
直到Redis5.0出現之后,出現了Stream這種數據結構,才終于完善了Redis的消息機制 。
Stream實際上就是一個消息列表,只是他幾乎實現了消息隊列所需要的所有功能,包括:
同時需要注意的是Stream只是一個數據結構,他不會主動把消息推送給消費者,需要消費者主動來消費數據 。
每個Stream都有唯一的名稱,它就是Redis的key,首次使用 xadd 指令追加消息時自動創建。
常見操作命令如下表:
如果客戶端希望知道自身消費到第幾條數據了,那么就需要記錄一下當前消費的消息ID,下次再次消費的時候就從上次消費的消息ID開始讀取數據即可。
消費組中多了一個游標 last_delivered_id ,表示當前消費到了哪一條數據。同時所有的數據都是待處理消息( PEL ),只有消費者處理完畢之后使用 ack 指令告知redis服務器,數據才會從 PEL 中移除,確認后的消息就無法再次消費。
如果接收到的消息比較多,為了避免Stream過長,可以選擇指定Stream的最大長度,一旦到達了最大長度,就會從最早的消息開始清除,保證Stream中最新的消息。