大家有遇到Flink这问题吗?在1.15.4里报错。Caused by: java.util.concurrent.CompletionException: org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
以下为热心网友提供的参考意见
这个问题是由于Flink作业图中存在循环依赖导致的。在Flink中,作业图是一个有向无环图(DAG),如果作业图中存在循环依赖,那么Flink将无法正确执行作业。
为了解决这个问题,您需要检查您的Flink作业代码,确保作业图中没有循环依赖。您可以使用Flink的拓扑排序功能来检查作业图是否存在循环依赖。以下是一个使用Java API进行拓扑排序的示例:
import org.apache.flink.api.common.JobVertex;
import org.apache.flink.graph.DefaultEdge;
import org.apache.flink.graph.Graph;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TopologicalSortExample {
public static void main(String[] args) throws Exception {
// 创建一个作业图
Graph<JobVertex, DefaultEdge> graph = new Graph<>();
Map<JobVertex, Integer> inDegrees = new ConcurrentHashMap<>();
// 添加作业顶点和边到作业图中
// ...
// 计算作业顶点的入度
for (JobVertex vertex : graph.getVertices()) {
inDegrees.put(vertex, graph.inDegreeOf(vertex));
}
// 拓扑排序
List<JobVertex> sortedVertices = topologicalSort(graph, inDegrees);
// 输出排序后的作业顶点
for (JobVertex vertex : sortedVertices) {
System.out.println(vertex.getName());
}
}
private static <K, VV, EV> List<K> topologicalSort(Graph<K, EV> graph, Map<K, Integer> inDegrees) {
List<K> result = new ArrayList<>();
List<K> zeroInDegreeVertices = new ArrayList<>();
// 初始化入度为0的顶点列表
for (K vertex : graph.getVertices()) {
if (inDegrees.get(vertex) == 0) {
zeroInDegreeVertices.add(vertex);
}
}
// 执行拓扑排序
while (!zeroInDegreeVertices.isEmpty()) {
K currentVertex = zeroInDegreeVertices.remove(0);
result.add(currentVertex);
// 更新相邻顶点的入度
for (K neighbor : graph.getNeighborsOf(currentVertex)) {
inDegrees.put(neighbor, inDegrees.get(neighbor) - 1);
if (inDegrees.get(neighbor) == 0) {
zeroInDegreeVertices.add(neighbor);
}
}
}
return result;
}
}
请根据您的实际作业代码修改上述示例,并确保作业图中没有循环依赖。
本文来自投稿,不代表新手站长_郑州云淘科技有限公司立场,如若转载,请注明出处:https://www.cnzhanzhang.com/19754.html