Flink读取mysql cdc有办法过滤掉 update_before类型的数据嘛?
以下为热心网友提供的参考意见
在Apache Flink中,当读取MySQL CDC时,可以通过配置选项或编程方式来过滤掉update_before
类型的数据。
-
使用Flink SQL:
如果您正在使用Flink SQL,可以在创建表源时指定只处理特定的变更事件类型。例如:CREATE TABLE my_table ( ... ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '', 'port' = '', 'username' = '', 'password' = '', 'database-name' = '', 'table-name' = '
', 'server-id' = '', -- 只处理INSERT和UPDATE_AFTER类型的事件 'include-events' = 'insert,update_after' );
编程方式:
如果您正在使用Java/Scala API进行编程,可以自定义一个ChangelogMode
,并将其传递给TableSource
的构造函数。例如:ChangelogMode changelogMode = ChangelogMode.newBuilder() .addContainedKind(Kind.INSERT) .addContainedKind(Kind.UPDATE_AFTER) .build(); TableSource<?> source = ...; source.configure( new Configuration(), new HashMap<>(), changelogMode);
以下为热心网友提供的参考意见
没有办法。此回答整理自钉群“实时计算Flink产品交流群”
本文来自投稿,不代表新手站长_郑州云淘科技有限公司立场,如若转载,请注明出处:https://www.cnzhanzhang.com/12406.html
赞 (0)Flink这个 bucket 可以修改么? -[阿里云_云淘科技]上一篇 2023年12月13日 下午5:59flink作业写入adb pg,一直提示错误,有什么排查思路,或者优化方案吗?-[阿里云_云淘科技]下一篇 2023年12月13日新手站长从“心”出发,感谢16年您始终不离不弃。