55from collections import defaultdict
66from collections .abc import Iterable
77from dataclasses import dataclass , field
8+ from pathlib import Path
89
910from databricks .sdk import WorkspaceClient
1011from databricks .sdk .errors import DatabricksError
1112from databricks .sdk .service .jobs import Job , JobCluster , Task
1213
1314from databricks .labs .ucx .assessment .clusters import ClusterOwnership , ClusterInfo
1415from databricks .labs .ucx .assessment .jobs import JobOwnership , JobInfo
15- from databricks .labs .ucx .framework .owners import AdministratorLocator
16- from databricks .labs .ucx .source_code .graph import DependencyProblem
16+ from databricks .labs .ucx .framework .owners import AdministratorLocator , WorkspaceObjectOwnership
17+ from databricks .labs .ucx .source_code .graph import DependencyGraph , DependencyProblem
18+ from databricks .labs .ucx .source_code .path_lookup import PathLookup
1719
1820
1921@dataclass
@@ -39,6 +41,9 @@ class MigrationStep:
3941 required_step_ids : list [int ]
4042 """The step ids that should be completed before this step is started."""
4143
44+ @property
45+ def key (self ) -> tuple [str , str ]:
46+ return self .object_type , self .object_id
4247
4348MigrationNodeKey = tuple [str , str ]
4449
@@ -148,8 +153,9 @@ class MigrationSequencer:
148153 Analysing the graph in this case means: computing the migration sequence in `meth:generate_steps`.
149154 """
150155
151- def __init__ (self , ws : WorkspaceClient , administrator_locator : AdministratorLocator ):
156+ def __init__ (self , ws : WorkspaceClient , path_lookup : PathLookup , administrator_locator : AdministratorLocator ):
152157 self ._ws = ws
158+ self ._path_lookup = path_lookup
153159 self ._admin_locator = administrator_locator
154160 self ._counter = itertools .count ()
155161 self ._nodes : dict [MigrationNodeKey , MigrationNode ] = {}
@@ -220,7 +226,7 @@ def _register_job(self, job: Job) -> MaybeMigrationNode:
220226 problems .append (problem )
221227 return MaybeMigrationNode (job_node , problems )
222228
223- def _register_workflow_task (self , task : Task , parent : MigrationNode ) -> MaybeMigrationNode :
229+ def _register_workflow_task (self , task : Task , parent : MigrationNode , graph : DependencyGraph ) -> MaybeMigrationNode :
224230 """Register a workflow task.
225231
226232 TODO:
@@ -262,8 +268,51 @@ def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMig
262268 else :
263269 problem = DependencyProblem ('cluster-not-found' , f"Could not find cluster: { task .job_cluster_key } " )
264270 problems .append (problem )
271+ graph .visit (self ._visit_dependency , None )
265272 return MaybeMigrationNode (task_node , problems )
266273
274+ def _visit_dependency (self , graph : DependencyGraph ) -> bool | None :
275+ lineage = graph .dependency .lineage [- 1 ]
276+ parent_node = self ._nodes [(lineage .object_type , lineage .object_id )]
277+ for dependency in graph .local_dependencies :
278+ lineage = dependency .lineage [- 1 ]
279+ self .register_dependency (parent_node , lineage .object_type , lineage .object_id )
280+ # TODO tables and dfsas
281+ return False
282+
283+ def register_dependency (self , parent_node : MigrationNode , object_type : str , object_id : str ) -> MigrationNode :
284+ dependency_node = self ._nodes .get ((object_type , object_id ), None )
285+ if not dependency_node :
286+ dependency_node = self ._create_dependency_node (object_type , object_id )
287+ if parent_node :
288+ self ._incoming [parent_node .key ].add (dependency_node .key )
289+ self ._outgoing [dependency_node .key ].add (parent_node .key )
290+ return dependency_node
291+
292+ def _create_dependency_node (self , object_type : str , object_id : str ) -> MigrationNode :
293+ object_name : str = "<ANONYMOUS>"
294+ _object_owner : str = "<UNKNOWN>"
295+ if object_type in {"NOTEBOOK" , "FILE" }:
296+ path = Path (object_id )
297+ for library_root in self ._path_lookup .library_roots :
298+ if not path .is_relative_to (library_root ):
299+ continue
300+ object_name = path .relative_to (library_root ).as_posix ()
301+ break
302+ object_owner = WorkspaceObjectOwnership (self ._admin_locator ).owner_of ((object_type , object_id ))
303+ else :
304+ raise ValueError (f"{ object_type } not supported yet!" )
305+ self ._last_node_id += 1
306+ dependency_node = MigrationNode (
307+ node_id = self ._last_node_id ,
308+ object_type = object_type ,
309+ object_id = object_id ,
310+ object_name = object_name ,
311+ object_owner = object_owner ,
312+ )
313+ self ._nodes [dependency_node .key ] = dependency_node
314+ return dependency_node
315+
267316 def _register_job_cluster (self , cluster : JobCluster , parent : MigrationNode ) -> MaybeMigrationNode :
268317 """Register a job cluster.
269318
@@ -322,6 +371,16 @@ def generate_steps(self) -> Iterable[MigrationStep]:
322371 leaf during processing)
323372 - We handle cyclic dependencies (implemented in PR #3009)
324373 """
374+ """The below algo is adapted from Kahn's topological sort.
375+ The main differences are as follows:
376+ 1) we want the same step number for all nodes with same dependency depth
377+ so instead of pushing 'leaf' nodes to a queue, we fetch them again once all current 'leaf' nodes are processed
378+ (these are transient 'leaf' nodes i.e. they only become 'leaf' during processing)
379+ 2) Kahn only supports DAGs but python code allows cyclic dependencies i.e. A -> B -> C -> A is not a DAG
380+ so when fetching 'leaf' nodes, we relax the 0-incoming-vertex rule in order
381+ to avoid an infinite loop. We also avoid side effects (such as negative counts).
382+ This algo works correctly for simple cases, but is not tested on large trees.
383+ """
325384 ordered_steps : list [MigrationStep ] = []
326385 # For updating the priority of steps that depend on other steps
327386 incoming_references = self ._invert_outgoing_to_incoming_references ()
0 commit comments