友快網

導航選單

唯品會flink容器化實踐應用以及產品化經驗flink的容器化實踐應用

唯品會自 2017 年開始基於 k8s 深入打造高效能、穩定、可靠、易用的實時計算平臺,支援唯品會內部業務在平時以及大促的平穩執行。現平臺支援 Flink、Spark、Storm 等主流框架。本文主要分享 Flink 的容器化實踐應用以及產品化經驗。內容包括:

1。發展概覽

2。Flink 容器化實踐

3。Flink SQL 平臺化建設

4。應用案例

5。未來規劃

一 、發展概覽

平臺支援公司內部所有部門的實時計算應用。主要的業務包括實時大屏、推薦、實驗平臺、實時監控和實時資料清洗等。

1。1 叢集規模

平臺現有異地雙機房雙叢集,具有 2000 多的物理機節點,利用 k8s 的 namespaces,labels 和 taints 等,實現業務隔離以及初步的計算負載隔離。目前線上實時應用有大概 1000 個,平臺最近主要支援 Flink SQL 任務的上線。

1。2 平臺架構

上圖是唯品會實時計算平臺的整體架構。

最底層是計算任務節點的資源排程層,實際是以 deployment 的模式執行在 k8s 上,平臺雖然支援 yarn 排程,但是 yarn 排程是與批任務共享資源,所以主流任務還是執行在 k8s 上。

儲存層這一層,支援公司內部基於 kafka 實時資料 vms,基於 binlog 的 vdp 資料和原生 kafka 作為訊息匯流排,狀態儲存在 hdfs 上,資料主要存入 redis,mysql,hbase,kudu,clickhouse 等。

計算引擎層,平臺支援 Flink,Spark,Storm 主流框架容器化,提供了一些框架的封裝和元件等。每個框架會都會支援幾個版本的映象滿足不同的業務需求。

平臺層提供作業配置、排程、版本管理、容器監控、job 監控、告警、日誌等功能,提供多租戶的資源管理(quota,label 管理),提供 kafka 監控。在 Flink 1。11 版本之前,平臺自建元資料管理系統為 Flink SQL 管理 schema,1。11 版本開始,透過 hive metastore 與公司元資料管理系統融合。

最上層就是各個業務的應用層。

二、Flink 容器化實踐

2。1 容器化實踐

上圖是實時平臺 Flink 容器化的架構。Flink 容器化是基於 standalone 模式部署的。

部署模式共有 client,jobmanager 和 taskmanager 三個角色,每一個角色都由一個 deployment 控制。

使用者透過平臺上傳任務 jar 包,配置等,儲存於 hdfs 上。同時由平臺維護的配置,依賴等也儲存在 hdfs 上,當 pod 啟動時,會進行拉取等初始化操作。

client 中主程序是一個由 go 開發的 agent,當 client 啟動時,會首先檢查叢集狀態,當叢集 ready 後,從 hdfs 上拉取 jar 包向 Flink 叢集提交任務。同時,client 的主要功能還有監控任務狀態,做 savepoint 等操作。

透過部署在每臺物理機上的 smart - agent 採集容器的指標寫入 m3,以及透過 Flink 暴漏的介面將 metrics 寫入 prometheus,結合 grafana 展示。同樣透過部署在每臺物理機上的 vfilebeat 採集掛載出來的相關日誌寫入 es,在 dragonfly 可以實現日誌檢索。

Flink 平臺化

在實踐過程中,結合具體場景以及易用性考慮,做了平臺化工作。

平臺的任務配置與映象,Flink 配置,自定義元件等解耦合,現階段平臺支援 1。7、1。9、1。11、1。12 等版本。

平臺支援流水線編譯或上傳 jar、作業配置、告警配置、生命週期管理等,從而減少使用者的開發成本。

平臺開發了容器級別的如火焰圖等調優診斷的頁面化功能,以及登陸容器的功能,支援使用者進行作業診斷。

Flink 穩定性

在應用部署和執行過程中,不可避免的會出現異常。以下是平臺保證任務在出現異常狀況後的穩定性做的策略。

pod 的健康和可用,由 livenessProbe 和 readinessProbe 檢測,同時指定 pod 的重啟策略。

Flink 任務異常時:

1。Flink 原生的 restart 策略和 failover 機制,作為第一層的保證。2。在 client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到自己的快取中,並彙報到平臺,固化到 MySQL 中。當 Flink 無法再重啟時,由 client 重新從最新的成功 checkpoint 提交任務。作為第二層保證。這一層將 checkpoint 固化到 MySQL 中後,就不再使用 Flink HA 機制了,少了 zk 的元件依賴。3。當前兩層無法重啟時或叢集出現異常時,由平臺自動從固化到 MySQL 中的最新 chekcpoint 重新拉起一個叢集,提交任務,作為第三層保證。

機房容災:使用者的 jar 包,checkpoint 都做了異地雙 HDFS 儲存異地雙機房雙叢集

2。2 kafka 監控方案

kafka 監控是我們的任務監控裡相對重要的一部分,整體監控流程如下所示。

平臺提供監控 kafka 堆積,消費 message 等配置資訊,從 MySQL 中將使用者 kafka 監控配置提取後,透過 jmx 監控 kafka,寫入下游 kafka,再透過另一個 Flink 任務實時監控,同時將這些資料寫入 ck,從而展示給使用者。

三、Flink SQL 平臺化建設

基於 k8s 的 Flink 容器化實現以後,方便了 Flink api 應用的釋出,但是對於 Flink SQL 的任務仍然不夠便捷。於是平臺提供了更加方便的線上編輯釋出、SQL 管理等一棧式開發平臺。

3。1 Flink SQL 方案

平臺的 Flink SQL 方案如上圖所示,任務釋出系統與元資料管理系統完全解耦。

Flink SQL 任務釋出平臺化

在實踐過程中,結合易用性考慮,做了平臺化工作,主操作介面如下圖所示:

Flink SQL 的版本管理,語法校驗,拓撲圖管理等;

UDF 通用和任務級別的管理,支援使用者自定義 UDF;

提供引數化的配置介面,方便使用者上線任務。

元資料管理

平臺在 1。11 之前透過構建自己的元資料管理系統 UDM,MySQL 儲存 kafka,redis 等 schema,透過自定義 catalog 打通 Flink 與 UDM,從而實現元資料管理。1。11 之後,Flink 整合 hive 逐漸完善,平臺重構了 FlinkSQL 框架,透過部署一個 SQL - gateway service 服務,中間呼叫自己維護的 SQL - client jar 包,從而與離線元資料打通,實現了實時離線元資料統一,為之後的流批一體做好工作。在元資料管理系統建立的 Flink 表操作介面如下所示,建立 Flink 表的元資料,持久化到 hive裡,Flink SQL 啟動時從 hive 裡讀取對應表的 table schema 資訊。

3。2 Flink SQL 相關實踐

平臺對於官方原生支援或者不支援的 connector 進行整合和開發,映象和 connector,format 等相關依賴進行解耦,可以快捷的進行更新與迭代。

FLINK SQL 相關實踐

connector 層,現階段平臺支援官方支援的 connector,並且構建了 redis,kudu,clickhouse,vms,vdp 等平臺內部的 connector。平臺構建了內部的 pb format,支援 protobuf 實時清洗資料的讀取。平臺構建了 kudu,vdp 等內部 catalog,支援直接讀取相關的 schema,不用再建立 ddl。

平臺層主要是在 UDF、常用執行引數調整、以及升級 hadoop3。

runntime 層主要是支援拓撲圖執行計劃修改、維表關聯 keyBy cache 最佳化等

拓撲圖執行計劃修改

針對現階段 SQL 生成的 stream graph 並行度無法修改等問題,平臺提供可修改的拓撲預覽修改相關引數。平臺會將解析後的 FlinkSQL 的 excution plan json 提供給使用者,利用 uid 保證運算元的唯一性,修改每個運算元的並行度,chain 策略等,也為使用者解決反壓問題提供方法。例如針對 clickhouse sink 小併發大批次的場景,我們支援修改 clickhouse sink 並行度,source 並行度 = 72,sink 並行度 = 24,提高 clickhouse sink tps。

維表關聯 keyBy 最佳化 cache

針對維表關聯的情況,為了降低 IO 請求次數,降低維表資料庫讀壓力,從而降低延遲,提高吞吐,有以下幾種措施:

當維表資料量不大時,透過全量維表資料快取在本地,同時 ttl 控制快取重新整理的時候,這可以極大的降低 IO 請求次數,但會要求更多的記憶體空間。

當維表資料量很大時,透過 async 和 LRU cache 策略,同時 ttl 和 size 來控制快取資料的失效時間和快取大小,可以提高吞吐率並降低資料庫的讀壓力。

當維表資料量很大同時主流 qps 很高時,可以開啟把維表 join 的 key 作為 hash 的條件,將資料進行分割槽,即在 calc 節點的分割槽策略是 hash,這樣下游運算元的 subtask 的維表資料是獨立的,不僅可以提高命中率,也可降低記憶體使用空間。

最佳化之前維表關聯 LookupJoin 運算元和正常運算元 chain 在一起。

最佳化之間維表關聯 LookupJoin 運算元和正常運算元不 chain 在一起,將 join key 作為 hash 策略的 key。採用這種方式最佳化之後,例如原先 3000W 資料量的維表,10 個 TM 節點,每個節點都要快取 3000W 的資料,總共需要快取 3000W * 10 = 3 億的量。而經過 keyBy 最佳化之後,每個 TM 節點只需要快取 3000W / 10 = 300W 的資料量,總共快取的資料量只有 3000W,大大減少快取資料量。

維表關聯延遲 join

維表關聯中,有很多業務場景,在維表資料新增資料之前,主流資料已經發生 join 操作,會出現關聯不上的情況。因此,為了保證資料的正確,將關聯不上的資料進行快取,進行延遲 join。

最簡單的做法是,在維表關聯的 function 裡設定重試次數和重試間隔,這個方法會增大整個流的延遲,但主流 qps 不高的情況下,可以解決問題。

增加延遲 join 的運算元,當 join 維表未關聯時,先快取起來,根據設定重試次數和重試間隔從而進行延遲的 join。

四、應用案例

4。1 實時數倉

實時資料入倉

流量資料一級 kafka 透過實時清洗之後,寫到二級清洗 kafka,主要是 protobuf 格式,再透過 Flink SQL 寫入 hive 5min 表,以便做後續的準實時 ETL,加速 ods 層資料來源的準備時間。

MySQL 業務庫的資料,透過 VDP 解析形成 binlog cdc 訊息流,再透過 Flink SQL 寫入 hive 5min 表。

業務系統透過 VMS API 產生業務 kafka 訊息流,透過 Flink SQL 解析之後寫入 hive 5min 表。支援 string、json、csv 等訊息格式。

使用 Flink SQL 做流式資料入倉,非常的方便,而且 1。12 版本已經支援了小檔案的自動合併,解決了小檔案的痛點。

我們自定義分割槽提交策略,當前分割槽 ready 時候會調一下實時平臺的分割槽提交 api,在離線排程定時排程透過這個 api 檢查分割槽是否 ready。

採用 Flink SQL 統一入倉方案以後,我們可以獲得的收益:可解決以前 Flume 方案不穩定的問題,而且使用者可自助入倉,大大降低入倉任務的維護成本。提升了離線數倉的時效性,從小時級降低至 5min 粒度入倉。

實時指標計算

實時應用消費清洗後 kafka,透過 redis 維表、api 等方式關聯,再透過 Flink window 增量計算 UV,持久化寫到 Hbase 裡。

實時應用消費 VDP 訊息流之後,透過 redis 維表、api 等方式關聯,再透過 Flink SQL 計算出銷售額等相關指標,增量 upsert 到 kudu 裡,方便根據 range 分割槽批次查詢,最終透過資料服務對實時大屏提供最終服務。

以往指標計算通常採用 Storm 方式,需要透過 api 定製化開發,採用這樣 Flink 方案以後,我們可以獲得的收益:將計算邏輯切到 Flink SQL 上,降低計算任務口徑變化快,修改上線週期慢等問題。切換至 Flink SQL 可以做到快速修改,快速上線,降低維護成本。

實時離線一體化 ETL 資料整合

Flink SQL 在最近的版本中持續強化了維表 join 的能力,不僅可以實時關聯資料庫中的維表資料,現在還能關聯 Hive 和 Kafka 中的維表資料,能靈活滿足不同工作負載和時效性的需求。

基於 Flink 強大的流式 ETL 的能力,我們可以統一在實時層做資料接入和資料轉換,然後將明細層的資料迴流到離線數倉中。

我們透過將 presto 內部使用的 HyperLogLog ( 後面簡稱 HLL ) 實現引入到 Spark UDAF 函數里,打通 HLL 物件在 Spark SQL 與 presto 引擎之間的互通,如 Spark SQL 透過 prepare 函式生成的 HLL 物件,不僅可以在 Spark SQL 裡 merge 查詢而且可以在 presto 裡進行 merge 查詢。具體流程如下:

UV 近似計算示例:

Step 1: Spark SQL 生成 HLL 物件

insert overwrite dws_goods_uv partition (dt=‘${dt}’,hm=‘${hm}’) AS select goods_id, estimate_prepare(mid) as pre_hll from dwd_table_goods group by goods_id where dt = ${dt} and hm = ${hm}

Step 2: Spark SQL 透過 goods_id 維度的 HLL 物件 merge 成品牌維度

insert overwrite dws_brand_uv partition (dt=‘${dt}’,hm=‘${hm}’) AS select b。brand_id, estimate_merge(pre_hll) as merge_hll from dws_table_brand A left join dim_table_brand_goods B on A。goods_id = B。goods_id where dt = ${dt} and hm = ${hm}

Step 3: Spark SQL 查詢品牌維度的 UV

select brand_id, estimate_compute(merge_hll ) as uv from dws_brand_uv where dt = ${dt}

Step 4: presto merge 查詢 park 生成的 HLL 物件

select brand_id,cardinality(merge(cast(merge_hll AS HyperLogLog))) uv from dws_brand_uv group by brand_id

所以基於實時離線一體化ETL資料整合的架構,我們能獲得的收益:

統一了基礎公共資料來源;

提升了離線數倉的時效性;

減少了元件和鏈路的維護成本。

4。2 實驗平臺(Flink 實時資料入 OLAP)

唯品會實驗平臺是透過配置多維度分析和下鑽分析,提供海量資料的 A / B - test 實驗效果分析的一體化平臺。一個實驗是由一股流量(比如使用者請求)和在這股流量上進行的相對對比實驗的修改組成。實驗平臺對於海量資料查詢有著低延遲、低響應、超大規模資料(百億級)的需求。整體資料架構如下:

透過 Flink SQL 將 kafka 裡的資料清洗解析展開等操作之後,透過 redis 維表關聯商品屬性,透過分散式表寫入到 clickhouse,然後透過資料服務 adhoc 查詢。業務資料流如下:

我們透過 Flink SQL redis connector,支援 redis 的 sink 、source 維表關聯等操作,可以很方便的讀寫 redis,實現維表關聯,維表關聯內可配置 cache ,極大提高應用的 TPS。透過 Flink SQL 實現實時資料流的 pipeline,最終將大寬表 sink 到 CK 裡,並按照某個欄位粒度做 murmurHash3_64 儲存,保證相同使用者的資料都存在同一 shard 節點組內,從而使得 ck 大表之間的 join 變成 local 本地表之間的 join,減少資料 shuffle 操作,提升 join 查詢效率。

五、未來規劃

5。1 提高 Flink SQL 易用性

當前我們的 Flink SQL 除錯起來很有很多不方便的地方,對於做離線 hive 使用者來說還有一定的使用門檻,例如手動配置 kafka 監控、任務的壓測調優,如何能讓使用者的使用門檻降低至最低,是一個比較大的挑戰。將來我們考慮做一些智慧監控告訴使用者當前任務存在的問題,儘可能自動化並給使用者一些最佳化建議。

5。2 資料湖 CDC 分析方案落地

目前我們的 VDP binlog 訊息流,透過 Flink SQL 寫入到 hive ods 層,以加速 ods 層資料來源的準備時間,但是會產生大量重複訊息去重合並。我們會考慮 Flink + 資料湖的 cdc 入倉方案來做增量入倉。此外,像訂單打寬之後的 kafka 訊息流、以及聚合結果都需要非常強的實時 upsert 能力,目前我們主要是用 kudu,但是 kudu 叢集,比較獨立小眾,維護成本高,我們會調研資料湖的增量 upsert 能力來替換 kudu 增量 upsert 場景。

上一篇:體育界被發現存在的5個作弊行為,馬拉松、馬拉松、跳水都有例外!
下一篇:iphone系統升級到ios 14。5後效果怎麼樣?不同版本的不同設定有何不同?