版本:Flink1.18.0
代码如下:“`
public static void main(String[] args) {
//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table
Schema schema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url",DataTypes.STRING())
.column("cTime", DataTypes.STRING())
.build();
tEnv.createTemporaryTable( "sourceTable", TableDescriptor.forConnector("kafka")
.schema(schema)
.option("topic","spring_test_kafka_topic")
.option("properties.bootstrap.servers","xxxxx:9092")
.option("properties.group.id","clicklog")
.option("scan.startup.mode","earliest-offset")
.option("format","json")
.build()
);
//3、创建sink table
tEnv.createTemporaryTable("sinkTable",
TableDescriptor.forConnector("print").schema(schema).build());
//4、执行sql查询
//Table resultTable = tEnv.sqlQuery(" select ,userurl,cTime from sourceTable ");
Table resultTable = tEnv.from("sourceTable")
.select($("user"), $("url"),$("cTime"));
//5、输出
resultTable.executeInsert("sinkTable");
}
“`
操作流程:运行代码报错:
请问是什么原因呢,请大佬指点
本文来自投稿,不代表新手站长_郑州云淘科技有限公司立场,如若转载,请注明出处:https://www.cnzhanzhang.com/17493.html