介紹
kafka是一個比較流行的分布式、可拓展、高性能、可靠的流處理平臺。在處理kafka的數據時,這里有確保處理效率和可靠性的多種最佳實踐。本文將介紹這幾種實踐方式,并通過sarama實現他們。
以下是一些kafka消費的最佳實踐:
選擇合適的提交策略:Kafka提供兩種提交策略,自動和手動。雖然自動操作很容易使用,但它可能會導致數據丟失或重復。手動提交提供了更高級別的控制,確保消息至少處理一次或恰好一次,具體取決于用例。
盡可能減少Kafka的傳輸次數:大批量讀取消息可以顯著提高吞吐量。這可以通過調整 fetch.min.bytes 和 fetch.max.wait.ms 等參數來實現。
盡可能使用消費者組:Kafka允許多個消費者組成一個消費者組來并行消費數據。這使得 Kafka 能夠將數據分發給一個組中的所有消費者,從而實現高效的數據消費。
調整消費者緩沖區大小:通過調整消費者的緩沖區大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根據消息的預期大小和消費者的內存容量進行調整。這可以提高消費者的表現。
處理rebalance:當新的消費者加入消費者組,或者現有的消費者離開時,Kafka會觸發rebalance以重新分配負載。在此過程中,消費者停止消費數據。因此,快速有效地處理重新平衡可以提高整體吞吐量。
監控消費者:使用 Kafka 的消費者指標來監控消費者的性能。定期監控可以幫助我們識別性能瓶頸并調整消費者的配置。
選擇合適的提交策略
1.自動提交
Sarama 的 ConsumerGroup 默認情況下會自動提交偏移量。這意味著它會定期提交已成功消費的消息的偏移量,這允許消費者在重新啟動或消費失敗時從中斷的地方繼續。
下面是一個自動提交的消費者組消費消息的例子:
config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.AutoCommit.Enable = true config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Panicf( "創建消費者組客戶端時出錯: %v" , err) } Consumer := Consumer{} ctx := context.Background() for { err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer) if err != nil { log.Panicf( "來自消費者的錯誤: %v" , err) } }
根據config.Consumer.Offsets.AutoCommit.Interval可以看到,消費者會每秒自動提交offset。
2. 手動提交
手動提交使我們更好地控制何時提交消息偏移量。下面是一個手動提交的消費者組消費消息的例子:
config := sarama.NewConfig() config.Version = sarama.V2_0_0_0 config.Consumer.Offsets.AutoCommit.Enable = false consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config) if err != nil { log.Panicf( "創建消費者組客戶端時出錯: %v" , err) } Consumer := Consumer{} ctx := context.Background() for { err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer) if err != nil { log.Panicf( "Error from Consumer: %v" , err) } } type Consumer struct {} func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil } func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for msg : = range Claim.Messages() { fmt.Printf( "Message topic:%q partition:%d offset:%d " , msg.Topic, msg.Partition, msg.Offset) sess.MarkMessage(msg, "" ) } return nil }
在該示例中, 使用MarkMessage手動將消息標記為已處理,最終根據Consumer.Offsets.CommitInterval配置提交。另外這個例子省略了錯誤處理部分,開發時需要注意正確處理生產過程中出現的錯誤。
譯者注:這篇文章雖然是今年5月發布,但是這里的提交方式還是有些過時了,目前sarama已經廢棄了Consumer.Offsets.CommitInterval,相關配置目前在Consumer.Offsets.AutoCommit
盡可能減少Kafka的傳輸次數
減少kafka的傳輸次數可以通過優化從kafka中讀取和寫入數據的方式來實現:
1. 增加批次的大小
使用kafka批量發送消息的效果優于逐個發送消息,批次越大,kafka發送數據效率就越高。但是需要權衡延遲和吞吐量之間的關系。較大的批次雖然代表著更大的吞吐量,但也會增加延遲。因為批次越大,填充批次的時間也越久。
在Go中,我們可以在使用sarama包生成消息時設置批次大小:
config := sarama.NewConfig() config.Producer.Flush.Bytes = 1024 * 1024
以及獲取消息的批次大小
config := sarama.NewConfig() config.Consumer.Fetch.Default = 1024 * 1024
2. 使用長輪詢
長輪詢是指消費者輪詢時如果Kafka中沒有數據,則消費者將等待數據到達。這減少了往返次數,因為消費者不需要在沒有數據時不斷請求數據。
config := sarama.NewConfig() config .Consumer.MaxWaitTime = 500 *time.Millisecond
該配置告訴消費者在返回之前會等待500毫秒
3. 盡可能使用消費者組
消費者組是一組協同工作消費來自kafka主題的消息的消費者。消費者組允許我們在多個消費者之間分配消息,從而提供橫向拓展能力。使用消費者組時,kafka負責將分區分配給組中的消費者,并確保每個分區同時僅被一個消費者消費。
接下來是sarama中消費者組的使用:
使用消費者組需要實現一個ConsumerGroupHandler接口:
該接口具有三個方法:Setup、Cleanup、 和ConsumeClaim
type exampleConsumerGroupHandler struct { } func (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { for message := range Claim.Messages() { fmt.Printf( "Message: %s " , string (message.Value)) session.MarkMessage(message, "" ) } 返回 nil }
創建sarama.ConsumerGroup并開始消費:
brokers := []string{"localhost:9092"} topic := "example_topic" groupID := "example_consumer_group" consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Fatalf("Error creating consumer group: %v", err) } defer consumerGroup.Close() handler := &exampleConsumerGroupHandler{} for { err := consumerGroup.Consume(context.Background(), []string{topic}, handler) if err != nil { log.Printf("Error consuming messages: %v", err) } }
該示例設置了一個消費組,用于消費來自“example_topic”的消息。消費者組可以通過添加更多消費者來提高處理能力。
使用消費者組時,記得處理消費期間rebalance和錯誤。
調整消費者緩沖區大小
在sarama中,我們可以調整消費者緩沖區的大小,以調整消費者在處理消息之前可以在內存中保存的消息數量。
默認情況下,緩沖區大小設置為256,這代表Sarama在開始處理消息之前將在內存中保存最多256條消息。如果消費者速度很慢,增加緩沖區大小可能有助于提高吞吐量。但是,更大的緩沖區也會消耗更多的內存。
以下是如何增加緩沖區大小的例子:
config := sarama.NewConfig() config.Consumer.Return.Errors = true config.Version = sarama.V2_1_0_0 config.Consumer.Offsets.Initial = sarama.OffsetOldest config.ChannelBufferSize = 500 group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config) if err != nil { panic(err) } ctx := context.Background() for { topics := []string{topic} handler := exampleConsumerGroupHandler{} err := group.Consume(ctx, topics, &handler) if err != nil { panic(err) } }
處理rebalance
當新消費者添加到消費者組或現有消費者離開消費者組時,kafka會重新平衡該組中的消費者。rebalance是kafka確保消費者組中的所有消費者不會消費同一分區的保證。
在sarama中,處理rebalance是通過 Setup 和CleanUp函數來完成的。
通過正確處理重新平衡事件,您可以確保應用程序正常處理消費者組的更改,例如消費者離開或加入,并且在這些事件期間不會丟失或處理兩次消息。
譯者注:其實更重要的是在ConsumeClaim函數在通道關閉時盡早退出,才能正確的進入CleanUp函數。
監控消費者
監控Kafka消費者對于確保系統的健康和性能至關重要,我們需要時刻關注延遲、處理時間和錯誤率的指標。
Golang沒有內置對 Kafka 監控的支持,但有幾個庫和工具可以幫助我們。讓我們看一下其中的一些:
Sarama的Metrics:Sarama 提供了一個指標注冊表,它報告了有助于監控的各種指標,例如請求、響應的數量、請求和響應的大小等。這些指標可以使用 Prometheus 等監控系統來收集和監控。
JMX Exporter:如果您在 JVM 上運行 Kafka, 則可以使用 JMX Exporter 將kafka的 MBeans 發送給Prometheus
Kafka Exporter:Kafka Exporter是一個第三方工具,可以提供有關Kafka的更詳細的指標。它可以提供消費者組延遲,這是消費kafka消息時要監控的關鍵指標。
Jaeger 或 OpenTelemetry:這些工具可用于分布式追蹤,這有助于追蹤消息如何流經系統以及可能出現瓶頸的位置。
日志:時刻關注應用程序日志,記錄消費者中的任何錯誤或異常行為。這些日志可以幫助我們診斷問題。
消費者組命令, 可以使用kafka-consumer-groups命令來描述消費者組的狀態。
請記住,不僅要追蹤這些指標,還要針對任何需要關注的場景設置警報。通過這些方法,我們可以在問題還在初始階段時快速做出響應。
以上工作有助于確保使用kafka的應用程序健壯、可靠且高效。
審核編輯:湯梓紅
-
數據
+關注
關注
8文章
7145瀏覽量
89584 -
參數
+關注
關注
11文章
1860瀏覽量
32428 -
kafka
+關注
關注
0文章
52瀏覽量
5244
原文標題:golang中使用kafka的綜合指南
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論