Dolphinscheduler DAG Core Source Code Analysis

DATE POSTED: December 7, 2024


Background

org.apache.dolphinscheduler.common.graph.DAG

Note: In Dolphinscheduler, offline tasks have a complete lifecycle (stopping, pausing, resuming from pause, rerunning, and so on), and T+1 offline tasks are all organized in the form of a DAG (Directed Acyclic Graph).

Dolphinscheduler DAG Implementation

org.apache.dolphinscheduler.common.graph.DAG

Three important data structures of DAG:

    // Vertex information
    private final Map<Node, NodeInfo> nodesMap;

    // Edge association information: maps each vertex to its outgoing edges,
    // which makes it possible to find leaf nodes and downstream nodes
    private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

    // Reverse edge association information: maps each vertex to its incoming edges,
    // which makes it quick to find nodes with an in-degree of 0 (starting nodes)
    // and to obtain upstream nodes
    private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
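To make the role of the three maps concrete, here is a minimal sketch of how a forward map and a reverse map answer the "start node", "leaf node", "downstream" and "upstream" questions. MiniDag and all of its method names are hypothetical stand-ins written for this article, not the Dolphinscheduler API, and the sketch deliberately omits locking and the cycle check.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the three maps described above.
class MiniDag {
    final Map<String, String> nodesMap = new HashMap<>();
    final Map<String, Map<String, String>> edgesMap = new HashMap<>();
    final Map<String, Map<String, String>> reverseEdgesMap = new HashMap<>();

    void addNode(String node, String info) {
        nodesMap.put(node, info);
    }

    // Records the edge in both directions; no cycle check in this sketch.
    void addEdge(String from, String to, String edgeInfo) {
        edgesMap.computeIfAbsent(from, k -> new HashMap<>()).put(to, edgeInfo);
        reverseEdgesMap.computeIfAbsent(to, k -> new HashMap<>()).put(from, edgeInfo);
    }

    // Start nodes have in-degree 0: they never appear as a key of the reverse map.
    Set<String> startNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(reverseEdgesMap.keySet());
        return result;
    }

    // Leaf nodes have out-degree 0: they never appear as a key of the forward map.
    Set<String> leafNodes() {
        Set<String> result = new TreeSet<>(nodesMap.keySet());
        result.removeAll(edgesMap.keySet());
        return result;
    }

    Set<String> downstream(String node) {
        return new TreeSet<>(edgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }

    Set<String> upstream(String node) {
        return new TreeSet<>(reverseEdgesMap.getOrDefault(node, Collections.emptyMap()).keySet());
    }

    public static void main(String[] args) {
        MiniDag dag = new MiniDag();
        dag.addNode("A", "A");
        dag.addNode("B", "B");
        dag.addNode("C", "C");
        dag.addEdge("A", "B", "A->B");
        dag.addEdge("B", "C", "B->C");
        System.out.println("start nodes: " + dag.startNodes());
        System.out.println("leaf nodes:  " + dag.leafNodes());
    }
}
```

Keeping the reverse map costs a second write per edge, but it turns upstream lookups and start-node detection into plain map lookups instead of a scan over every edge.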

Example below:

    DAG<String, String, String> graph = new DAG<>();
    graph.addNode("A", "A");
    graph.addNode("B", "B");
    graph.addNode("C", "C");

    // Add an edge from B to C; A is still unconnected
    graph.addEdge("B", "C");

    // Adding A -> B starts a search from B and checks whether A can be reached from it.
    // If it can, the A -> B edge cannot be added because it would form a cycle;
    // otherwise, it can be added.
    graph.addEdge("A", "B");
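The check described in the last comment is a reachability search: before adding from -> to, walk downstream from `to` and reject the edge if `from` is reachable. The following is a minimal standalone sketch of that idea. CycleCheck and canAddEdge are hypothetical names, the graph is a plain adjacency map, and the sketch uses a visited set to bound the search rather than the vertex counter used in the Dolphinscheduler source.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical helper illustrating the "would this edge close a cycle?" check.
class CycleCheck {

    // Returns true if adding from -> to keeps the graph acyclic.
    static boolean canAddEdge(Map<String, Set<String>> edges, String from, String to) {
        if (from.equals(to)) {
            return false; // a self-loop is always a cycle
        }
        Queue<String> queue = new ArrayDeque<>();
        Set<String> visited = new HashSet<>();
        queue.add(to);
        visited.add(to);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : edges.getOrDefault(current, Collections.emptySet())) {
                if (next.equals(from)) {
                    return false; // a path to -> ... -> from already exists
                }
                if (visited.add(next)) {
                    queue.add(next); // visit each node at most once
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> edges = new HashMap<>();
        edges.computeIfAbsent("B", k -> new HashSet<>()).add("C");
        System.out.println("A -> B allowed: " + canAddEdge(edges, "A", "B"));
        edges.computeIfAbsent("A", k -> new HashSet<>()).add("B");
        System.out.println("C -> A allowed: " + canAddEdge(edges, "C", "A"));
    }
}
```

Because the graph is acyclic before each insertion, the search always terminates; the visited set only avoids re-walking nodes that are reachable along several paths.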

Source code analysis: org.apache.dolphinscheduler.common.graph.DAG#addEdge

    public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
        lock.writeLock().lock();

        try {
            // Check whether the edge can be added
            if (!isLegalAddEdge(fromNode, toNode, createNode)) {
                log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
                return false;
            }

            // Add the nodes if they are absent
            addNodeIfAbsent(fromNode, null);
            addNodeIfAbsent(toNode, null);

            // Add the edge to both the forward map and the reverse map
            addEdge(fromNode, toNode, edge, edgesMap);
            addEdge(toNode, fromNode, edge, reverseEdgesMap);

            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
        // If fromNode and toNode are the same vertex, this edge cannot be added
        if (fromNode.equals(toNode)) {
            log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
            return false;
        }

        // If nodes are not to be created, fromNode and toNode must both be existing vertices
        if (!createNode) {
            if (!containsNode(fromNode) || !containsNode(toNode)) {
                log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
                return false;
            }
        }

        // Whether the edge (fromNode -> toNode) can be added depends on whether
        // it would create a cycle in the DAG.
        // Get the number of nodes
        int verticesCount = getNodesCount();

        Queue<Node> queue = new LinkedList<>();

        // Put toNode into the queue
        queue.add(toNode);

        // If fromNode cannot be reached from toNode, there is no cycle.
        // The queue is never empty on the first iteration.
        while (!queue.isEmpty() && (--verticesCount > 0)) {
            // Take the next element from the queue
            Node key = queue.poll();

            for (Node subsequentNode : getSubsequentNodes(key)) {
                // When adding A -> B, the search starts from B and inspects B's downstream nodes.
                // If A is among them, a path B -> A already exists and the edge cannot be added.
                // Otherwise, for example with A -> B -> C, B's downstream node C is put into
                // the queue and searched in turn.
                // The core idea: starting from the target node, look for a path back to the
                // source node to decide whether the new edge would form a cycle.
                if (subsequentNode.equals(fromNode)) {
                    return false;
                }

                queue.add(subsequentNode);
            }
        }

        return true;
    }

Dolphinscheduler DagHelper Explanation

The DAG class is a basic general-purpose DAG tool class, and DagHelper is a business tool class that assembles task definitions and relationships between task definitions into a DAG.

org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph

    public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {
        // Get the tasks and their relationships for the process instance
        List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
                workflowInstance.getProcessDefinitionCode(),
                workflowInstance.getProcessDefinitionVersion());

        // Get the corresponding task definition logs
        List<TaskDefinitionLog> taskDefinitionLogs =
                taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);

        // Get the TaskNode list
        List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);

        // generate process to get DAG info
        // Parse whether nodes were manually specified in the command parameters;
        // by default they are not
        List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());

        // By default, startNodeNameList is empty
        List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());

        // Build a ProcessDag object instance
        ProcessDag processDag = DagHelper.generateFlowDag(
                taskNodeList,
                startNodeNameList,
                recoveryTaskNodeCodeList,
                workflowInstance.getTaskDependType());

        if (processDag == null) {
            log.error("ProcessDag is null");
            throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
        }

        // Generate the DAG
        DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
        log.debug("Build dag success, dag: {}", dagGraph);

        // Use WorkflowGraph to encapsulate the task node list and dagGraph
        return new WorkflowGraph(taskNodeList, dagGraph);
    }

org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag

    public static ProcessDag generateFlowDag(
            List<TaskNode> totalTaskNodeList,
            List<Long> startNodeNameList,
            List<Long> recoveryNodeCodeList,
            TaskDependType depNodeType) throws Exception {
        // Get all nodes
        List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(
                totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);

        if (destTaskNodeList.isEmpty()) {
            return null;
        }

        // Get the relationships between the task nodes
        List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);

        // Instantiate a ProcessDag
        ProcessDag processDag = new ProcessDag();

        // Set the edges of the DAG
        processDag.setEdges(taskNodeRelations);

        // Set the vertices of the DAG
        processDag.setNodes(destTaskNodeList);

        return processDag;
    }

generateFlowDag sets destTaskNodeList as the nodes and taskNodeRelations as the edges of the ProcessDag.
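The article does not show how the relation list is derived from the selected nodes. As a rough illustration only, the sketch below assumes that each task carries the codes of its upstream tasks and that a relation is kept only when both ends are in the selected node list. Task, Relation, RelationSketch and relationsOf are hypothetical names invented for this example; they are not the Dolphinscheduler classes or the body of generateRelationListByFlowNodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: derive DAG edges from a list of selected task nodes.
class RelationSketch {

    // Hypothetical stand-in for a task node: its code plus the codes of its upstream tasks.
    static class Task {
        final long code;
        final List<Long> upstreamCodes;

        Task(long code, Long... upstreamCodes) {
            this.code = code;
            this.upstreamCodes = Arrays.asList(upstreamCodes);
        }
    }

    // Hypothetical stand-in for an edge between two task codes.
    static class Relation {
        final long startNode;
        final long endNode;

        Relation(long startNode, long endNode) {
            this.startNode = startNode;
            this.endNode = endNode;
        }
    }

    // Emits one relation (upstream -> task) for every upstream code that is
    // itself part of the selected list; references to unselected tasks are dropped.
    static List<Relation> relationsOf(List<Task> tasks) {
        Set<Long> selected = new HashSet<>();
        for (Task task : tasks) {
            selected.add(task.code);
        }
        List<Relation> relations = new ArrayList<>();
        for (Task task : tasks) {
            for (Long upstream : task.upstreamCodes) {
                if (selected.contains(upstream)) {
                    relations.add(new Relation(upstream, task.code));
                }
            }
        }
        return relations;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(new Task(1L), new Task(2L, 1L), new Task(3L, 2L, 99L));
        for (Relation r : relationsOf(tasks)) {
            System.out.println(r.startNode + " -> " + r.endNode);
        }
    }
}
```

Filtering on the selected set matters when only part of a workflow is run: an edge whose upstream task is not in the list would otherwise point at a vertex that the DAG never receives.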

org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph

    public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
        DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();

        // Add vertices
        if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
            for (TaskNode node : processDag.getNodes()) {
                dag.addNode(node.getCode(), node);
            }
        }

        // Add edges
        if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
            for (TaskNodeRelation edge : processDag.getEdges()) {
                dag.addEdge(edge.getStartNode(), edge.getEndNode());
            }
        }

        return dag;
    }
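buildDagGraph adds every vertex before it adds any edge. The sketch below shows why that order can matter, under the assumption that the two-argument addEdge follows the createNode = false path of isLegalAddEdge, where both endpoints must already exist. That assumption is not confirmed by the code shown in this article, and GraphBuilder with its methods is a hypothetical simplification rather than the Dolphinscheduler class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a "vertices first, then edges" graph build.
class GraphBuilder {
    final Set<Long> nodes = new HashSet<>();
    final Map<Long, Set<Long>> edges = new HashMap<>();

    void addNode(long code) {
        nodes.add(code);
    }

    // Strict mode (assumed to mirror createNode = false): the edge is
    // rejected unless both endpoints were registered beforehand.
    boolean addEdge(long from, long to) {
        if (!nodes.contains(from) || !nodes.contains(to)) {
            return false;
        }
        edges.computeIfAbsent(from, k -> new HashSet<>()).add(to);
        return true;
    }

    // Each relation is a two-element array: {startCode, endCode}.
    static GraphBuilder build(List<Long> nodeCodes, List<long[]> relations) {
        GraphBuilder graph = new GraphBuilder();
        for (long code : nodeCodes) {
            graph.addNode(code);
        }
        for (long[] relation : relations) {
            graph.addEdge(relation[0], relation[1]);
        }
        return graph;
    }

    public static void main(String[] args) {
        GraphBuilder graph = build(
                Arrays.asList(1L, 2L),
                Arrays.asList(new long[] {1L, 2L}, new long[] {2L, 3L}));
        System.out.println("edges: " + graph.edges);
    }
}
```

With this strictness, an edge that references a task outside the node list is skipped instead of silently creating a vertex that has no task definition behind it.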
