温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink CDC怎么监听MySQL表

发布时间:2021-12-04 10:04:51 来源:亿速云 阅读:297 作者:iii 栏目:大数据

这篇文章主要讲解了“Flink CDC怎么监听MySQL表”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink CDC怎么监听MySQL表”吧!

// 前景提要:开启mysql binlog监控。(目录:C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)ProgramData 为隐藏目录。注意:binlog_format=ROW
// 创建Blink Streaming的TableEnvironmentEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// 创建表,connector使用mysql-cdcbsTableEnv.executeSql("CREATE TABLE mysql_binlog " +"(id STRING, " +"times STRING, " +"temp STRING) " +"WITH " +"('connector' = 'mysql-cdc', " +" 'hostname' = '127.0.0.1', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'test', " +" 'table-name' = 'sersor_temp'" +")");// 打印控制台bsTableEnv.executeSql("CREATE TABLE sink_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE) " +"WITH " +"('connector' = 'print'" +")");// 将CDC数据源和下游数据表对接起来bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE sink_kafka_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'test_mysql_binlog'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'properties.group.id' = 'testGroup'," +" 'properties.bootstrap.servers' = 'node2:9092', " +" 'format' = 'canal-json' " +")");// 将CDC数据与 kafka表对接起来bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE hTable (" +" id STRING," +" f ROW<times STRING, temp STRING>," +" PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +" 'connector' = 'hbase-2.2'," +" 'table-name' = 'regional:binlog'," +" 'zookeeper.quorum' = 'node2:2181'" +")");// 将CDC数据存储到 Hbase中bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");

-- ----------------------------
-- Table structure for sersor_temp
-- ----------------------------
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp`  (
  `id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `temp` decimal(10, 2) NOT NULL,
  `times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of sersor_temp
-- ----------------------------
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
 

注意:此处 表中 temp 字段为 decimal 类型,在SQL中使用  DECIMAL 、DOUBLE 类型 存储到hbase中都会出现乱码问题,遂 都换成 STRING

感谢各位的阅读,以上就是“Flink CDC怎么监听MySQL表”的内容了,经过本文的学习后,相信大家对Flink CDC怎么监听MySQL表这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI