1 前言
之前旁邊的小伙伴問我熱點數據相關問題,在給他粗略的講解一波redis數據傾斜的案例之后,自己也順道回顧了一些關于熱點數據處理的方法論,同時也想起去年所學習JD開源項目hotkey——專門用來解決熱點數據問題的框架。在這里結合兩者所關聯到的知識點,通過幾個小圖和部分粗略的講解,來讓大家了解相關方法論以及hotkey的源碼解析。
2 Redis數據傾斜
2.1 定義與危害
先說說數據傾斜的定義,借用百度詞條的解釋:
對于集群系統,一般緩存是分布式的,即不同節點負責一定范圍的緩存數據。我們把緩存數據分散度不夠,導致大量的緩存數據集中到了一臺或者幾臺服務節點上,稱為數據傾斜。一般來說數據傾斜是由于負載均衡實施的效果不好引起的。
從上面的定義中可以得知,數據傾斜的原因一般是因為LB的效果不好,導致部分節點數據量非常集中。
那這又會有什么危害呢?
如果發生了數據傾斜,那么保存了大量數據,或者是保存了熱點數據的實例的處理壓力就會增大,速度變慢,甚至還可能會引起這個實例的內存資源耗盡,從而崩潰。這是我們在應用切片集群時要避免的。
2.2 數據傾斜的分類
2.2.1 數據量傾斜(寫入傾斜)
1.圖示
如圖,在某些情況下,實例上的數據分布不均衡,某個實例上的數據特別多。
2.bigkey導致傾斜
某個實例上正好保存了 bigkey。bigkey 的 value 值很大(String 類型),或者是 bigkey 保存了大量集合元素(集合類型),會導致這個實例的數據量增加,內存資源消耗也相應增加。
應對方法
在業務層生成數據時,要盡量避免把過多的數據保存在同一個鍵值對中。
如果 bigkey 正好是集合類型,還有一個方法,就是把 bigkey 拆分成很多個小的集合類型數據,分散保存在不同的實例上。
3.Slot分配不均導致傾斜
先簡單的介紹一下slot的概念,slot其實全名是Hash Slot(哈希槽),在Redis Cluster切片集群中一共有16384 個 Slot,這些哈希槽類似于數據分區,每個鍵值對都會根據它的 key,被映射到一個哈希槽中。Redis Cluster 方案采用哈希槽來處理數據和實例之間的映射關系。
一張圖來解釋,數據、哈希槽、實例這三者的映射分布情況。
這里的CRC16(city)%16384可以簡單的理解為將key1根據CRC16算法取hash值然后對slot個數取模,得到的就是slot位置為14484,他所對應的實例節點是第三個。
運維在構建切片集群時候,需要手動分配哈希槽,并且把16384 個槽都分配完,否則 Redis 集群無法正常工作。由于是手動分配,則可能會導致部分實例所分配的slot過多,導致數據傾斜。
應對方法
使用CLUSTER SLOTS 命令來查看slot分配情況,使用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE這三個命令來進行slot數據的遷移,具體內容不再這里細說,感興趣的同學可以自行學習一下。
4.Hash Tag導致傾斜
Hash Tag 定義 :指當一個key包含 {} 的時候,就不對整個key做hash,而僅對 {} 包括的字符串做hash。
假設hash算法為sha1。對user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1)。
Hash Tag 優勢 :如果不同 key 的 Hash Tag 內容都是一樣的,那么,這些 key 對應的數據會被映射到同一個 Slot 中,同時會被分配到同一個實例上。
Hash Tag 劣勢 :如果不合理使用,會導致大量的數據可能被集中到一個實例上發生數據傾斜,集群中的負載不均衡。
2.2.2 數據訪問傾斜(讀取傾斜-熱key問題)
一般來說數據訪問傾斜就是熱key問題導致的,如何處理redis熱key問題也是面試中常會問到的。所以了解相關概念及方法論也是不可或缺的一環。
1.圖示
如圖,雖然每個集群實例上的數據量相差不大,但是某個實例上的數據是熱點數據,被訪問得非常頻繁。
但是為啥會有熱點數據的產生呢?
2.產生熱key的原因及危害
1)用戶消費的數據遠大于生產的數據(熱賣商品、熱點新聞、熱點評論、明星直播)。
在日常工作生活中一些突發的的事件,例如:雙十一期間某些熱門商品的降價促銷,當這其中的某一件商品被數萬次點擊瀏覽或者購買時,會形成一個較大的需求量,這種情況下就會造成熱點問題。
同理,被大量刊發、瀏覽的熱點新聞、熱點評論、明星直播等,這些典型的讀多寫少的場景也會產生熱點問題。
2)請求分片集中,超過單 Server 的性能極限。
在服務端讀數據進行訪問時,往往會對數據進行分片切分,此過程中會在某一主機 Server 上對相應的 Key 進行訪問,當訪問超過 Server 極限時,就會導致熱點 Key 問題的產生。
如果熱點過于集中,熱點 Key 的緩存過多,超過目前的緩存容量時,就會導致緩存分片服務被打垮現象的產生。當緩存服務崩潰后,此時再有請求產生,會緩存到后臺 DB 上,由于DB 本身性能較弱,在面臨大請求時很容易發生請求穿透現象,會進一步導致雪崩現象,嚴重影響設備的性能。
3.常用的熱key問題解決辦法:
解決方案一: 備份熱key
可以把熱點數據復制多份,在每一個數據副本的 key 中增加一個隨機后綴,讓它和其它副本數據不會被映射到同一個 Slot 中。
這里相當于把一份數據復制到其他實例上,這樣在訪問的時候也增加隨機前綴,將對一個實例的訪問壓力,均攤到其他實例上
例如:
我們在放入緩存時就將對應業務的緩存key拆分成多個不同的key。如下圖所示,我們首先在更新緩存的一側,將key拆成N份,比如一個key名字叫做”good_100”,那我們就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增時都需要去改動這N個key,這一步就是拆key。
對于service端來講,我們就需要想辦法盡量將自己訪問的流量足夠的均勻。
如何給自己即將訪問的熱key上加入后綴?幾種辦法,根據本機的ip或mac地址做hash,之后的值與拆key的數量做取余,最終決定拼接成什么樣的key后綴,從而打到哪臺機器上;服務啟動時的一個隨機數對拆key的數量做取余。
偽代碼如下:
const M = N * 2
//生成隨機數
random = GenRandom(0, M)
//構造備份新key
bakHotKey = hotKey + “_” + random
data = redis.GET(bakHotKey)
if data == NULL {
data = GetFromDB()
redis.SET(bakHotKey, expireTime + GenRandom(0,5))
}
解決方案二: 本地緩存+動態計算自動發現熱點緩存 基本流程圖
該方案通過主動發現熱點并對其進行存儲來解決熱點 Key 的問題。首先 Client 也會訪問 SLB,并且通過 SLB 將各種請求分發至 Proxy 中,Proxy 會按照基于路由的方式將請求轉發至后端的 Redis 中。 在熱點 key 的解決上是采用在服務端增加緩存的方式進行。具體來說就是在 Proxy 上增加本地緩存,本地緩存采用 LRU 算法來緩存熱點數據,后端節點增加熱點數據計算模塊來返回熱點數據。
Proxy 架構的主要有以下優點:
Proxy 本地緩存熱點,讀能力可水平擴展
DB 節點定時計算熱點數據集合
DB 反饋 Proxy 熱點數據
對客戶端完全透明,不需做任何兼容
熱點數據的發現與存儲
對于熱點數據的發現,首先會在一個周期內對 Key 進行請求統計,在達到請求量級后會對熱點 Key 進行熱點定位,并將所有的熱點 Key 放入一個小的 LRU 鏈表內,在通過 Proxy 請求進行訪問時,若 Redis 發現待訪點是一個熱點,就會進入一個反饋階段,同時對該數據進行標記。 可以使用一個etcd或者zk集群來存儲反饋的熱點數據,然后本地所有節點監聽該熱點數據,進而加載到本地JVM緩存中。
熱點數據的獲取
在熱點 Key 的處理上主要分為寫入跟讀取兩種形式,在數據寫入過程當 SLB 收到數據 K1 并將其通過某一個 Proxy 寫入一個 Redis,完成數據的寫入。 假若經過后端熱點模塊計算發現 K1 成為熱點 key 后, Proxy 會將該熱點進行緩存,當下次客戶端再進行訪問 K1 時,可以不經 Redis。 最后由于 proxy 是可以水平擴充的,因此可以任意增強熱點數據的訪問能力。
最佳成熟方案: JD開源hotKey 這是目前較為成熟的自動探測熱key、分布式一致性緩存解決方案。原理就是在client端做洞察,然后上報對應hotkey,server端檢測到后,將對應hotkey下發到對應服務端做本地緩存,并且能保證本地緩存和遠程緩存的一致性。
在這里咱們就不細談了,這篇文章的第三部分:JD開源hotkey源碼解析里面會帶領大家了解其整體原理。
3 JD開源hotkey—自動探測熱key、分布式一致性緩存解決方案
3.1 解決痛點
從上面可知,熱點key問題在并發量比較高的系統中(特別是做秒殺活動)出現的頻率會比較高,對系統帶來的危害也很大。 那么針對此,hotkey誕生的目的是什么?需要解決的痛點是什么?以及它的實現原理。
在這里引用項目上的一段話來概述: 對任意突發性的無法預先感知的熱點數據,包括并不限于熱點數據(如突發大量請求同一個商品)、熱用戶(如惡意爬蟲刷子)、熱接口(突發海量請求同一個接口)等,進行毫秒級精準探測到。然后對這些熱數據、熱用戶等,推送到所有服務端JVM內存中,以大幅減輕對后端數據存儲層的沖擊,并可以由使用者決定如何分配、使用這些熱key(譬如對熱商品做本地緩存、對熱用戶進行拒絕訪問、對熱接口進行熔斷或返回默認值)。這些熱數據在整個服務端集群內保持一致性,并且業務隔離。
核心功能:熱數據探測并推送至集群各個服務器
3.2 集成方式
集成方式在這里就不詳述了,感興趣的同學可以自行搜索。
3.3 源碼解析
3.3.1 架構簡介
1.全景圖一覽
流程介紹:
客戶端通過引用hotkey的client包,在啟動的時候上報自己的信息給worker,同時和worker之間建立長連接。定時拉取配置中心上面的規則信息和worker集群信息。
客戶端調用hotkey的ishot()的方法來首先匹配規則,然后統計是不是熱key。
通過定時任務把熱key數據上傳到worker節點。
worker集群在收取到所有關于這個key的數據以后(因為通過hash來決定key 上傳到哪個worker的,所以同一個key只會在同一個worker節點上),在和定義的規則進行匹配后判斷是不是熱key,如果是則推送給客戶端,完成本地緩存。
2.角色構成
這里直接借用作者的描述: 1)etcd集群 etcd作為一個高性能的配置中心,可以以極小的資源占用,提供高效的監聽訂閱服務。主要用于存放規則配置,各worker的ip地址,以及探測出的熱key、手工添加的熱key等。
2)client端jar包 就是在服務中添加的引用jar,引入后,就可以以便捷的方式去判斷某key是否熱key。同時,該jar完成了key上報、監聽etcd里的rule變化、worker信息變化、熱key變化,對熱key進行本地caffeine緩存等。
3) worker端集群 worker端是一個獨立部署的Java程序,啟動后會連接etcd,并定期上報自己的ip信息,供client端獲取地址并進行長連接。之后,主要就是對各個client發來的待測key進行累加計算,當達到etcd里設定的rule閾值后,將熱key推送到各個client。
4) dashboard控制臺 控制臺是一個帶可視化界面的Java程序,也是連接到etcd,之后在控制臺設置各個APP的key規則,譬如2秒20次算熱。然后當worker探測出來熱key后,會將key發往etcd,dashboard也會監聽熱key信息,進行入庫保存記錄。同時,dashboard也可以手工添加、刪除熱key,供各個client端監聽。
3.hotkey工程結構
3.3.2 client端
主要從下面三個方面來解析源碼:
4.客戶端啟動器
1)啟動方式
@PostConstruct
public void init() {
ClientStarter.Builder builder = new ClientStarter.Builder();
ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();
starter.startPipeline();
}
appName:是這個應用的名稱,一般為${spring.application.name}的值,后續所有的配置都以此為開頭 etcd:是etcd集群的地址,用逗號分隔,配置中心。 還可以看到ClientStarter實現了建造者模式,使代碼更為簡介。
2)核心入口 com.jd.platform.hotkey.client.ClientStarter#startPipeline
/**
* 啟動監聽etcd
*/
public void startPipeline() {
JdLogger.info(getClass(), "etcdServer:" + etcdServer);
//設置caffeine的最大容量
Context.CAFFEINE_SIZE = caffeineSize;
//設置etcd地址
EtcdConfigFactory.buildConfigCenter(etcdServer);
//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);
PushSchedulerStarter.startCountPusher(10);
//開啟worker重連器
WorkerRetryConnector.retryConnectWorkers();
registEventBus();
EtcdStarter starter = new EtcdStarter();
//與etcd相關的監聽都開啟
starter.start();
}
該方法主要有五個功能:
① 設置本地緩存(caffeine)的最大值,并創建etcd實例
//設置caffeine的最大容量
Context.CAFFEINE_SIZE = caffeineSize;
//設置etcd地址
EtcdConfigFactory.buildConfigCenter(etcdServer);
caffeineSize是本地緩存的最大值,在啟動的時候可以設置,不設置默認為200000。 etcdServer是上面說的etcd集群地址。
Context可以理解為一個配置類,里面就包含兩個字段:
public class Context {
public static String APP_NAME;
public static int CAFFEINE_SIZE;
}
EtcdConfigFactory是ectd配置中心的工廠類
public class EtcdConfigFactory {
private static IConfigCenter configCenter;
private EtcdConfigFactory() {}
public static IConfigCenter configCenter() {
return configCenter;
}
public static void buildConfigCenter(String etcdServer) {
//連接多個時,逗號分隔
configCenter = JdEtcdBuilder.build(etcdServer);
}
}
通過其configCenter()方法獲取創建etcd實例對象,IConfigCenter接口封裝了etcd實例對象的行為(包括基本的crud、監控、續約等)
② 創建并啟動定時任務:PushSchedulerStarter
//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待測key
PushSchedulerStarter.startCountPusher(10);//每10秒推送一次數量統計,不可配置
pushPeriod是推送的間隔時間,可以再啟動的時候設置,最小為0.05s,推送越快,探測的越密集,會越快探測出來,但對client資源消耗相應增大
PushSchedulerStarter類
/**
* 每0.5秒推送一次待測key
*/
public static void startPusher(Long period) {
if (period == null || period <= 0) {
period = 500L;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));
scheduledExecutorService.scheduleAtFixedRate(() -> {
//熱key的收集器
IKeyCollector
//這里相當于每0.5秒,通過netty來給worker來推送收集到的熱key的信息,主要是一些熱key的元數據信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報次數)
//這里面還有就是該熱key在每次上報的時候都會生成一個全局的唯一id,還有該熱key每次上報的創建時間是在netty發送的時候來生成,同一批次的熱key時間是相同的
List
if(CollectionUtil.isNotEmpty(hotKeyModels)){
//積攢了半秒的key集合,按照hash分發到不同的worker
KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);
collectHK.finishOnce();
}
},0, period, TimeUnit.MILLISECONDS);
}
/**
* 每10秒推送一次數量統計
*/
public static void startCountPusher(Integer period) {
if (period == null || period <= 0) {
period = 10;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));
scheduledExecutorService.scheduleAtFixedRate(() -> {
IKeyCollector
List
if(CollectionUtil.isNotEmpty(keyCountModels)){
//積攢了10秒的數量,按照hash分發到不同的worker
KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);
collectHK.finishOnce();
}
},0, period, TimeUnit.SECONDS);
}
從上面兩個方法可知,都是通過定時線程池來實現定時任務的,都是守護線程。
咱們重點關注一下KeyHandlerFactory類,它是client端設計的一個比較巧妙的地方,從類名上直譯為key處理工廠。具體的實例對象是DefaultKeyHandler:
public class DefaultKeyHandler {
//推送HotKeyMsg消息到Netty的推送者
private IKeyPusher iKeyPusher = new NettyKeyPusher();
//待測key的收集器,這里面包含兩個map,key主要是熱key的名字,value主要是熱key的元數據信息(比如:熱key來源的app和key的類型和是否是刪除事件)
private IKeyCollector
//數量收集器,這里面包含兩個map,這里面key是相應的規則,HitCount里面是這個規則的總訪問次數和熱后訪問次數
private IKeyCollector
public IKeyPusher keyPusher() {
return iKeyPusher;
}
public IKeyCollector
return iKeyCollector;
}
public IKeyCollector
return iKeyCounter;
}
}
這里面有三個成員對象,分別是封裝推送消息到netty的NettyKeyPusher、待測key收集器TurnKeyCollector、數量收集器TurnCountCollector,其中后兩者都實現了接口IKeyCollector,能對hotkey的處理起到有效的聚合,充分體現了代碼的高內聚。 先來看看封裝推送消息到netty的NettyKeyPusher:
/**
* 將msg推送到netty的pusher
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public class NettyKeyPusher implements IKeyPusher {
@Override
public void send(String appName, List
//積攢了半秒的key集合,按照hash分發到不同的worker
long now = System.currentTimeMillis();
Map
for(HotKeyModel model : list) {
model.setCreateTime(now);
Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());
if (channel == null) {
continue;
}
List
newList.add(model);
}
for (Channel channel : map.keySet()) {
try {
List
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);
hotKeyMsg.setHotKeyModels(batch);
channel.writeAndFlush(hotKeyMsg).sync();
} catch (Exception e) {
try {
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());
} catch (Exception ex) {
JdLogger.error(getClass(),"flush error");
}
}
}
}
@Override
public void sendCount(String appName, List
//積攢了10秒的數量,按照hash分發到不同的worker
long now = System.currentTimeMillis();
Map
for(KeyCountModel model : list) {
model.setCreateTime(now);
Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());
if (channel == null) {
continue;
}
List
newList.add(model);
}
for (Channel channel : map.keySet()) {
try {
List
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);
hotKeyMsg.setKeyCountModels(batch);
channel.writeAndFlush(hotKeyMsg).sync();
} catch (Exception e) {
try {
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());
} catch (Exception ex) {
JdLogger.error(getClass(),"flush error");
}
}
}
}
}
send(String appName, Listlist) 主要是將TurnKeyCollector收集的待測key通過netty推送給worker,HotKeyModel對象主要是一些熱key的元數據信息(熱key來源的app和key的類型和是否是刪除事件,還有該熱key的上報次數) sendCount(String appName, Listlist) 主要是將TurnCountCollector收集的規則所對應的key通過netty推送給worker,KeyCountModel對象主要是一些key所對應的規則信息以及訪問次數等 WorkerInfoHolder.chooseChannel(model.getRuleKey()) 根據hash算法獲取key對應的服務器,分發到對應服務器相應的Channel 連接,所以服務端可以水平無限擴容,毫無壓力問題。
再來分析一下key收集器:TurnKeyCollector與TurnCountCollector: 實現IKeyCollector接口:
/**
* 對hotkey進行聚合
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public interface IKeyCollector
/**
* 鎖定后的返回值
*/
List
/**
* 輸入的參數
*/
void collect(T t);
void finishOnce();
}
lockAndGetResult() 主要是獲取返回collect方法收集的信息,并將本地暫存的信息清空,方便下個統計周期積攢數據。 collect(T t) 顧名思義他是收集api調用的時候,將收集的到key信息放到本地存儲。 finishOnce() 該方法目前實現都是空,無需關注。
待測key收集器:TurnKeyCollector
public class TurnKeyCollector implements IKeyCollector
//這map里面的key主要是熱key的名字,value主要是熱key的元數據信息(比如:熱key來源的app和key的類型和是否是刪除事件)
private ConcurrentHashMap
private ConcurrentHashMap
private AtomicLong atomicLong = new AtomicLong(0);
@Override
public List
//自增后,對應的map就會停止被寫入,等待被讀取
atomicLong.addAndGet(1);
List
//可以觀察這里與collect方法里面的相同位置,會發現一個是操作map0一個是操作map1,這樣保證在讀map的時候,不會阻塞寫map,
//兩個map同時提供輪流提供讀寫能力,設計的很巧妙,值得學習
if (atomicLong.get() % 2 == 0) {
list = get(map1);
map1.clear();
} else {
list = get(map0);
map0.clear();
}
return list;
}
private List
return CollectionUtil.list(false, map.values());
}
@Override
public void collect(HotKeyModel hotKeyModel) {
String key = hotKeyModel.getKey();
if (StrUtil.isEmpty(key)) {
return;
}
if (atomicLong.get() % 2 == 0) {
//不存在時返回null并將key-value放入,已有相同key時,返回該key對應的value,并且不覆蓋
HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);
if (model != null) {
//增加該hotMey上報的次數
model.add(hotKeyModel.getCount());
}
} else {
HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);
if (model != null) {
model.add(hotKeyModel.getCount());
}
}
}
@Override
public void finishOnce() {}
}
可以看到該類中有兩個ConcurrentHashMap和一個AtomicLong,通過對AtomicLong來自增,然后對2取模,來分別控制兩個map的讀寫能力,保證每個map都能做讀寫,并且同一個map不能同時讀寫,這樣可以避免并發集合讀寫不阻塞,這一塊無鎖化的設計還是非常巧妙的,極大的提高了收集的吞吐量。 key數量收集器:TurnCountCollector 這里的設計與TurnKeyCollector大同小異,咱們就不細談了。值得一提的是它里面有個并行處理的機制,當收集的數量超過DATA_CONVERT_SWITCH_THRESHOLD=5000的閾值時,lockAndGetResult處理是使用java Stream并行流處理,提升處理的效率。
③ 開啟worker重連器
//開啟worker重連器
WorkerRetryConnector.retryConnectWorkers();
public class WorkerRetryConnector {
/**
* 定時去重連沒連上的workers
*/
public static void retryConnectWorkers() {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));
//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);
}
private static void reConnectWorkers() {
List
if (nonList.size() == 0) {
return;
}
JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);
NettyClient.getInstance().connect(nonList);//這里會觸發netty連接方法channelActive
}
}
也是通過定時線程來執行,默認時間間隔是30s,不可設置。 通過WorkerInfoHolder來控制client的worker連接信息,連接信息是個List,用的CopyOnWriteArrayList,畢竟是一個讀多寫少的場景,類似與元數據信息。
/**
* 保存worker的ip地址和Channel的映射關系,這是有序的。每次client發送消息時,都會根據該map的size進行hash
* 如key-1就發送到workerHolder的第1個Channel去,key-2就發到第2個Channel去
*/
private static final List
④ 注冊EventBus事件訂閱者
private void registEventBus() {
//netty連接器會關注WorkerInfoChangeEvent事件
EventBusCenter.register(new WorkerChangeSubscriber());
//熱key探測回調關注熱key事件
EventBusCenter.register(new ReceiveNewKeySubscribe());
//Rule的變化的事件
EventBusCenter.register(new KeyRuleHolder());
}
使用guava的EventBus事件消息總線,利用發布/訂閱者模式來對項目進行解耦。它可以利用很少的代碼,來實現多組件間通信。 基本原理圖如下:
監聽worker信息變動:WorkerChangeSubscriber
/**
* 監聽worker信息變動
*/
@Subscribe
public void connectAll(WorkerInfoChangeEvent event) {
List
if (addresses == null) {
addresses = new ArrayList<>();
}
WorkerInfoHolder.mergeAndConnectNew(addresses);
}
/**
* 當client與worker的連接斷開后,刪除
*/
@Subscribe
public void channelInactive(ChannelInactiveEvent inactiveEvent) {
//獲取斷線的channel
Channel channel = inactiveEvent.getChannel();
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String address = socketAddress.getHostName() + ":" + socketAddress.getPort();
JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");
WorkerInfoHolder.dealChannelInactive(address);
}
監聽熱key回調事件:ReceiveNewKeySubscribe
private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();
@Subscribe
public void newKeyComing(ReceiveNewKeyEvent event) {
HotKeyModel hotKeyModel = event.getModel();
if (hotKeyModel == null) {
return;
}
//收到新key推送
if (receiveNewKeyListener != null) {
receiveNewKeyListener.newKey(hotKeyModel);
}
}
該方法會收到新的熱key訂閱事件之后,會將其加入到KeyHandlerFactory的收集器里面處理。
核心處理邏輯
@Override
public void newKey(HotKeyModel hotKeyModel) {
long now = System.currentTimeMillis();
//如果key到達時已經過去1秒了,記錄一下。手工刪除key時,沒有CreateTime
if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {
JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +
+now + " keyCreateAt " + hotKeyModel.getCreateTime());
}
if (hotKeyModel.isRemove()) {
//如果是刪除事件,就直接刪除
deleteKey(hotKeyModel.getKey());
return;
}
//已經是熱key了,又推過來同樣的熱key,做個日志記錄,并刷新一下
if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {
JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);
}
addKey(hotKeyModel.getKey());
}
private void deleteKey(String key) {
CacheFactory.getNonNullCache(key).delete(key);
}
private void addKey(String key) {
ValueModel valueModel = ValueModel.defaultValue(key);
if (valueModel == null) {
//不符合任何規則
deleteKey(key);
return;
}
//如果原來該key已經存在了,那么value就被重置,過期時間也會被重置。如果原來不存在,就新增的熱key
JdHotKeyStore.setValueDirectly(key, valueModel);
}
如果該HotKeyModel里面是刪除事件,則獲取RULE_CACHE_MAP里面該key超時時間對應的caffeine,然后從中刪除該key緩存,然后返回(這里相當于刪除了本地緩存)。
如果不是刪除事件,則在RULE_CACHE_MAP對應的caffeine緩存中添加該key的緩存。
這里有個注意點,如果不為刪除事件,調用addKey()方法在caffeine增加緩存的時候,value是一個魔術值0x12fcf76,這個值只代表加了這個緩存,但是這個緩存在查詢的時候相當于為null。
監聽Rule的變化事件:KeyRuleHolder
可以看到里面有兩個成員屬性:RULE_CACHE_MAP,KEY_RULES
/**
* 保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]
*/
private static final ConcurrentHashMap
/**
* 這里KEY_RULES是保存etcd里面該appName所對應的所有rule
*/
private static final List
ConcurrentHashMapRULE_CACHE_MAP:
保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]。
巧妙的設計:這里將key的過期時間作為分桶策略,這樣同一個過期時間的key就會在一個桶(caffeine)里面,這里面每一個caffeine都是client的本地緩存,也就是說hotKey的本地緩存的KV實際上是存儲在這里面的。
ListKEY_RULES:
這里KEY_RULES是保存etcd里面該appName所對應的所有rule。
具體監聽KeyRuleInfoChangeEvent事件方法:
@Subscribe
public void ruleChange(KeyRuleInfoChangeEvent event) {
JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());
List
if (ruleList == null) {
return;
}
putRules(ruleList);
}
核心處理邏輯
/**
* 所有的規則,如果規則的超時時間變化了,會重建caffeine
*/
public static void putRules(List
synchronized (KEY_RULES) {
//如果規則為空,清空規則表
if (CollectionUtil.isEmpty(keyRules)) {
KEY_RULES.clear();
RULE_CACHE_MAP.clear();
return;
}
KEY_RULES.clear();
KEY_RULES.addAll(keyRules);
Set
for (Integer duration : RULE_CACHE_MAP.keySet()) {
//先清除掉那些在RULE_CACHE_MAP里存的,但是rule里已沒有的
if (!durationSet.contains(duration)) {
RULE_CACHE_MAP.remove(duration);
}
}
//遍歷所有的規則
for (KeyRule keyRule : keyRules) {
int duration = keyRule.getDuration();
//這里如果RULE_CACHE_MAP里面沒有超時時間為duration的value,則新建一個放入到RULE_CACHE_MAP里面
//比如RULE_CACHE_MAP本來就是空的,則在這里來構建RULE_CACHE_MAP的映射關系
//TODO 如果keyRules里面包含相同duration的keyRule,則也只會建一個key為duration,value為caffeine,其中caffeine是(string,object)
if (RULE_CACHE_MAP.get(duration) == null) {
LocalCache cache = CacheFactory.build(duration);
RULE_CACHE_MAP.put(duration, cache);
}
}
}
}
使用synchronized關鍵字來保證線程安全;
如果規則為空,清空規則表(RULE_CACHE_MAP、KEY_RULES);
使用傳遞進來的keyRules來覆蓋KEY_RULES;
清除掉RULE_CACHE_MAP里面在keyRules沒有的映射關系;
遍歷所有的keyRules,如果RULE_CACHE_MAP里面沒有相關的超時時間key,則在里面賦值;
⑤ 啟動EtcdStarter(etcd連接管理器)
EtcdStarter starter = new EtcdStarter();
//與etcd相關的監聽都開啟
starter.start();
public void start() {
fetchWorkerInfo();
fetchRule();
startWatchRule();
//監聽熱key事件,只監聽手工添加、刪除的key
startWatchHotKey();
}
fetchWorkerInfo() 從etcd里面拉取worker集群地址信息allAddress,并更新WorkerInfoHolder里面的WORKER_HOLDER
/**
* 每隔30秒拉取worker信息
*/
private void fetchWorkerInfo() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");
fetch();
}, 0, 30, TimeUnit.SECONDS);
}
使用定時線程池來執行,單線程。
定時從etcd里面獲取,地址/jd/workers/+$appName或default,時間間隔不可設置,默認30秒,這里面存儲的是worker地址的ip+port。
發布WorkerInfoChangeEvent事件。
備注:地址有$appName或default,在worker里面配置,如果把worker放到某個appName下,則該worker只會參與該app的計算。
fetchRule() 定時線程來執行,單線程,時間間隔不可設置,默認是5秒,當拉取規則配置和手動配置的hotKey成功后,該線程被終止(也就是說只會成功執行一次),執行失敗繼續執行
private void fetchRule() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker信息,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");
boolean success = fetchRuleFromEtcd();
if (success) {
//拉取已存在的熱key
fetchExistHotKey();
//這里如果拉取規則和拉取手動配置的hotKey成功之后,則該定時執行線程停止
scheduledExecutorService.shutdown();
}
}, 0, 5, TimeUnit.SECONDS);
}
fetchRuleFromEtcd()
從etcd里面獲取該appName配置的rule規則,地址/jd/rules/+$appName。
如果查出來規則rules為空,會通過發布KeyRuleInfoChangeEvent事件來清空本地的rule配置緩存和所有的規則key緩存。
發布KeyRuleInfoChangeEvent事件。
fetchExistHotKey()
從etcd里面獲取該appName手動配置的熱key,地址/jd/hotkeys/+$appName。
發布ReceiveNewKeyEvent事件,并且內容HotKeyModel不是刪除事件。
startWatchRule()
/**
* 異步監聽rule規則變化
*/
private void startWatchRule() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
JdLogger.info(getClass(), "--- begin watch rule change ----");
try {
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);
//如果有新事件,即rule的變更,就重新拉取所有的信息
while (watchIterator.hasNext()) {
//這句必須寫,next會讓他卡住,除非真的有新rule變更
WatchUpdate watchUpdate = watchIterator.next();
List
JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);
//全量拉取rule信息
fetchRuleFromEtcd();
}
} catch (Exception e) {
JdLogger.error(getClass(), "watch err");
}
});
}
異步監聽rule規則變化,使用etcd監聽地址為/jd/rules/+$appName的節點變化。
使用線程池,單線程,異步監聽rule規則變化,如果有事件變化,則調用fetchRuleFromEtcd()方法。
startWatchHotKey() 異步開始監聽熱key變化信息,使用etcd監聽地址前綴為/jd/hotkeys/+$appName
/**
* 異步開始監聽熱key變化信息,該目錄里只有手工添加的key信息
*/
private void startWatchHotKey() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
JdLogger.info(getClass(), "--- begin watch hotKey change ----");
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
try {
KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);
//如果有新事件,即新key產生或刪除
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List
KeyValue keyValue = eventList.get(0).getKv();
Event.EventType eventType = eventList.get(0).getType();
try {
//從這個地方可以看出,etcd給的返回是節點的全路徑,而我們需要的key要去掉前綴
String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");
//如果是刪除key,就立刻刪除
if (Event.EventType.DELETE == eventType) {
HotKeyModel model = new HotKeyModel();
model.setRemove(true);
model.setKey(key);
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
} else {
HotKeyModel model = new HotKeyModel();
model.setRemove(false);
String value = keyValue.getValue().toStringUtf8();
//新增熱key
JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);
//如果這是一個刪除指令,就什么也不干
//TODO 這里有個疑問,監聽到worker自動探測發出的惰性刪除指令,這里之間跳過了,但是本地緩存沒有更新吧?
//TODO 所以我猜測在客戶端使用判斷緩存是否存在的api里面,應該會判斷相關緩存的value值是否為"#[DELETE]#"刪除標記
//解疑:這里確實只監聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的
if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {
continue;
}
//手工創建的value是時間戳
model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));
model.setKey(key);
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
}
} catch (Exception e) {
JdLogger.error(getClass(), "new key err :" + keyValue);
}
}
} catch (Exception e) {
JdLogger.error(getClass(), "watch err");
}
});
}
使用線程池,單線程,異步監聽熱key變化
使用etcd監聽前綴地址的當前節點以及子節點的所有變化值
刪除節點動作
發布ReceiveNewKeyEvent事件,并且內容HotKeyModel是刪除事件
新增or更新節點動作
事件變化的value值為刪除標記#[DELETE]#
如果是刪除標記的話,代表是worker自動探測或者client需要刪除的指令。
如果是刪除標記則什么也不做,直接跳過(這里從HotKeyPusher#push方法可以看到,做刪除事件的操作時候,他會給/jd/hotkeys/+$appName的節點里面增加一個值為刪除標記的節點,然后再刪除相同路徑的節點,這樣就可以觸發上面的刪除節點事件,所以這里判斷如果是刪除標記直接跳過)。
不為刪除標記
發布ReceiveNewKeyEvent事件,事件內容HotKeyModel里面的createTime是kv對應的時間戳
疑問: 這里代碼注釋里面說只監聽手工添加或者刪除的hotKey,難道說/jd/hotkeys/+$appName地址只是手工配置的地址嗎? 解疑: 這里確實只監聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的
5.API解析
1)流程圖示 ① 查詢流程
② 刪除流程:
從上面的流程圖中,大家應該知道該熱點key在代碼中是如何扭轉的,這里再給大家講解一下核心API的源碼解析,限于篇幅的原因,咱們不一個個貼相關源碼了,只是單純的告訴你它的內部邏輯是怎么樣的。 2)核心類:JdHotKeyStore
JdHotKeyStore是封裝client調用的api核心類,包含上面10個公共方法,咱們重點解析其中6個重要方法: ① isHotKey(String key) 判斷是否在規則內,如果不在返回false 判斷是否是熱key,如果不是或者是且過期時間在2s內,則給TurnKeyCollector#collect收集 最后給TurnCountCollector#collect做統計收集 ② get(String key) 從本地caffeine取值 如果取到的value是個魔術值,只代表加入到caffeine緩存里面了,查詢的話為null ③ smartSet(String key, Object value) 判斷是否是熱key,這里不管它在不在規則內,如果是熱key,則給value賦值,如果不為熱key什么也不做 ④ forceSet(String key, Object value) 強制給value賦值 如果該key不在規則配置內,則傳遞的value不生效,本地緩存的賦值value會被變為null ⑤ getValue(String key, KeyType keyType) 獲取value,如果value不存在則調用HotKeyPusher#push方法發往netty 如果沒有為該key配置規則,就不用上報key,直接返回null 如果取到的value是個魔術值,只代表加入到caffeine緩存里面了,查詢的話為null ⑥ remove(String key) 刪除某key(本地的caffeine緩存),會通知整個集群刪除(通過etcd來通知集群刪除) 3)client上傳熱key入口調用類:HotKeyPusher 核心方法:
public static void push(String key, KeyType keyType, int count, boolean remove) {
if (count <= 0) {
count = 1;
}
if (keyType == null) {
keyType = KeyType.REDIS_KEY;
}
if (key == null) {
return;
}
//這里之所以用LongAdder是為了保證多線程計數的線程安全性,雖然這里是在方法內調用的,但是在TurnKeyCollector的兩個map里面,
//存儲了HotKeyModel的實例對象,這樣在多個線程同時修改count的計數屬性時,會存在線程安全計數不準確問題
LongAdder adderCnt = new LongAdder();
adderCnt.add(count);
HotKeyModel hotKeyModel = new HotKeyModel();
hotKeyModel.setAppName(Context.APP_NAME);
hotKeyModel.setKeyType(keyType);
hotKeyModel.setCount(adderCnt);
hotKeyModel.setRemove(remove);
hotKeyModel.setKey(key);
if (remove) {
//如果是刪除key,就直接發到etcd去,不用做聚合。但是有點問題現在,這個刪除只能刪手工添加的key,不能刪worker探測出來的
//因為各個client都在監聽手工添加的那個path,沒監聽自動探測的path。所以如果手工的那個path下,沒有該key,那么是刪除不了的。
//刪不了,就達不到集群監聽刪除事件的效果,怎么辦呢?可以通過新增的方式,新增一個熱key,然后刪除它
//TODO 這里為啥不直接刪除該節點,難道worker自動探測處理的hotKey不會往該節點增加新增事件嗎?
//釋疑:worker根據探測配置的規則,當判斷出某個key為hotKey后,確實不會往keyPath里面加入節點,他只是單純的往本地緩存里面加入一個空值,代表是熱點key
EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);
EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 這里很巧妙待補充描述
//也刪worker探測的目錄
EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));
} else {
//如果key是規則內的要被探測的key,就積累等待傳送
if (KeyRuleHolder.isKeyInRule(key)) {
//積攢起來,等待每半秒發送一次
KeyHandlerFactory.getCollector().collect(hotKeyModel);
}
}
}
從上面的源碼中可知:
這里之所以用LongAdder是為了保證多線程計數的線程安全性,雖然這里是在方法內調用的,但是在TurnKeyCollector的兩個map里面,存儲了HotKeyModel的實例對象,這樣在多個線程同時修改count的計數屬性時,會存在線程安全計數不準確問題。
如果是remove刪除類型,在刪除手動配置的熱key配置路徑的同時,還會刪除dashboard展示熱key的配置路徑。
只有在規則配置的key,才會被積攢探測發送到worker內進行計算。
6.通訊機制(與worker交互)
1)NettyClient:netty連接器
public class NettyClient {
private static final NettyClient nettyClient = new NettyClient();
private Bootstrap bootstrap;
public static NettyClient getInstance() {
return nettyClient;
}
private NettyClient() {
if (bootstrap == null) {
bootstrap = initBootstrap();
}
}
private Bootstrap initBootstrap() {
//少線程
EventLoopGroup group = new NioEventLoopGroup(2);
Bootstrap bootstrap = new Bootstrap();
NettyClientHandler nettyClientHandler = new NettyClientHandler();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) {
ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//這里就是定義TCP多個包之間的分隔符,為了更好的做拆包
.addLast(new MsgDecoder())
.addLast(new MsgEncoder())
//30秒沒消息時,就發心跳包過去
.addLast(new IdleStateHandler(0, 0, 30))
.addLast(nettyClientHandler);
}
});
return bootstrap;
}
}
使用Reactor線程模型,只有2個工作線程,沒有單獨設置主線程
長連接,開啟TCP_NODELAY
netty的分隔符”$()$”,類似TCP報文分段的標準,方便拆包
Protobuf序列化與反序列化
30s沒有消息發給對端的時候,發送一個心跳包判活
工作線程處理器NettyClientHandler
JDhotkey的tcp協議設計就是收發字符串,每個tcp消息包使用特殊字符$()$來分割 優點:這樣實現非常簡單。 獲得消息包后進行json或者protobuf反序列化。 缺點:是需要,從字節流-》反序列化成字符串-》反序列化成消息對象,兩層序列化損耗了一部分性能。 protobuf還好序列化很快,但是json序列化的速度只有幾十萬每秒,會損耗一部分性能。 2)NettyClientHandler:工作線程處理器
@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
//這里表示如果讀寫都掛了
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
//向服務端發送消息
ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));
}
}
super.userEventTriggered(ctx, evt);
}
//在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時候都有可能會觸發ChannelInboundHandler的channelActive方法的調用
//類似TCP三次握手成功之后觸發
@Override
public void channelActive(ChannelHandlerContext ctx) {
JdLogger.info(getClass(), "channelActive:" + ctx.name());
ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));
}
//類似TCP四次揮手之后,等待2MSL時間之后觸發(大概180s),比如channel通道關閉會觸發(channel.close())
//客戶端channel主動關閉連接時,會向服務端發送一個寫請求,然后服務端channel所在的selector會監聽到一個OP_READ事件,然后
//執行數據讀取操作,而讀取時發現客戶端channel已經關閉了,則讀取數據字節個數返回-1,然后執行close操作,關閉該channel對應的底層socket,
//并在pipeline中,從head開始,往下將InboundHandler,并觸發handler的channelInactive和channelUnregistered方法的執行,以及移除pipeline中的handlers一系列操作。
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
//斷線了,可能只是client和server斷了,但都和etcd沒斷。也可能是client自己斷網了,也可能是server斷了
//發布斷線事件。后續10秒后進行重連,根據etcd里的worker信息來決定是否重連,如果etcd里沒了,就不重連。如果etcd里有,就重連
notifyWorkerChange(ctx.channel());
}
private void notifyWorkerChange(Channel channel) {
EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {
if (MessageType.PONG == msg.getMessageType()) {
JdLogger.info(getClass(), "heart beat");
return;
}
if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {
JdLogger.info(getClass(), "receive new key : " + msg);
if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {
return;
}
for (HotKeyModel model : msg.getHotKeyModels()) {
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
}
}
}
}
userEventTriggered
收到對端發來的心跳包,返回new HotKeyMsg(MessageType.PING, Context.APP_NAME)
channelActive
在Channel注冊EventLoop、綁定SocketAddress和連接ChannelFuture的時候都有可能會觸發ChannelInboundHandler的channelActive方法的調用
類似TCP三次握手成功之后觸發,給對端發送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)
channelInactive
類似TCP四次揮手之后,等待2MSL時間之后觸發(大概180s),比如channel通道關閉會觸發(channel.close())該方法,發布ChannelInactiveEvent事件,來10s后重連
channelRead0
接收PONG消息類型時,打個日志返回
接收RESPONSE_NEW_KEY消息類型時,發布ReceiveNewKeyEvent事件
3.3.3 worker端
1.入口啟動加載:7個@PostConstruct
1)worker端對etcd相關的處理:EtcdStarter ① 第一個@PostConstruct:watchLog()
@PostConstruct
public void watchLog() {
AsyncPool.asyncDo(() -> {
try {
//取etcd的是否開啟日志配置,地址/jd/logOn
String loggerOn = configCenter.get(ConfigConstant.logToggle);
LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
//監聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List
KeyValue keyValue = eventList.get(0).getKv();
logger.info("log toggle changed : " + keyValue);
String value = keyValue.getValue().toStringUtf8();
LOGGER_ON = "true".equals(value) || "1".equals(value);
}
});
}
放到線程池里面異步執行
取etcd的是否開啟日志配置,地址/jd/logOn,默認true
監聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關
由于有etcd的監聽,所以會一直執行,而不是執行一次結束
② 第二個@PostConstruct:watch()
/**
* 啟動回調監聽器,監聽rule變化
*/
@PostConstruct
public void watch() {
AsyncPool.asyncDo(() -> {
KvClient.WatchIterator watchIterator;
if (isForSingle()) {
watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);
} else {
watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);
}
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List
KeyValue keyValue = eventList.get(0).getKv();
logger.info("rule changed : " + keyValue);
try {
ruleChange(keyValue);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* rule發生變化時,更新緩存的rule
*/
private synchronized void ruleChange(KeyValue keyValue) {
String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");
if (StrUtil.isEmpty(appName)) {
return;
}
String ruleJson = keyValue.getValue().toStringUtf8();
List
KeyRuleHolder.put(appName, keyRules);
}
通過etcd.workerPath配置,來判斷該worker是否為某個app單獨服務的,默認為”default”,如果是默認值,代表該worker參與在etcd上所有app client的計算,否則只為某個app來服務計算 使用etcd來監聽rule規則變化,如果是共享的worker,監聽地址前綴為”/jd/rules/“,如果為某個app獨享,監聽地址為”/jd/rules/“+$etcd.workerPath 如果規則變化,則修改對應app在本地存儲的rule緩存,同時清理該app在本地存儲的KV緩存
KeyRuleHolder:rule緩存本地存儲
Map,>
相對于client的KeyRuleHolder的區別:worker是存儲所有app規則,每個app對應一個規則桶,所以用map
CaffeineCacheHolder:key緩存本地存儲
Map,>
相對于client的caffeine,第一是worker沒有做緩存接口比如LocalCache,第二是client的map的kv分別是超時時間、以及相同超時時間所對應key的緩存桶
放到線程池里面異步執行,由于有etcd的監聽,所以會一直執行,而不是執行一次結束
③ 第三個@PostConstruct:watchWhiteList()
/**
* 啟動回調監聽器,監聽白名單變化,只監聽自己所在的app,白名單key不參與熱key計算,直接忽略
*/
@PostConstruct
public void watchWhiteList() {
AsyncPool.asyncDo(() -> {
//從etcd配置中獲取所有白名單
fetchWhite();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
logger.info("whiteList changed ");
try {
fetchWhite();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
拉取并監聽etcd白名單key配置,地址為/jd/whiteList/+$etcd.workerPath
在白名單的key,不參與熱key計算,直接忽略
放到線程池里面異步執行,由于有etcd的監聽,所以會一直執行,而不是執行一次結束 ④ 第四個@PostConstruct:makeSureSelfOn()
/**
* 每隔一會去check一下,自己還在不在etcd里
*/
@PostConstruct
public void makeSureSelfOn() {
//開啟上傳worker信息
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
if (canUpload) {
uploadSelfInfo();
}
} catch (Exception e) {
//do nothing
}
}, 0, 5, TimeUnit.SECONDS);
}
在線程池里面異步執行,定時執行,時間間隔為5s
將本機woker的hostName,ip+port以kv的形式定時上報給etcd,地址為/jd/workers/+$etcd.workPath+”/“+$hostName,續期時間為8s
有一個canUpload的開關來控制worker是否向etcd來定時續期,如果這個開關關閉了,代表worker不向etcd來續期,這樣當上面地址的kv到期之后,etcd會刪除該節點,這樣client循環判斷worker信息變化了
2)將熱key推送到dashboard供入庫:DashboardPusher ① 第五個@PostConstruct:uploadToDashboard()
@Component
public class DashboardPusher implements IPusher {
/**
* 熱key集中營
*/
private static LinkedBlockingQueue
@PostConstruct
public void uploadToDashboard() {
AsyncPool.asyncDo(() -> {
while (true) {
try {
//要么key達到1千個,要么達到1秒,就匯總上報給etcd一次
List
Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
//將熱key推到dashboard
DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
當熱key的數量達到1000或者每隔1s,把熱key的數據通過與dashboard的netty通道來發送給dashboard,數據類型為REQUEST_HOT_KEY
LinkedBlockingQueue
hotKeyStoreQueue:worker計算的給dashboard熱key的集中營,所有給dashboard推送熱key存儲在里面 3)推送到各客戶端服務器:AppServerPusher ① 第六個@PostConstruct:batchPushToClient()
public class AppServerPusher implements IPusher {
/**
* 熱key集中營
*/
private static LinkedBlockingQueue
/**
* 和dashboard那邊的推送主要區別在于,給app推送每10ms一次,dashboard那邊1s一次
*/
@PostConstruct
public void batchPushToClient() {
AsyncPool.asyncDo(() -> {
while (true) {
try {
List
//每10ms推送一次
Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
Map
//拆分出每個app的熱key集合,按app分堆
for (HotKeyModel hotKeyModel : tempModels) {
List
oneAppModels.add(hotKeyModel);
}
//遍歷所有app,進行推送
for (AppInfo appInfo : ClientInfoHolder.apps) {
List
if (CollectionUtil.isEmpty(list)) {
continue;
}
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);
hotKeyMsg.setHotKeyModels(list);
//整個app全部發送
appInfo.groupPush(hotKeyMsg);
}
//推送完,及時清理不使用內存
allAppHotKeyModels = null;
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
會按照key的appName來進行分組,然后通過對應app的channelGroup來推送
當熱key的數量達到10或者每隔10ms,把熱key的數據通過與app的netty通道來發送給app,數據類型為RESPONSE_NEW_KEY
LinkedBlockingQueue
hotKeyStoreQueue:worker計算的給client熱key的集中營,所有給client推送熱key存儲在里面 4)client實例節點處理:NodesServerStarter ① 第七個@PostConstruct:start()
public class NodesServerStarter {
@Value("${netty.port}")
private int port;
private Logger logger = LoggerFactory.getLogger(getClass());
@Resource
private IClientChangeListener iClientChangeListener;
@Resource
private List
@PostConstruct
public void start() {
AsyncPool.asyncDo(() -> {
logger.info("netty server is starting");
NodesServer nodesServer = new NodesServer();
nodesServer.setClientChangeListener(iClientChangeListener);
nodesServer.setMessageFilters(messageFilters);
try {
nodesServer.startNettyServer(port);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
線程池里面異步執行,啟動client端的nettyServer
iClientChangeListener和messageFilters這兩個依賴最終會被傳遞到netty消息處理器里面,iClientChangeListener會作為channel下線處理來刪除ClientInfoHolder下線或者超時的通道,messageFilters會作為netty收到事件消息的處理過濾器(責任鏈模式) ② 依賴的bean:IClientChangeListener iClientChangeListener
public interface IClientChangeListener {
/**
* 發現新連接
*/
void newClient(String appName, String channelId, ChannelHandlerContext ctx);
/**
* 客戶端掉線
*/
void loseClient(ChannelHandlerContext ctx);
}
對客戶端的管理,新來(newClient)(會觸發netty的連接方法channelActive)、斷線(loseClient)(會觸發netty的斷連方法channelInactive())的管理 client的連接信息主要是在ClientInfoHolder里面
List
apps,這里面的AppInfo主要是appName和對應的channelGroup
對apps的add和remove主要是通過新來(newClient)、斷線(loseClient) ③ 依賴的bean:List
messageFilters
/**
* 對netty來的消息,進行過濾處理
* @author wuweifeng wrote on 2019-12-11
* @version 1.0
*/
public interface INettyMsgFilter {
boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);
}
對client發給worker的netty消息,進行過濾處理,共有四個實現類,也就是說底下四個過濾器都是收到client發送的netty消息來做處理 ④ 各個消息處理的類型:MessageType
APP_NAME((byte) 1),
REQUEST_NEW_KEY((byte) 2),
RESPONSE_NEW_KEY((byte) 3),
REQUEST_HIT_COUNT((byte) 7), //命中率
REQUEST_HOT_KEY((byte) 8), //熱key,worker->dashboard
PING((byte) 4), PONG((byte) 5),
EMPTY((byte) 6);
順序1:HeartBeatFilter
當消息類型為PING,則給對應的client示例返回PONG
順序2:AppNameFilter
當消息類型為APP_NAME,代表client與worker建立連接成功,然后調用iClientChangeListener的newClient方法增加apps元數據信息
順序3:HotKeyFilter
處理接收消息類型為REQUEST_NEW_KEY
先給HotKeyFilter.totalReceiveKeyCount原子類增1,該原子類代表worker實例接收到的key的總數
publishMsg方法,將消息通過自建的生產者消費者模型(KeyProducer,KeyConsumer),來把消息給發到生產者中分發消費
接收到的消息HotKeyMsg里面List
首先判斷HotKeyModel里面的key是否在白名單內,如果在則跳過,否則將HotKeyModel通過KeyProducer發送
順序4:KeyCounterFilter
處理接收類型為REQUEST_HIT_COUNT
這個過濾器是專門給dashboard來匯算key的,所以這個appName直接設置為該worker配置的appName
該過濾器的數據來源都是client的NettyKeyPusher#sendCount(String appName, List
list),這里面的數據都是默認積攢10s的,這個10s是可以配置的,這一點在client里面有講
將構造的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞隊列LinkedBlockingQueue
COUNTER_QUEUE中,然后讓CounterConsumer來消費處理,消費邏輯是單線程的
CounterConsumer:熱key統計消費者
放在公共線程池中,來單線程執行
從阻塞隊列COUNTER_QUEUE里面取數據,然后將里面的key的統計數據發布到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()里面,該路徑是worker服務的client集群或者default,用來存放客戶端hotKey訪問次數和總訪問次數的path,然后讓dashboard來訂閱統計展示
2.三個定時任務:3個@Scheduled
1)定時任務1:EtcdStarter#pullRules()
/**
* 每隔1分鐘拉取一次,所有的app的rule
*/
@Scheduled(fixedRate = 60000)
public void pullRules() {
try {
if (isForSingle()) {
String value = configCenter.get(ConfigConstant.rulePath + workerPath);
if (!StrUtil.isEmpty(value)) {
List
KeyRuleHolder.put(workerPath, keyRules);
}
} else {
List
for (KeyValue keyValue : keyValues) {
ruleChange(keyValue);
}
}
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
}
每隔1分鐘拉取一次etcd地址為/jd/rules/的規則變化,如果worker所服務的app或者default的rule有變化,則更新規則的緩存,并清空該appName所對應的本地key緩存 2)定時任務2:EtcdStarter#uploadClientCount()
/**
* 每隔10秒上傳一下client的數量到etcd中
*/
@Scheduled(fixedRate = 10000)
public void uploadClientCount() {
try {
String ip = IpUtils.getIp();
for (AppInfo appInfo : ClientInfoHolder.apps) {
String appName = appInfo.getAppName();
int count = appInfo.size();
//即便是full gc也不能超過3秒,因為這里給的過期時間是13s,由于該定時任務每隔10s執行一次,如果full gc或者說上報給etcd的時間超過3s,
//則在dashboard查詢不到client的數量
configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);
}
configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);
//上報每秒QPS(接收key數量、處理key數量)
String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));
configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);
logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);
//如果是穩定一直有key發送的應用,建議開啟該監控,以避免可能發生的網絡故障
if (openMonitor) {
checkReceiveKeyCount();
}
// configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);
} catch (Exception ex) {
logger.error(ETCD_DOWN);
}
}
每個10s將worker計算存儲的client信息上報給etcd,來方便dashboard來查詢展示,比如/jd/count/對應client數量,/jd/caffeineSize/對應caffeine緩存的大小,/jd/totalKeyCount/對應該worker接收的key總量和處理的key總量
可以從代碼中看到,上面所有etcd的節點租期時間都是13s,而該定時任務是每10s執行一次,意味著如果full gc或者說上報給etcd的時間超過3s,則在dashboard查詢不到client的相關匯算信息
長時間不收到key,判斷網絡狀態不好,斷開worker給etcd地址為/jd/workers/+$workerPath節點的續租,因為client會循環判斷該地址的節點是否變化,使得client重新連接worker或者斷開失聯的worker 3)定時任務3:EtcdStarter#fetchDashboardIp()
/**
* 每隔30秒去獲取一下dashboard的地址
*/
@Scheduled(fixedRate = 30000)
public void fetchDashboardIp() {
try {
//獲取DashboardIp
List
//是空,給個警告
if (CollectionUtil.isEmpty(keyValues)) {
logger.warn("very important warn !!! Dashboard ip is null!!!");
return;
}
String dashboardIp = keyValues.get(0).getValue().toStringUtf8();
NettyClient.getInstance().connect(dashboardIp);
} catch (Exception e) {
e.printStackTrace();
}
}
每隔30s拉取一次etcd前綴為/jd/dashboard/的dashboard連接ip的值,并且判斷DashboardHolder.hasConnected里面是否為未連接狀態,如果是則重新連接worker與dashboard的netty通道
3.自建的生產者消費者模型(KeyProducer,KeyConsumer)
一般生產者消費者模型包含三大元素:生產者、消費者、消息存儲隊列 這里消息存儲隊列是DispatcherConfig里面的QUEUE,使用LinkedBlockingQueue,默認大小為200W 1)KeyProducer
@Component
public class KeyProducer {
public void push(HotKeyModel model, long now) {
if (model == null || model.getKey() == null) {
return;
}
//5秒前的過時消息就不處理了
if (now - model.getCreateTime() > InitConstant.timeOut) {
expireTotalCount.increment();
return;
}
try {
QUEUE.put(model);
totalOfferCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
判斷接收到的HotKeyModel是否超出”netty.timeOut”配置的時間,如果是將expireTotalCount紀錄過期總數給自增,然后返回 2)KeyConsumer
public class KeyConsumer {
private IKeyListener iKeyListener;
public void setKeyListener(IKeyListener iKeyListener) {
this.iKeyListener = iKeyListener;
}
public void beginConsume() {
while (true) {
try {
//從這里可以看出,這里的生產者消費者模型,本質上還是拉模式,之所以不使用EventBus,是因為需要隊列來做緩沖
HotKeyModel model = QUEUE.take();
if (model.isRemove()) {
iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
} else {
iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
}
//處理完畢,將數量加1
totalDealCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key,appName+keyType+key
String key = buildKey(hotKeyModel);
hotCache.invalidate(key);
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
//推送所有client刪除
hotKeyModel.setCreateTime(SystemClock.now());
logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());
for (IPusher pusher : iPushers) {
//這里可以看到,刪除熱key的netty消息只給client端發了過去,沒有給dashboard發過去(DashboardPusher里面的remove是個空方法)
pusher.remove(hotKeyModel);
}
}
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key
String key = buildKey(hotKeyModel);
//判斷是不是剛熱不久
//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內不重復處理相同的hotKey。
//畢竟hotKey都是瞬時流量,可以避免在這5s內重復推送給client和dashboard,避免無效的網絡開銷
Object o = hotCache.getIfPresent(key);
if (o != null) {
return;
}
//********** watch here ************//
//該方法會被InitConstant.threadCount個線程同時調用,存在多線程問題
//下面的那句addCount是加了鎖的,代表給Key累加數量時是原子性的,不會發生多加、少加的情況,到了設定的閾值一定會hot
//譬如閾值是2,如果多個線程累加,在沒hot前,hot的狀態肯定是對的,譬如thread1 加1,thread2加1,那么thread2會hot返回true,開啟推送
//但是極端情況下,譬如閾值是10,當前是9,thread1走到這里時,加1,返回true,thread2也走到這里,加1,此時是11,返回true,問題來了
//該key會走下面的else兩次,也就是2次推送。
//所以出現問題的原因是hotCache.getIfPresent(key)這一句在并發情況下,沒return掉,放了兩個key+1到addCount這一步時,會有問題
//測試代碼在TestBlockQueue類,直接運行可以看到會同時hot
//那么該問題用解決嗎,NO,不需要解決,1 首先要發生的條件極其苛刻,很難觸發,以京東這樣高的并發量,線上我也沒見過觸發連續2次推送同一個key的
//2 即便觸發了,后果也是可以接受的,2次推送而已,毫無影響,客戶端無感知。但是如果非要解決,就要對slidingWindow實例加鎖了,必然有一些開銷
//所以只要保證key數量不多計算就可以,少計算了沒事。因為熱key必然頻率高,漏計幾次沒事。但非熱key,多計算了,被干成了熱key就不對了
SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//從這里可知,每個app的每個key都會對應一個滑動窗口
//看看hot沒
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
if (!hot) {
//如果沒hot,重新put,cache會自動刷新過期時間
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);
} else {
//這里之所以放入的value為1,是因為hotCache是用來專門存儲剛生成的hotKey
//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內不重復處理相同的hotKey。
//畢竟hotKey都是瞬時流量,可以避免在這5s內重復推送給client和dashboard,避免無效的網絡開銷
hotCache.put(key, 1);
//刪掉該key
//這個key從實際上是專門針對slidingWindow的key,他的組合邏輯是appName+keyType+key,而不是給client和dashboard推送的hotKey
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
//開啟推送
hotKeyModel.setCreateTime(SystemClock.now());
//當開關打開時,打印日志。大促時關閉日志,就不打印了
if (EtcdStarter.LOGGER_ON) {
logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());
}
//分別推送到各client和etcd
for (IPusher pusher : iPushers) {
pusher.push(hotKeyModel);
}
}
}
“thread.count”配置即為消費者個數,多個消費者共同消費一個QUEUE隊列 生產者消費者模型,本質上還是拉模式,之所以不使用EventBus,是因為需要隊列來做緩沖 根據HotKeyModel里面是否是刪除消息類型
刪除CaffeineCacheHolder里面對應newkey的滑動窗口緩存。
向該hotKeyModel對應的app的client推送netty消息,表示新產生hotKey,使得client本地緩存,但是推送的netty消息只代表為熱key,client本地緩存不會存儲key對應的value值,需要調用JdHotKeyStore里面的api來給本地緩存的value賦值
向dashboard推送hotKeyModel,表示新產生hotKey
刪除消息類型
根據HotKeyModel里面的appName+keyType+key的名字,來構建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應
刪除hotCache里面newkey的緩存,放入的緩存kv分別是newKey和1,hotCache作用是用來存儲該生成的熱key,hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內不重復處理相同的hotKey。畢竟hotKey都是瞬時流量,可以避免在這5s內重復推送給client和dashboard,避免無效的網絡開銷
刪除CaffeineCacheHolder里面對應appName的caffeine里面的newKey,這里面存儲的是slidingWindow滑動窗口
推送給該HotKeyModel對應的所有client實例,用來讓client刪除該HotKeyModel
非刪除消息類型
根據HotKeyModel里面的appName+keyType+key的名字,來構建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應
通過hotCache來判斷該newkey是否剛熱不久,如果是則返回
根據滑動時間窗口來計算判斷該key是否為hotKey(這里可以學習一下滑動時間窗口的設計),并返回或者生成該newKey對應的滑動窗口
如果沒有達到熱key的標準
通過CaffeineCacheHolder重新put,cache會自動刷新過期時間
如果達到了熱key標準
向hotCache里面增加newkey對應的緩存,value為1表示剛為熱key。
3)計算熱key滑動窗口的設計 限于篇幅的原因,這里就不細談了,直接貼出項目作者對其寫的說明文章:Java簡單實現滑動窗口
3.3.4 dashboard端
這個沒啥可說的了,就是連接etcd、mysql,增刪改查,不過京東的前端框架很方便,直接返回list就可以成列表。
4 總結
文章第二部分為大家講解了redis數據傾斜的原因以及應對方案,并對熱點問題進行了深入,從發現熱key到解決熱key的兩個關鍵問題的總結。 文章第三部分是熱key問題解決方案——JD開源hotkey的源碼解析,分別從client端、worker端、dashboard端來進行全方位講解,包括其設計、使用及相關原理。 希望通過這篇文章,能夠使大家不僅學習到相關方法論,也能明白其方法論具體的落地方案,一起學習,一起成長。
審核編輯:湯梓紅
-
開源
+關注
關注
3文章
3402瀏覽量
42711 -
Redis
+關注
關注
0文章
378瀏覽量
10942
原文標題:Redis數據傾斜與JD開源hotkey源碼分析揭秘
文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論