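Bitmap is a classic data structure for storing large amounts of binary data compactly and querying it quickly. Doris supports the BITMAP data type, and in Flink jobs you can use the Flink Doris Connector to work with bitmap data.
Many community users are not sure how to write BITMAP data when using the Flink Doris Connector. This article shows how to write bitmap data into Doris with the Flink Doris Connector.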
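To make the idea concrete, here is a minimal sketch that uses java.util.BitSet as a stand-in for a bitmap: each user ID sets one bit, a membership check is a single bit test, and merging two bitmaps is a bitwise OR. It only illustrates the concept; Doris's BITMAP type has its own compressed implementation (Roaring-based, as far as we know), and the class name BitmapBasics is hypothetical.

```java
import java.util.BitSet;

// Sketch: java.util.BitSet as a stand-in for a bitmap of user IDs.
// This is NOT Doris's BITMAP implementation; it only shows the idea
// of "one bit per integer ID".
class BitmapBasics {

    // Build a bitmap from non-negative integer user IDs.
    static BitSet toBitmap(int... userIds) {
        BitSet bitmap = new BitSet();
        for (int id : userIds) {
            bitmap.set(id); // mark this ID as present
        }
        return bitmap;
    }

    // Union of two bitmaps (bitwise OR), leaving both inputs unchanged.
    static BitSet union(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.or(b);
        return result;
    }

    public static void main(String[] args) {
        BitSet home = toBitmap(1001, 1002);
        BitSet search = toBitmap(1003, 1001);

        BitSet all = union(home, search);
        System.out.println(all);               // {1001, 1002, 1003}
        System.out.println(all.cardinality()); // 3 (distinct users)
        System.out.println(all.get(1002));     // true (membership test)
    }
}
```

Note that 1001 appears in both inputs but is counted once in the union, which is what makes bitmaps convenient for distinct counts such as unique visitors.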
Prerequisites
A Doris 2.0.1 environment
Flink 1.16, with the Flink Doris Connector JAR placed under <FLINK_HOME>/lib
Create the Doris table
CREATE TABLE `page_view_bitmap` (
`dt` int,
`page` varchar(256),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
Writing bitmap data
Here we simulate a Flink job that reads data from MySQL and writes it into Doris, storing user_id in a bitmap column.
Mock data
Create the MySQL table
CREATE TABLE `page_view` (
`id` int NOT NULL,
`dt` int,
`page` varchar(256),
`user_id` int,
PRIMARY KEY (`id`)
);
# Mock data
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (1, 20230921, 'home', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (2, 20230921, 'home', 1002);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (3, 20230921, 'search', 1003);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (4, 20230922, 'mine', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (5, 20230922, 'home', 1002);
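Because page_view_bitmap uses the AGGREGATE KEY model with a BITMAP_UNION column, rows that share the same (dt, page) key should be merged into one row whose bitmap is the union of their user_id values. The sketch below simulates that merge for the five mock rows above, again with java.util.BitSet standing in for the Doris BITMAP type. The class AggregateKeySimulation and its methods are hypothetical; they model the aggregation semantics only, not how Doris actually stores data.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Sketch of AGGREGATE KEY(dt, page) + BITMAP_UNION semantics.
// java.util.BitSet is a stand-in for the Doris BITMAP type (assumption).
class AggregateKeySimulation {

    // Key "dt|page" -> bitmap of user_ids; TreeMap keeps keys sorted.
    private final Map<String, BitSet> table = new TreeMap<>();

    // Roughly what loading one row with user_id=to_bitmap(user_id) does:
    // the ID is added to the bitmap of the row with the same key.
    void load(int dt, String page, int userId) {
        table.computeIfAbsent(dt + "|" + page, k -> new BitSet()).set(userId);
    }

    // Mimics bitmap_to_string: comma-separated IDs in ascending order.
    static String bitmapToString(BitSet bitmap) {
        StringJoiner joiner = new StringJoiner(",");
        bitmap.stream().forEach(id -> joiner.add(Integer.toString(id)));
        return joiner.toString();
    }

    // Current table contents as "dt|page" -> "id1,id2,...".
    Map<String, String> snapshot() {
        Map<String, String> out = new TreeMap<>();
        table.forEach((key, bitmap) -> out.put(key, bitmapToString(bitmap)));
        return out;
    }

    public static void main(String[] args) {
        AggregateKeySimulation sim = new AggregateKeySimulation();
        // The five rows inserted into the MySQL page_view table (id omitted).
        sim.load(20230921, "home", 1001);
        sim.load(20230921, "home", 1002);
        sim.load(20230921, "search", 1003);
        sim.load(20230922, "mine", 1001);
        sim.load(20230922, "home", 1002);
        sim.snapshot().forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Running it prints four rows (20230921|home -> 1001,1002, 20230921|search -> 1003, 20230922|home -> 1002, 20230922|mine -> 1001), the same contents as the Doris query result shown later.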
Writing bitmaps with Flink SQL
-- Read the MySQL data with the JDBC connector
CREATE TABLE page_view (
`dt` int,
`page` string,
`user_id` int
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/test',
'table-name' = 'page_view',
'username' = 'root',
'password' = '123456'
);
-- Write the data with the Doris connector
CREATE TABLE page_view_bitmap (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.page_view_bitmap',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label1',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
);
insert into page_view_bitmap select * from page_view;
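Under the hood, the Flink Doris Connector sink is built on Doris Stream Load, so Stream Load parameters work in the connector as well. They are passed through options prefixed with sink.properties.: an option named sink.properties.<key> is forwarded to Stream Load as <key>.
In the example above, we added the columns parameter to the WITH clause as sink.properties.columns. It configures a column transformation: user_id is converted with the to_bitmap function and then loaded into the Doris table.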
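The prefix handling can be pictured with the following sketch. It is a hypothetical helper, not the connector's source code: it picks every table option that starts with sink.properties., strips the prefix, and collects the rest into a java.util.Properties object, roughly the set of Stream Load parameters (such as columns) the sink ends up using. This is also why the DataStream example later passes a plain Properties with the key columns, without any prefix, to setStreamLoadProp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not the connector's code) that mimics how
// "sink.properties.*" table options become Stream Load properties.
class StreamLoadPropsSketch {

    static final String PREFIX = "sink.properties.";

    static Properties extractStreamLoadProps(Map<String, String> tableOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                // "sink.properties.columns" -> "columns"
                props.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "doris");
        options.put("sink.label-prefix", "doris_label1"); // connector option, not forwarded
        options.put("sink.properties.columns", "dt,page,user_id,user_id=to_bitmap(user_id)");

        Properties props = extractStreamLoadProps(options);
        System.out.println(props.size());                 // 1
        System.out.println(props.getProperty("columns")); // dt,page,user_id,user_id=to_bitmap(user_id)
    }
}
```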
Query result
mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt | page | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home | 1001,1002 |
| 20230921 | search | 1003 |
| 20230922 | home | 1002 |
| 20230922 | mine | 1001 |
+----------+--------+---------------------------+
4 rows in set (0.00 sec)
Flink DataStream
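Use the DataStream API to write mock data into the same table.
Bitmaps are handled in the DataStream API the same way as in SQL: the to_bitmap conversion is declared in the columns Stream Load property.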
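import java.util.Arrays;
import java.util.Properties;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
// In newer connector versions this class may live in org.apache.doris.flink.sink.writer.serializer
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DorisBitmapWrite {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Stream Load properties: CSV input plus the same to_bitmap column mapping as the SQL example
        Properties properties = new Properties();
        properties.setProperty("column_separator", ",");
        properties.setProperty("format", "csv");
        properties.setProperty("columns", "dt,page,user_id,user_id=to_bitmap(user_id)");

        // Connection settings for the target Doris table
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes("127.0.0.1:8030")
                .setTableIdentifier("test.page_view_bitmap")
                .setUsername("root")
                .setPassword("");

        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("doris_label")
                .setStreamLoadProp(properties)
                .setDeletable(false);

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setSerializer(new SimpleStringSerializer())
                .setDorisOptions(dorisBuilder.build());

        // Mock data: each record is one CSV line in dt,page,user_id order
        DataStreamSource<String> stringDataStreamSource = env.fromCollection(
                Arrays.asList("20230921,home,1003", "20230921,search,1001", "20230923,home,1001"));
        stringDataStreamSource.sinkTo(builder.build());
        env.execute("doris bitmap write");
    }
}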
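The sink in this example uses SimpleStringSerializer with format set to csv and column_separator set to ",", so every record must already be a CSV line whose fields follow the columns order dt,page,user_id. If the upstream stream carries objects rather than strings, one option is to format them before the sink. A minimal sketch, assuming a hypothetical PageView type that is not part of the connector:

```java
// Sketch: turning a structured record into the CSV line the sink expects.
// PageView and PageViewCsv are hypothetical names used for illustration.
class PageViewCsv {

    // Hypothetical event type with the same fields as the Doris table.
    static final class PageView {
        final int dt;
        final String page;
        final int userId;

        PageView(int dt, String page, int userId) {
            this.dt = dt;
            this.page = page;
            this.userId = userId;
        }

        // Field order must match the "columns" property: dt,page,user_id.
        // Assumes page never contains the ',' column separator.
        String toCsvLine() {
            return dt + "," + page + "," + userId;
        }
    }

    public static void main(String[] args) {
        PageView pv = new PageView(20230921, "home", 1003);
        System.out.println(pv.toCsvLine()); // 20230921,home,1003
    }
}
```

In a job, this could be applied with something like pageViews.map(PageView::toCsvLine) before sinkTo(...), where pageViews is a hypothetical DataStream<PageView>. A page value containing a comma would break this simple format, so real data may need escaping or a different separator.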
Query result
mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt | page | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home | 1001,1002,1003 |
| 20230921 | search | 1001,1003 |
| 20230922 | home | 1002 |
| 20230922 | mine | 1001 |
| 20230923 | home | 1001 |
+----------+--------+---------------------------+
5 rows in set (0.00 sec)
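Compared with the first result, the three records from the DataStream job were merged into existing rows where the (dt, page) key already existed (20230921 home gained 1003, 20230921 search gained 1001), and a new row was created for 20230923 home. The sketch below replays both loads with java.util.BitSet as a stand-in for the Doris BITMAP type; IncrementalBitmapMerge is a hypothetical class that models only the BITMAP_UNION merge, not Doris internals.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Sketch: replay the SQL load and then the DataStream load to show how
// BITMAP_UNION merges new rows into existing (dt, page) keys.
// java.util.BitSet is a stand-in for the Doris BITMAP type (assumption).
class IncrementalBitmapMerge {

    // Apply one CSV line "dt,page,user_id" to the table (key = "dt|page").
    static void applyCsvLine(Map<String, BitSet> table, String line) {
        String[] f = line.split(",");
        String key = f[0] + "|" + f[1];
        table.computeIfAbsent(key, k -> new BitSet()).set(Integer.parseInt(f[2]));
    }

    // Mimics bitmap_to_string: comma-separated IDs in ascending order.
    static String bitmapToString(BitSet bitmap) {
        StringJoiner joiner = new StringJoiner(",");
        bitmap.stream().forEach(id -> joiner.add(Integer.toString(id)));
        return joiner.toString();
    }

    static Map<String, BitSet> run() {
        Map<String, BitSet> table = new TreeMap<>();
        // Rows loaded by the Flink SQL job (the MySQL mock data).
        for (String line : new String[] {
                "20230921,home,1001", "20230921,home,1002", "20230921,search,1003",
                "20230922,mine,1001", "20230922,home,1002"}) {
            applyCsvLine(table, line);
        }
        // The three mock records written by the DataStream job.
        for (String line : new String[] {
                "20230921,home,1003", "20230921,search,1001", "20230923,home,1001"}) {
            applyCsvLine(table, line);
        }
        return table;
    }

    public static void main(String[] args) {
        run().forEach((key, bitmap) -> System.out.println(key + " -> " + bitmapToString(bitmap)));
    }
}
```

It prints the same five rows as the query above: 20230921|home -> 1001,1002,1003, 20230921|search -> 1001,1003, 20230922|home -> 1002, 20230922|mine -> 1001 and 20230923|home -> 1001.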