ClickHouse数据导入

1.INSERT

2.文件导入(clickhouse-client)

# Stream CK_TEST.txt (\001-delimited CSV) into the distributed table.
# $'\001' is bash ANSI-C quoting for the single byte 0x01.
# NOTE(review): a password passed on the command line is visible in `ps` and in
# shell history; avoid committing real credentials in these notes.
clickhouse-client \
    --host=wxpt-ck-01 \
    --port=9000 \
    --user=default \
    --password='abc123456' \
    --format_csv_delimiter=$'\001' \
    --max_partitions_per_insert_block=3000 \
    --input_format_allow_errors_num=100000 \
    --query="INSERT INTO default.tb_test_distributed FORMAT CSV" \
    < CK_TEST.txt

--format_csv_delimiter:CSV 字段分隔符;$'\001' 是 bash 的 ANSI-C 引号写法,表示 ASCII 0x01 字符(Hive 默认的字段分隔符)

--max_partitions_per_insert_block:单个插入数据块允许涉及的最大分区数(默认 100),一次导入跨越大量分区时需要调大

--input_format_allow_errors_num:解析输入时允许跳过的最大错误行数,超过该值则导入失败

3.HDFS

需先在 ClickHouse 中建立 HDFS 映射表。若 HDFS 为高可用(HA)集群,需在 config.xml 中指定 hdfs-site.xml 的路径,使 ClickHouse 能够解析 nameservice(如下文的 hdfs://cluster1):

<!-- Points libhdfs3 at the HDFS client config (hdfs-site.xml) so that HA
     nameservices such as hdfs://cluster1 can be resolved. This element goes
     inside the root element of ClickHouse's config.xml. -->
<hdfs>
    <libhdfs3_conf>/data/software/comm_config/hdfs-site.xml</libhdfs3_conf>
</hdfs>

创建HDFS映射表

-- Read-through mapping table over HDFS: every file matched by the glob in the
-- ENGINE clause is parsed as TSV, so the column order below must follow the
-- field order inside those files.
-- NOTE(review): PHONE_NUMBER is UInt64, which cannot keep leading zeros or a
-- '+' prefix; confirm the source values are plain digits.
CREATE TABLE default.hdfs_home_page_touch_collect_local
(
    ACTIVITY_ID      String   COMMENT '活动ID',
    RECOMMEND_TYPE   String   COMMENT '推荐类型',
    ACTIVITY_NAME    String   COMMENT '活动名称',
    ACTIVITY_START   DateTime COMMENT '活动开始时间',
    ACTIVITY_END     DateTime COMMENT '活动结束时间',
    CREATOR          String   COMMENT '创建人',
    ORG_NAME         String   COMMENT '组织名称',
    GOODS_ID         String   COMMENT '商品ID',
    GOODS_TABLE_ID   UInt64   COMMENT '商品表ID',
    SECTION_TYPE     String   COMMENT '业务类型',
    FIRST_DATA_TYPE  String   COMMENT '一级分类',
    SECOND_DATA_TYPE String   COMMENT '二级分类',
    POSITION_CODE    String   COMMENT '位置编码',
    POSITION_NAME    String   COMMENT '位置名称',
    PHONE_NUMBER     UInt64   COMMENT '手机号码',
    RECOM_PV         UInt64   COMMENT '推荐数',
    CLICK_PV         UInt64   COMMENT '点击数',
    ORDER_PV         UInt64   COMMENT '订单数',
    GOODS_URL        String   COMMENT '商品URL',
    GOODS_NAME       String   COMMENT '商品名称',
    GOODS_CHANNEL    String   COMMENT '商品触点编码',
    ACT_ORDER_PV     UInt64   COMMENT '活动级订单数',
    OPERA_DAY        String   COMMENT '统计日期'
)
ENGINE = HDFS('hdfs://cluster1/home/hadoop/baiyk/tb_dwa_home_page_touch_collect_bak/*', 'TSV');

将 HDFS 映射表中的数据导入 CK 集群表

-- Copy every row exposed by the HDFS mapping table into the cluster table.
-- NOTE(review): no target column list is given, so ClickHouse maps the SELECT
-- columns onto report.tb_home_page_touch_collect by position, not by name;
-- keep the two column orders in sync.
INSERT INTO report.tb_home_page_touch_collect
SELECT
      ACTIVITY_ID
    , RECOMMEND_TYPE
    , ACTIVITY_NAME
    , ACTIVITY_START
    , ACTIVITY_END
    , CREATOR
    , ORG_NAME
    , GOODS_ID
    , GOODS_TABLE_ID
    , SECTION_TYPE
    , FIRST_DATA_TYPE
    , SECOND_DATA_TYPE
    , POSITION_CODE
    , POSITION_NAME
    , PHONE_NUMBER
    , RECOM_PV
    , CLICK_PV
    , ORDER_PV
    , GOODS_URL
    , GOODS_NAME
    , GOODS_CHANNEL
    , ACT_ORDER_PV
    , OPERA_DAY
FROM default.hdfs_home_page_touch_collect_local;

4.第三方工具 SeaTunnel

官网:https://seatunnel.apache.org
下载解压后,在 config/seatunnel-env.sh 中配置 SPARK_HOME

 # Set SPARK_HOME in this file
 vim ./config/seatunnel-env.sh

编写作业配置文件,示例如下:

# Create the job config shown below; the run command must use this exact (case-sensitive) file name
vim ./config/spark.hive2CK.conf
env {
  spark.app.name = "Hive2CK_tb_st_user_lable_new"
  # With dynamic allocation enabled this acts as the initial executor count.
  spark.executor.instances = 2
  # Dynamic resource allocation (not dynamic partitioning): Spark scales the
  # number of executors with the workload. The external shuffle service is
  # enabled alongside it because dynamic allocation relies on it. If the job
  # consumes too many resources, cap executor cores/memory here as well.
  spark.shuffle.service.enabled = true
  spark.dynamicAllocation.enabled = true
}

source {
  hive {
    # A regular "..." HOCON string cannot contain a line break, which is why the
    # SQL previously had to be kept on one line. The triple-quoted form below may
    # span several lines. Column-level processing can be done in this SQL.
    pre_sql = """
      SELECT phone_number, user_id, age, sex, name, user_sys, wo_credit, contact, prov_id, prov_name, city_id, cb_city_id, city_name, is_5g_city, is_tencent, product_base, is_2i_user, is_online, chnl_kind_id, product_id, product_name, product_alias_name, product_id_dg, product_id_dg_name, innet_age_day, innet_age_month, innet_date, is_5g_model, curr_terminal_length, brand_id, brand_name, model_id, model_name, is_first_pay, is_second_pay, first_pay_fee, pay_amount_all, kyye_fee, recent_pay_time, recent_pay_amount, month_fee, last_month_arpu, last_2month_avg_arpu, last_3month_avg_arpu, last_6month_avg_arpu, voice_extra_fee, svip_and_month_fee, last_month_flux_fee, last_2month_avg_flux_fee, last_3month_avg_flux_fee, last_2month_max_flux_fee, total_flux, total_call, expen_in_flux_rate, expen_in_call_rate, totaler_bytes, totaler_bytes_rate, is_new_reg_low_freq, out_jf_times, is_low_flux_preference, is_low_call_preference, is_open_intl, open_intl_date, hkmotw_data, intl_data, out_active_call, in_active_call, last_3month_avg_total_flux, last_6month_avg_total_flux, last_3month_avg_out_jf_times, last_6month_avg_out_jf_times, is_silence_user, user_desc, is_susp_card_user, is_fake_user, stop_days, half_stop_days, half_stop_num_last_60, boot_state, is_family_product, is_ykrh, ykrh_type, is_yyrh, yyrh_type, is_5g_user, dual_slot, customer_type, is_innet, is_acct, offnet_date, is_5g_product, is_free_flux, is_free_call, is_opencard, contract_type, is_high_risk_3month, is_second_pay_3month, transform(video_preference, x -> to_json(x)) video_preference, transform(game_preference, x -> to_json(x)) game_preference, contact_code, is_newinnet_user, is_double_flow_effective, is_second_pay_partake, is_high_risk_partake, is_5g_up_dg, is_prime_equity_package, is_prime_equity_similar, is_commit_low, is_low_pay_user, is_jinli_member, is_prime_equity_5g, is_prime_equity_5g_contract, is_fee_help_deduction, is_preferential_benefits, off_net_prob, transfer_net_prob, credit_scores,
      user_stability, is_loyal_user, is_vip_user, user_value_rank, rank_clustering_net_duration, rank_clustering_age, rank_clustering_total_fee, rank_clustering_total_flux, rank_clustering_user_value, bill_flux, ct_times, product_expen_in, product_expen_out, expen_in_call, expen_in_flux_g, expen_in_flux_m, is_5g_up_dg_month, is_5g_up_td_month, is_supervip_dg_month, is_supervip_td_month, is_cndx_dg_month, is_cndx_td_month, is_newinnet_pay_partake, is_half_stop_partake, is_2i_product, is_svip, is_2i_flow_fill, is_tencent_free_flow, is_share_prime_equity, is_5g_relative, is_shouting_6month, transform(read_preference, x -> to_json(x)) read_preference, transform(chat_preference, x -> to_json(x)) chat_preference, transform(shop_preference, x -> to_json(x)) shop_preference, transform(learn_preference, x -> to_json(x)) learn_preference, transform(travel_preference, x -> to_json(x)) travel_preference, transform(life_preference, x -> to_json(x)) life_preference, transform(health_preference, x -> to_json(x)) health_preference, transform(financial_preference, x -> to_json(x)) financial_preference, transform(utilities_preference, x -> to_json(x)) utilities_preference, is_jiasubao, jiasubao_time_rate, jiasubao_num_rate, is_flux_plunge, is_time_plunge, is_new_supervip, is_not_equity_supervip, is_not_equity_supervip_same_month, off_net_prob_no_wk, off_net_prob_wk, is_supervip_not_5g_user, is_supervip_not_5g_user_type, is_dg_5g_active, is_td_weixi_active, is_game_preference, is_video_preference, family_card_mainsub, family_card_amount, service_type_old, product_id_5g, prime_equity_list, id_sx_4g_svip, id_sx_5g_svip, id_sx_cndx, id_sx_5g_up, id_td_5g_up, id_td_4g_svip, id_td_5g_svip, id_td_svip, id_td_cndx, id_td_5g_jyb, id_sx_5g_us, id_td_5g_us,
      date_format(current_date(), 'yyyyMMdd') opera_day
      FROM raw_layer.tb_st_user_lable_new
    """
    result_table_name = "source_view_table"
  }
}

transform {
  # Transform stage: per-field splitting or other processing can also go here.
}

sink {
  clickhouse {
    source_table_name = "source_view_table"
    # Cluster endpoint; 8123 is ClickHouse's HTTP port.
    host = "xxxx:8123"
    # Passed through to the ClickHouse JDBC driver (presumably milliseconds; confirm).
    clickhouse.socket_timeout = 50000
    database = "raw_layer"
    # NOTE(review): the target is "label" while the Hive source table and app
    # name use "lable"; presumably two distinct tables — confirm.
    table = "tb_st_user_label_new"
    # Target columns, in the same order as the pre_sql SELECT list.
    fields = ["phone_number","user_id","age","sex","name","user_sys","wo_credit","contact","prov_id","prov_name","city_id","cb_city_id","city_name","is_5g_city","is_tencent","product_base","is_2i_user","is_online","chnl_kind_id","product_id","product_name","product_alias_name","product_id_dg","product_id_dg_name","innet_age_day","innet_age_month","innet_date","is_5g_model","curr_terminal_length","brand_id","brand_name","model_id","model_name","is_first_pay","is_second_pay","first_pay_fee","pay_amount_all","kyye_fee","recent_pay_time","recent_pay_amount","month_fee","last_month_arpu","last_2month_avg_arpu","last_3month_avg_arpu","last_6month_avg_arpu","voice_extra_fee","svip_and_month_fee","last_month_flux_fee","last_2month_avg_flux_fee","last_3month_avg_flux_fee","last_2month_max_flux_fee","total_flux","total_call","expen_in_flux_rate","expen_in_call_rate","totaler_bytes","totaler_bytes_rate","is_new_reg_low_freq","out_jf_times","is_low_flux_preference","is_low_call_preference","is_open_intl","open_intl_date","hkmotw_data","intl_data","out_active_call","in_active_call","last_3month_avg_total_flux","last_6month_avg_total_flux","last_3month_avg_out_jf_times","last_6month_avg_out_jf_times","is_silence_user","user_desc","is_susp_card_user","is_fake_user","stop_days","half_stop_days","half_stop_num_last_60","boot_state","is_family_product","is_ykrh","ykrh_type","is_yyrh","yyrh_type","is_5g_user","dual_slot","customer_type","is_innet","is_acct","offnet_date","is_5g_product","is_free_flux","is_free_call","is_opencard","contract_type","is_high_risk_3month","is_second_pay_3month","video_preference","game_preference","contact_code","is_newinnet_user","is_double_flow_effective","is_second_pay_partake","is_high_risk_partake","is_5g_up_dg","is_prime_equity_package","is_prime_equity_similar","is_commit_low","is_low_pay_user","is_jinli_member","is_prime_equity_5g","is_prime_equity_5g_contract","is_fee_help_deduction","is_preferential_benefits","off_net_prob","transfer_net_prob",
      "credit_scores","user_stability","is_loyal_user","is_vip_user","user_value_rank","rank_clustering_net_duration","rank_clustering_age","rank_clustering_total_fee","rank_clustering_total_flux","rank_clustering_user_value","bill_flux","ct_times","product_expen_in","product_expen_out","expen_in_call","expen_in_flux_g","expen_in_flux_m","is_5g_up_dg_month","is_5g_up_td_month","is_supervip_dg_month","is_supervip_td_month","is_cndx_dg_month","is_cndx_td_month","is_newinnet_pay_partake","is_half_stop_partake","is_2i_product","is_svip","is_2i_flow_fill","is_tencent_free_flow","is_share_prime_equity","is_5g_relative","is_shouting_6month","read_preference","chat_preference","shop_preference","learn_preference","travel_preference","life_preference","health_preference","financial_preference","utilities_preference","is_jiasubao","jiasubao_time_rate","jiasubao_num_rate","is_flux_plunge","is_time_plunge","is_new_supervip","is_not_equity_supervip","is_not_equity_supervip_same_month","off_net_prob_no_wk","off_net_prob_wk","is_supervip_not_5g_user","is_supervip_not_5g_user_type","is_dg_5g_active","is_td_weixi_active","is_game_preference","is_video_preference","family_card_mainsub","family_card_amount","service_type_old","product_id_5g","prime_equity_list","id_sx_4g_svip","id_sx_5g_svip","id_sx_cndx","id_sx_5g_up","id_td_5g_up","id_td_4g_svip","id_td_5g_svip","id_td_svip","id_td_cndx","id_td_5g_jyb","id_sx_5g_us","id_td_5g_us","opera_day"]
    username = "default"
    # NOTE(review): plaintext credentials; do not commit real passwords.
    password = "abc123456"
    # Maximum number of rows per write batch
    bulk_size = 20000
    # Split the data inside SeaTunnel and write to each shard directly instead
    # of through the Distributed table. Only valid when the target table uses
    # the Distributed engine (its internal_replication should be true).
    split_mode = true
    # Must be the name of a single source field: SeaTunnel hashes that field's
    # value to pick a shard (random selection otherwise). A ClickHouse
    # expression such as cityHash64(phone_number, user_id) is not a field name.
    # NOTE(review): SeaTunnel's hash presumably differs from the Distributed
    # table's own sharding expression, so shard placement may not match rows
    # inserted through the Distributed table.
    sharding_key = "phone_number"
  }
}

执行

# The config path must match the job file created earlier (spark.hive2CK.conf);
# Linux file names are case-sensitive. -e sets the Spark deploy mode.
./bin/start-seatunnel-spark.sh --config ./config/spark.hive2CK.conf --master yarn -e client

启动后作业会提交到 YARN 上运行:由 Spark 执行 pre_sql 读取 Hive 数据,再批量写入 ClickHouse

5.SeaTunnel 的 ClickHouseFile 插件

尚未测试,具体介绍见 SeaTunnel 官方文档。


时至今日,你依旧是我的光芒。