Skip to content

patching command framework infrahub db patch plan/apply/revert #6311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
move call order around
  • Loading branch information
ajtmccarty committed Apr 23, 2025
commit fc01260f65cf422d1b3b1167369f076f5398ae76
2 changes: 1 addition & 1 deletion backend/infrahub/patch/edge_adder.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def _run_add_query(self, edge_type: str, edges_to_add: list[EdgeToAdd]) ->
"id_func_name": self.db.get_id_function_name(),
}
edges_to_add_dicts = [asdict(v) for v in edges_to_add]
results, _ = await self.db.execute_query_with_metadata(
results = await self.db.execute_query(
query=query, params={"edges_to_add": edges_to_add_dicts}, type=QueryType.WRITE
)
abstract_to_concrete_id_map: dict[str, str] = {}
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/patch/edge_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def _run_update_query(self, edges_to_update: list[EdgeToUpdate]) -> None:
WHERE %(id_func_name)s(e) = edge_to_update.db_id
SET e = edge_to_update.after_props
""" % {"id_func_name": self.db.get_id_function_name()}
await self.db.execute_query_with_metadata(
await self.db.execute_query(
query=query, params={"edges_to_update": [asdict(e) for e in edges_to_update]}, type=QueryType.WRITE
)

Expand Down
32 changes: 16 additions & 16 deletions backend/infrahub/patch/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ async def prepare_plan(self, patch_query: PatchQuery, directory: Path) -> Path:
async def apply(self, patch_plan_directory: Path) -> PatchPlan:
patch_plan = self.plan_reader.read(patch_plan_directory)
await self._apply_vertices_to_add(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._apply_vertices_to_delete(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._apply_edges_to_add(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
if patch_plan.vertices_to_update:
await self.vertex_updater.execute(vertices_to_update=patch_plan.vertices_to_update)
await self._apply_edges_to_add(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._apply_edges_to_delete(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._apply_vertices_to_delete(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
if patch_plan.edges_to_update:
await self.edge_updater.execute(edges_to_update=patch_plan.edges_to_update)
return patch_plan
Expand Down Expand Up @@ -116,22 +116,10 @@ async def _apply_edges_to_delete(self, patch_plan: PatchPlan, patch_plan_directo
async def revert(self, patch_plan_directory: Path) -> PatchPlan:
"""Invert the PatchPlan to create the complement of every added/updated/deleted element and undo them"""
patch_plan = self.plan_reader.read(patch_plan_directory)
await self._revert_added_edges(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._revert_deleted_vertices(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._revert_deleted_edges(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
edges_to_update = []
for edge_update_to_revert in patch_plan.edges_to_update:
edges_to_update.append(
EdgeToUpdate(
db_id=edge_update_to_revert.db_id,
before_props=edge_update_to_revert.after_props,
after_props=edge_update_to_revert.before_props,
)
)
if edges_to_update:
await self.edge_updater.execute(edges_to_update=edges_to_update)

await self._revert_added_edges(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._revert_added_vertices(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
await self._revert_deleted_vertices(patch_plan=patch_plan, patch_plan_directory=patch_plan_directory)
vertices_to_update = []
for vertex_update_to_revert in patch_plan.vertices_to_update:
Copy link
Contributor

@LucasG0 LucasG0 Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list comprehension could be used here, and on similar patterns below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion. I've made the change and it is more readable this way

vertices_to_update.append(
Expand All @@ -143,6 +131,18 @@ async def revert(self, patch_plan_directory: Path) -> PatchPlan:
)
if vertices_to_update:
await self.vertex_updater.execute(vertices_to_update=vertices_to_update)

edges_to_update = []
for edge_update_to_revert in patch_plan.edges_to_update:
edges_to_update.append(
EdgeToUpdate(
db_id=edge_update_to_revert.db_id,
before_props=edge_update_to_revert.after_props,
after_props=edge_update_to_revert.before_props,
)
)
if edges_to_update:
await self.edge_updater.execute(edges_to_update=edges_to_update)
return patch_plan

async def _revert_added_vertices(self, patch_plan: PatchPlan, patch_plan_directory: Path) -> None:
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/patch/vertex_adder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def _run_add_query(self, labels: list[str], vertices_to_add: list[VertexTo
"cypher_variable_map": cypher_variable_map,
"id_func_name": self.db.get_id_function_name(),
}
results, _ = await self.db.execute_query_with_metadata(
results = await self.db.execute_query(
Copy link
Contributor

@LucasG0 LucasG0 Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this query is executed with 100 vertices to add, if for some reason it fails while only 60 vertices were created, would this function raise an error, implying these 60 created vertices ids would not later be added to the list of created vertices? If so, running this query in a transaction that would be rollback in case of failure would fix this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very good question. thanks for reading through and really understanding what is going on here. I think that you are right and a transaction will be helpful here, so I've added it (and in the same place in PatchPlanEdgeAdder). I think there is still a possible failure point where the data written to the database will not match what we write to the file (say if the commit part of the transaction fails), but that failure space is much smaller now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the transaction commit fails, my understanding is that an error would be raised so nothing would be written to the file?

query=query, params={"vertices_to_add": serial_vertices_to_add}, type=QueryType.WRITE
)
abstract_to_concrete_id_map: dict[str, str] = {}
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/patch/vertex_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def _run_update_query(self, vertices_to_update: list[VertexToUpdate]) -> N
WHERE %(id_func_name)s(n) = vertex_to_update.db_id
SET n = vertex_to_update.after_props
""" % {"id_func_name": self.db.get_id_function_name()}
await self.db.execute_query_with_metadata(
await self.db.execute_query(
query=query, params={"vertices_to_update": [asdict(v) for v in vertices_to_update]}, type=QueryType.WRITE
)

Expand Down
Loading