Flink1.18.0版本连接外部kafka本地执行测试代码报错-[阿里云_云淘科技]

版本:Flink1.18.0
代码如下:“`
public static void main(String[] args) {

    //1、创建TableEnvironment
    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    //2、创建source  table
    Schema schema = Schema.newBuilder()
            .column("user", DataTypes.STRING())
            .column("url",DataTypes.STRING())
            .column("cTime", DataTypes.STRING())
            .build();
    tEnv.createTemporaryTable( "sourceTable", TableDescriptor.forConnector("kafka")
            .schema(schema)
            .option("topic","spring_test_kafka_topic")
            .option("properties.bootstrap.servers","xxxxx:9092")
            .option("properties.group.id","clicklog")
            .option("scan.startup.mode","earliest-offset")
            .option("format","json")
            .build()
    );

    //3、创建sink  table
    tEnv.createTemporaryTable("sinkTable",
            TableDescriptor.forConnector("print").schema(schema).build());

    //4、执行sql查询
    //Table resultTable = tEnv.sqlQuery(" select ,userurl,cTime from sourceTable ");
    Table resultTable = tEnv.from("sourceTable")
            .select($("user"), $("url"),$("cTime"));

    //5、输出
    resultTable.executeInsert("sinkTable");
}

“`

操作流程:运行代码报错:Flink1.18.0版本连接外部kafka本地执行测试代码报错-[阿里云_云淘科技]
Flink1.18.0版本连接外部kafka本地执行测试代码报错-[阿里云_云淘科技]
请问是什么原因呢,请大佬指点

本文来自投稿,不代表新手站长_郑州云淘科技有限公司立场,如若转载,请注明出处:https://www.cnzhanzhang.com/17493.html

(0)
匿名
上一篇 2023年12月29日 上午11:24
下一篇 2023年12月29日

相关推荐

新手站长从“心”出发,感谢16年您始终不离不弃。