ClickHouse:人群圈选业务的大杀器
什么是人群圈選
隨著數(shù)據(jù)時代的發(fā)展,各行各業(yè)數(shù)據(jù)平臺的體量越來越大,用戶個性化運營的訴求也越來越突出,用戶標(biāo)簽系統(tǒng),做為個性化千人千面運營的基礎(chǔ)服務(wù),應(yīng)運而生。如今,幾乎所有行業(yè)(如互聯(lián)網(wǎng)、游戲、教育等)都有實時精準(zhǔn)營銷的需求。通過系統(tǒng)生成用戶畫像,在營銷時通過條件組合篩選用戶,快速提取目標(biāo)群體,例如:
? 電商行業(yè)中,商家在運營活動前,需要根據(jù)活動的目標(biāo)群體的特征,圈選出一批目標(biāo)用戶進行廣告推送或進行活動條件的判斷。
? 游戲行業(yè)中,商家需要根據(jù)玩家的某些特征進行圈選,針對性地發(fā)放大禮包,提高玩家活躍度。
? 教育行業(yè)中,需要根據(jù)學(xué)生不同的特征,推送有針對性的習(xí)題,幫助學(xué)生查缺補漏。
? 搜索、門戶、視頻網(wǎng)站等業(yè)務(wù)中,根據(jù)用戶的關(guān)注熱點,推送不同的內(nèi)容。
以電商平臺中一個典型的目標(biāo)群體圈選場景為例,如服裝行業(yè)對其潛在客戶信息采集,打標(biāo),清洗后如下表:
(以上表結(jié)構(gòu)中,第一列為用戶身份的唯一標(biāo)識,往往作為主鍵,其他列均為標(biāo)簽列。)
如公司想推出一款高端男性運動產(chǎn)品,則可能的圈選條件為:
1.男性,推出產(chǎn)品的受眾群體為男性。
2.運動愛好者,運動愛好者更有可能消費運動類產(chǎn)品。
3.一線城市,一線城市用戶相比于二三線城市用戶,可能更傾向于消費高端產(chǎn)品。
4....
從上述表結(jié)構(gòu)(人群圈選典型表結(jié)構(gòu),且大都如此,第一列為用戶id,其余皆為標(biāo)簽列)和查詢條件可以看出,人群圈選業(yè)務(wù)都面臨一些共同的痛點:
? 用戶標(biāo)簽多、標(biāo)簽豐富,標(biāo)簽列可達成百甚至上千列。
? 數(shù)據(jù)量龐大,用戶數(shù)多,從而所需運算量也極大。
? 圈選條件組合多樣化,沒有固定索引可以優(yōu)化,存儲空間占用極大。
? 性能要求高,圈選結(jié)果要求及時響應(yīng),過長的延時會造成營銷人群的不準(zhǔn)確。
? 數(shù)據(jù)更新時效要求高,用戶畫像要求近實時的更新,過期的人群信息也將直接影響圈選的精準(zhǔn)性。
針對以上痛點,本文將從原理層面深度分析,多角度對比講解如何使用ClickHouse搭建人群圈選系統(tǒng),為何選擇ClickHouse,以及選用ClickHouse搭建人群圈選系統(tǒng)的優(yōu)勢。
為什么選擇ClickHouse
本文以開ElasticSearch(ES)為例,僅針對人群圈選場景與ClickHouse做對比。開源版ES是一款高效的搜索分析引擎,利用其優(yōu)秀的索引技術(shù),可以完成各種復(fù)雜的條件組合和數(shù)據(jù)聚合運算。ClickHouse是最近比較火的一款開源列式存儲分析型數(shù)據(jù)庫,它最核心的特點就是極致存儲壓縮率和查詢性能,尤其擅長單個大寬表的查詢場景。因此細比兩者,相較與ClickHouse,ES雖具備人群圈選業(yè)務(wù)所需的必要能力,但仍有以下3方面不足:
成本方面:
開源ES的底層存儲使用lucene,主要包含行存儲(storefiled),列存儲(docvalues)和倒排索引(invertindex)。行存中_source字段用于控制doc原始數(shù)據(jù)的存儲。在寫入數(shù)據(jù)時,ES把doc原始數(shù)據(jù)的整個json結(jié)構(gòu)體當(dāng)做一個string,存儲為_source字段,因此_source字段對存儲占用量大且關(guān)閉_source將不支持update操作。同時,索引也是ES不可缺少的一部分,ES默認全列索引,雖可手動設(shè)置對特定的列取消索引,但取消索引的列將不可查詢。在人群圈選場景下,選取標(biāo)簽過濾條件是任意的,多樣的,不斷變化的。對任意一條標(biāo)簽列不做索引都是不現(xiàn)實的,因此針對成百上千列的大寬表,全列索引必然使得存儲成本翻倍。
ClickHouse是一款徹底的列式存儲數(shù)據(jù)庫,且ClickHouse的查詢不依賴索引,使用過程中也不強制構(gòu)建索引,因此不需要保留額外的索引文件。同時ClickHouse存儲數(shù)據(jù)的副本數(shù)量靈活可配,可將使用成本降至最低。
數(shù)據(jù)更新與治理方面:
索引為ES帶來了高效的查詢性能,但是索引的構(gòu)造過程是復(fù)雜的,耗時的。每一次索引的構(gòu)建都需對全列數(shù)據(jù)進行掃描,排序來生成索引文件。而在人群圈選業(yè)務(wù)中,人群信息必然是不斷增長的。標(biāo)簽的不斷更新將會使得ES不得不頻繁的重構(gòu)索引,這將對ES的性能造成巨大的開銷 。
ClickHouse的查詢不依賴索引,使用過程中也不強制構(gòu)建索引。因此對于新增數(shù)據(jù),ClickHouse不涉及索引的更新與維護。
易用性方面:
開源ES缺少完備的sql支持,查詢請求的json格式復(fù)雜。同時ES對多條件過濾聚合的執(zhí)行策略缺少優(yōu)化,還以文章開頭的典型場景為例,圈出一款高端男性運動產(chǎn)品的受眾人群。可得如下sql:“SELECT user_id FROM whatever_table WHERE city_level = '一線城市' AND gender = '男性' AND is_like_sports = '是';”
針對以上sql,ES的執(zhí)行會對3個標(biāo)簽分別做3次索引掃描,之后再將3次掃描的結(jié)果做merge,流程如下圖所示
而ClickHouse的執(zhí)行則更優(yōu)雅一些。ClickHouse采用標(biāo)準(zhǔn)sql,語法簡單且功能強大。在執(zhí)行where語句時,會自動優(yōu)化形成prewhere分層執(zhí)行,因此二次掃描將基于一次掃描的結(jié)果進行,執(zhí)行流程如下圖所示:
顯而易見,針對復(fù)雜條件過濾的場景,ClickHouse對多條件篩選流程做出優(yōu)化,掃描的數(shù)據(jù)量更小,性能也較ES而言更高效。
如何基于ClickHouse搭建人群圈選系統(tǒng):
對比選型完成后,接下來講解如何基于ClickHouse搭建人群圈選系統(tǒng),回顧文章開頭的業(yè)務(wù)描述和上一部分的典型sql(“SELECT user_id FROM whatever_table WHERE city_level = '一線城市' AND gender = '男性' AND is_like_sports = '是';”),再次總結(jié)人群圈選業(yè)務(wù)對數(shù)據(jù)庫能力的要求如下:
1.具備高效的批量數(shù)據(jù)導(dǎo)入性能。
2.具備處理頻繁,實時update的能力。
3.具備加列/減列的DDL能力。
4.可以指定任意列為過濾條件的高效查詢能力。
面對以上需求,ClickHouse如何使用才能在人群圈選場景下物盡其用,揚長避短?
insert代替update
首先要解決的是ClickHouse的異步update機制。ClickHouse對update的執(zhí)行是低效的,ClickHouse內(nèi)核中的MergeTree存儲一旦生成一個Data Part,這個Data Part就不可再更改了。所以從MergeTree存儲內(nèi)核層面,ClickHouse就不擅長做數(shù)據(jù)更新刪除操作。ClickHouse的語法把Update操作也加入到了Alter Table的范疇中,它并不支持裸的Update操作。
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;
當(dāng)用戶執(zhí)行一個如上的Update操作獲得返回時,ClickHouse內(nèi)核其實只做了兩件事情:
1.檢查Update操作是否合法;
2.保存Update命令到存儲文件中,喚醒一個異步處理merge和mutation的工作線程;異步線程的工作流程極其復(fù)雜,總結(jié)其精髓描述如下:先查找到需要update的數(shù)據(jù)所在datapart,之后對整個datapart做掃描,更新需要變更的數(shù)據(jù),然后再將數(shù)據(jù)重新落盤生成新的datapart,最后用新的datapart做替代并remove掉過期的datapart。
這就是ClickHouse對update指令的執(zhí)行過程,可以看出,頻繁的update指令對于ClickHouse來說將是災(zāi)難性的。
因此,我們使用insert語句代替update語句。當(dāng)需要對某一指定user更新標(biāo)簽時,就重新插入一條該user的數(shù)據(jù),
如對表中07號用戶進行數(shù)據(jù)更新:
最終,每個user可能都存在多條記錄。針對人群圈選場景,同一user錯亂冗余的信息顯然對查詢結(jié)果產(chǎn)生誤導(dǎo),無法滿足精準(zhǔn)圈選的需求。接下來講解如何使用ClickHouse進行主鍵去重,即同一user,讓后insert進來的數(shù)據(jù)覆蓋掉已有的數(shù)據(jù),實現(xiàn)update的效果。
選用AggregatingMergeTree表引擎
MergeTree是ClickHouse中最重要,最核心的存儲內(nèi)核,MergeTree思想上與LSM-Tree相似,其實現(xiàn)原理復(fù)雜,不在此展開,因為一篇文章也難以講解清楚。本篇圍繞人群圈選場景,著重從功能層面描述如何在人群圈選場景下使用MergeTree的變種AggregatingMergeTree以及使用AggregatingMergeTree可實現(xiàn)的數(shù)據(jù)聚合效果。AggregatingMergeTree繼承自 MergeTree,存儲上和基礎(chǔ)的MergeTree其實沒有任何差異,而是在數(shù)據(jù)Merge的過程中加入了“額外的合并邏輯”, AggregatingMergeTree 會將相同主鍵的所有行(在一個數(shù)據(jù)片段內(nèi))替換為單個存儲一系列聚合函數(shù)狀態(tài)的行。以文章開頭部分的表結(jié)構(gòu)為例,使用AggregatingMergeTree表引擎的建表語句如下:
CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default (user_id UInt64,city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一線城市' = 0, '二線城市' = 1, '三線城市' = 2, '四線城市' = 3))),gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1))),interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('否' = 0, '是' = 1))),reg_date SimpleAggregateFunction(anyLast, Datetime),comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),province SimpleAggregateFunction(anyLast, Nullable(String)),last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),others SimpleAggregateFunction(anyLast,Array(String)) )ENGINE = AggregatingMergeTree() partition by toYYYYMMDD(reg_date) ORDER BY user_id;就以上建標(biāo)語句展開分析,AggregatingMergeTree會將除主鍵(user)外的其余列,配合anyLast函數(shù),替換每行數(shù)據(jù)為一種預(yù)聚合狀態(tài)。其中anyLast聚合函數(shù)聲明聚合策略為保留最后一次的更新數(shù)據(jù)。
數(shù)據(jù)一致性保證
上一部分講述了如何針對人群圈選場景選擇表引擎和聚合函數(shù),但是AggregatingMergeTree并不能保證任何時候的查詢都是聚合過后的結(jié)果,并且也沒有提供標(biāo)志位用于查詢數(shù)據(jù)的聚合狀態(tài)與進度。因此,為了確保數(shù)據(jù)在查詢前處于已聚合的狀態(tài),還需手動下發(fā)optimize指令強制聚合過程的執(zhí)行。同時方便起見,可自行配置周期性optimize指令的下發(fā)。例如每10分鐘執(zhí)行一次optimize指令。optimize的執(zhí)行周期可在業(yè)務(wù)的實時性需求與計算資源之間做權(quán)衡。如數(shù)據(jù)量過大,optimize生效慢,可按partition級別并行下發(fā)做優(yōu)化。optimize生效后即可實現(xiàn)去重邏輯。
Demo:
import java.sql.*; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.concurrent.TimeoutException;public class Main {private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT);public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException, ParseException {String url = "your url";String username = "your username";String password = "your password";Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String connectionStr = "jdbc:clickhouse://" + url + ":8123";try {Connection connection = DriverManager.getConnection(connectionStr, username, password);Statement stmt = connection.createStatement();// 創(chuàng)建local表String createLocalTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default " +"(user_id UInt64, " +"city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一線城市' = 0, '二線城市' = 1, '三線城市' = 2, '四線城市' = 3))), " +"gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1)))," +"interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('否' = 0, '是' = 1)))," +"reg_date SimpleAggregateFunction(anyLast, Datetime)) " +"comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),\n" +"last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),\n" +"user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),\n" +"province SimpleAggregateFunction(anyLast, Nullable(String)),\n" +"last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),\n" +"others SimpleAggregateFunction(anyLast, Array(String)),\n" +"ENGINE = AggregatingMergeTree() PARTITION by toYYYYMM(reg_date) ORDER BY user_id;";stmt.execute(createLocalTableDDL);System.out.println("create local table done.");// 創(chuàng)建distributed表String createDistributedTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table_dist ON cluster default " +"AS default.whatever_table " +"ENGINE = Distributed(default, default, whatever_table, intHash64(user_id));";stmt.execute(createDistributedTableDDL);System.out.println("create distributed table done");// 插入mock數(shù)據(jù)String insertSQL = "INSERT INTO whatever_table(\n" +"\tuser_id,\n" +"\tcity_level,\n" +"\tgender,\n" +"\tinterest_sports,\n" +"\treg_date,\n" +"\tcomment_like_cnt,\n" +"\tlast30d_share_cnt,\n" +"\tuser_like_consume_trend_type,\n" +"\tprovince,\n" +"\tlast_access_version,\n" +"\tothers\n" +"\t)SELECT\n" +" number as user_id,\n" +" toUInt32(rand(11)%4) as city_level,\n" +" toUInt32(rand(30)%2) as gender,\n" +" toUInt32(rand(28)%2) as interest_sports,\n" +" (toDateTime('2020-01-01 00:00:00') + rand(1)%(3600*24*30*4)) as reg_date,\n" +" toUInt32(rand(15)%10) as comment_like_cnt,\n" +" toUInt32(rand(16)%10) as last30d_share_cnt,\n" +"randomPrintableASCII(64) as user_like_consume_trend_type,\n" +"randomPrintableASCII(64) as province,\n" +"randomPrintableASCII(64) as last_access_version,\n" +"[randomPrintableASCII(64)] as others\n" +" FROM numbers(100000);\n";stmt.execute(insertSQL);System.out.println("Mock data and insert done.");System.out.println("Select count(user_id)...");ResultSet rs = stmt.executeQuery("select count(user_id) from whatever_table_dist");while (rs.next()) {int count = rs.getInt(1);System.out.println("user_id count: " + count);}// 數(shù)據(jù)合并String optimizeSQL = "OPTIMIZE table whatever_table final;";// 如數(shù)據(jù)合并時間過長,可在partition級別并行執(zhí)行String optimizeByPartitionSQL = "OPTIMIZE table whatever_table PARTITION 202001 final;";try {stmt.execute(optimizeByPartitionSQL);}catch (SQLTimeoutException e){// 查看merge進展// String checkMergeSQL = "select * from system.merges where database = 'default' and table = 'whatever_table';";Thread.sleep(60*1000);}// 人群圈選(city_level='一線城市',gender='男性',interest_sports='是', reg_date<='2020-01-31 23:59:59')String selectSQL = "SELECT user_id from whatever_table_dist where city_level=0 and gender=1 and interest_sports=1 and reg_date <= NOW();";rs = stmt.executeQuery(selectSQL);while (rs.next()) {int user_id = rs.getInt(1);System.out.println("Got suitable user: " + user_id);}} catch (Exception e) {e.printStackTrace();}} }寫在最后
阿里云已經(jīng)推出了ClickHouse的云托管產(chǎn)品,產(chǎn)品首頁地址:云數(shù)據(jù)庫ClickHouse,歡迎大家試用,對Clickhouse感興趣的也可加入Clickhouse技術(shù)交流群。
原文鏈接:https://developer.aliyun.com/article/781084?
版權(quán)聲明:本文內(nèi)容由阿里云實名注冊用戶自發(fā)貢獻,版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識產(chǎn)權(quán)保護指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進行舉報,一經(jīng)查實,本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的ClickHouse:人群圈选业务的大杀器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何跑通第一个 DataStream 作
- 下一篇: 如何将实时计算 Flink 与自身环境打