大家有遇到Flink这问题吗?-[阿里云_云淘科技]

大家有遇到Flink这问题吗?在1.15.4里报错。Caused by: java.util.concurrent.CompletionException: org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)

以下为热心网友提供的参考意见

这个问题是由于Flink作业图中存在循环依赖导致的。在Flink中,作业图是一个有向无环图(DAG),如果作业图中存在循环依赖,那么Flink将无法正确执行作业。

为了解决这个问题,您需要检查您的Flink作业代码,确保作业图中没有循环依赖。您可以使用Flink的拓扑排序功能来检查作业图是否存在循环依赖。以下是一个使用Java API进行拓扑排序的示例:

import org.apache.flink.api.common.JobVertex;
import org.apache.flink.graph.DefaultEdge;
import org.apache.flink.graph.Graph;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TopologicalSortExample {
    public static void main(String[] args) throws Exception {
        // 创建一个作业图
        Graph<JobVertex, DefaultEdge> graph = new Graph<>();
        Map<JobVertex, Integer> inDegrees = new ConcurrentHashMap<>();

        // 添加作业顶点和边到作业图中
        // ...

        // 计算作业顶点的入度
        for (JobVertex vertex : graph.getVertices()) {
            inDegrees.put(vertex, graph.inDegreeOf(vertex));
        }

        // 拓扑排序
        List<JobVertex> sortedVertices = topologicalSort(graph, inDegrees);

        // 输出排序后的作业顶点
        for (JobVertex vertex : sortedVertices) {
            System.out.println(vertex.getName());
        }
    }

    private static <K, VV, EV> List<K> topologicalSort(Graph<K, EV> graph, Map<K, Integer> inDegrees) {
        List<K> result = new ArrayList<>();
        List<K> zeroInDegreeVertices = new ArrayList<>();

        // 初始化入度为0的顶点列表
        for (K vertex : graph.getVertices()) {
            if (inDegrees.get(vertex) == 0) {
                zeroInDegreeVertices.add(vertex);
            }
        }

        // 执行拓扑排序
        while (!zeroInDegreeVertices.isEmpty()) {
            K currentVertex = zeroInDegreeVertices.remove(0);
            result.add(currentVertex);

            // 更新相邻顶点的入度
            for (K neighbor : graph.getNeighborsOf(currentVertex)) {
                inDegrees.put(neighbor, inDegrees.get(neighbor) - 1);
                if (inDegrees.get(neighbor) == 0) {
                    zeroInDegreeVertices.add(neighbor);
                }
            }
        }

        return result;
    }
}

请根据您的实际作业代码修改上述示例,并确保作业图中没有循环依赖。

本文来自投稿,不代表新手站长_郑州云淘科技有限公司立场,如若转载,请注明出处:https://www.cnzhanzhang.com/19754.html

(0)
匿名
上一篇 2024年1月4日
下一篇 2024年1月4日

相关推荐

新手站长从“心”出发,感谢16年您始终不离不弃。