Skip to content

Commit 80b47b8

Browse files
authored
chore: [VRD-1075] Update orchestrator for new pipeline defn (#4013)
1 parent 3ea0347 commit 80b47b8

File tree

3 files changed

+22
-23
lines changed

3 files changed

+22
-23
lines changed

client/verta/tests/pipeline/test_orchestrator.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,21 +72,20 @@ def test_run_and_logs(self, registered_model):
7272
triple_model_ver = create_standard_model(Triple)
7373
sum_model_ver = create_standard_model(Sum)
7474

75-
pipeline_spec = {
76-
"pipeline_name": "diamond-pipeline",
75+
pipeline_defn = {
7776
"steps": [
7877
{"name": "echo", "model_version_id": echo_model_ver.id},
7978
{"name": "double", "model_version_id": double_model_ver.id},
8079
{"name": "triple", "model_version_id": triple_model_ver.id},
8180
{"name": "sum", "model_version_id": sum_model_ver.id},
8281
],
8382
"graph": [
84-
{"name": "double", "inputs": ["echo"]},
85-
{"name": "triple", "inputs": ["echo"]},
86-
{"name": "sum", "inputs": ["double", "triple"]},
83+
{"name": "double", "predecessors": ["echo"]},
84+
{"name": "triple", "predecessors": ["echo"]},
85+
{"name": "sum", "predecessors": ["double", "triple"]},
8786
],
8887
}
89-
orchestrator = LocalOrchestrator(registered_model._conn, pipeline_spec)
88+
orchestrator = LocalOrchestrator(registered_model._conn, pipeline_defn)
9089

9190
input = 3
9291
with runtime.context() as ctx:

client/verta/verta/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
__maintainer_email__ = "[email protected]"
77
__title__ = "verta"
88
__url__ = "https://www.verta.ai/"
9-
__version__ = "0.24.0"
9+
__version__ = "0.24.1a0"

client/verta/verta/_pipeline_orchestrator/_orchestrator.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ class _OrchestratorBase(abc.ABC):
1717

1818
def __init__(
1919
self,
20-
pipeline_spec: Dict[str, Any],
20+
pipeline_defn: Dict[str, Any],
2121
step_handlers: Dict[str, _StepHandlerBase],
2222
):
23-
self._pipeline_spec = pipeline_spec
23+
self._pipeline_defn = pipeline_defn
2424
self._step_handlers = step_handlers
2525

2626
# DAG nodes are step names
@@ -30,13 +30,13 @@ def __init__(
3030

3131
@staticmethod
3232
def _get_predecessors_mapping(
33-
pipeline_spec: Dict[str, Any],
33+
pipeline_defn: Dict[str, Any],
3434
) -> Dict[str, List[str]]:
3535
"""Get the names of steps' predecessors from a pipeline specification.
3636
3737
Parameters
3838
----------
39-
pipeline_spec : dict
39+
pipeline_defn : dict
4040
Pipeline specification.
4141
4242
Returns
@@ -45,14 +45,14 @@ def _get_predecessors_mapping(
4545
Mapping from step names to their predecessors' names.
4646
4747
"""
48-
return {node["name"]: node["inputs"] for node in pipeline_spec["graph"]}
48+
return {node["name"]: node["predecessors"] for node in pipeline_defn["graph"]}
4949

5050
def _prepare_pipeline(self):
5151
"""Initialize ``self._dag`` and ``self._outputs``.
5252
5353
Parameters
5454
----------
55-
pipeline_spec : dict
55+
pipeline_defn : dict
5656
Pipeline specification.
5757
5858
Raises
@@ -61,7 +61,7 @@ def _prepare_pipeline(self):
6161
If the pipeline graph has cycles.
6262
6363
"""
64-
dag = TopologicalSorter(self._get_predecessors_mapping(self._pipeline_spec))
64+
dag = TopologicalSorter(self._get_predecessors_mapping(self._pipeline_defn))
6565
dag.prepare()
6666
# TODO: assert one input node
6767
# TODO: assert one output node
@@ -201,7 +201,7 @@ class LocalOrchestrator(_OrchestratorBase):
201201
----------
202202
conn : :class:`~verta._internal_utils._utils.Connection`
203203
Verta client connection.
204-
pipeline_spec : dict
204+
pipeline_defn : dict
205205
Pipeline specification.
206206
207207
Examples
@@ -210,34 +210,34 @@ class LocalOrchestrator(_OrchestratorBase):
210210
211211
from verta._pipeline_orchestrator import LocalOrchestrator
212212
213-
orchestrator = LocalOrchestrator(client._conn, pipeline_spec)
213+
orchestrator = LocalOrchestrator(client._conn, pipeline_defn)
214214
pipeline_output = orchestrator.run(pipeline_input)
215215
216216
"""
217217

218218
def __init__(
219219
self,
220220
conn: _utils.Connection,
221-
pipeline_spec: Dict[str, Any],
221+
pipeline_defn: Dict[str, Any],
222222
):
223223
super().__init__(
224-
pipeline_spec=pipeline_spec,
225-
step_handlers=self._init_step_handlers(conn, pipeline_spec),
224+
pipeline_defn=pipeline_defn,
225+
step_handlers=self._init_step_handlers(conn, pipeline_defn),
226226
)
227227

228228
@classmethod
229229
def _init_step_handlers(
230230
cls,
231231
conn: _utils.Connection,
232-
pipeline_spec: Dict[str, Any],
232+
pipeline_defn: Dict[str, Any],
233233
) -> Dict[str, ModelObjectStepHandler]:
234234
"""Instantiate and return step handlers.
235235
236236
Parameters
237237
----------
238238
conn : :class:`~verta._internal_utils._utils.Connection`
239239
Verta client connection.
240-
pipeline_spec : dict
240+
pipeline_defn : dict
241241
Pipeline specification.
242242
243243
Returns
@@ -246,10 +246,10 @@ def _init_step_handlers(
246246
Mapping of step names to their handlers.
247247
248248
"""
249-
predecessors_mapping = cls._get_predecessors_mapping(pipeline_spec)
249+
predecessors_mapping = cls._get_predecessors_mapping(pipeline_defn)
250250

251251
step_handlers = dict()
252-
for step in pipeline_spec["steps"]:
252+
for step in pipeline_defn["steps"]:
253253
step_handlers[step["name"]] = ModelObjectStepHandler(
254254
name=step["name"],
255255
predecessors=predecessors_mapping.get(step["name"], []),

0 commit comments

Comments
 (0)