Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@
write_pid_to_pidfile,
)
from airflow.providers.edge3.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import timezone
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS, timezone
from airflow.utils.net import getfqdn
from airflow.utils.state import TaskInstanceState

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, reset_metrics
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.edge3.version_compat import AIRFLOW_V_3_0_PLUS, timezone
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.db import DBLocks, create_global_lock
from airflow.utils.session import NEW_SESSION, provide_session

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.models.base import Base, StringID
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils import timezone
from airflow.providers.edge3.version_compat import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

from airflow.exceptions import AirflowException
from airflow.models.base import Base
from airflow.providers.edge3.version_compat import timezone
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down
11 changes: 11 additions & 0 deletions providers/edge3/src/airflow/providers/edge3/version_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,14 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:

AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)

if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import timezone
else:
from airflow.utils import timezone # type: ignore[no-redef,attr-defined]

__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
"timezone",
]
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from sqlalchemy import select, update

from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.version_compat import timezone
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
EdgeJobFetched,
Expand All @@ -38,7 +39,6 @@
status,
)
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.state import TaskInstanceState

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from sqlalchemy import select

from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState, set_metrics
from airflow.providers.edge3.version_compat import timezone
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import (
WorkerQueueUpdateBody,
Expand All @@ -41,7 +42,6 @@
status,
)
from airflow.stats import Stats
from airflow.utils import timezone

worker_router = AirflowRouter(
tags=["Worker"],
Expand Down
2 changes: 1 addition & 1 deletion providers/edge3/tests/unit/edge3/cli/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
EdgeWorkerState,
EdgeWorkerVersionException,
)
from airflow.providers.edge3.version_compat import timezone
from airflow.providers.edge3.worker_api.datamodels import (
EdgeJobFetched,
WorkerRegistrationReturn,
WorkerSetStateReturn,
)
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState

from tests_common.test_utils.config import conf_vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@
from airflow.providers.edge3.executors.edge_executor import EdgeExecutor
from airflow.providers.edge3.models.edge_job import EdgeJobModel
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState

try:
from airflow.sdk import timezone
except ImportError:
from airflow.utils import timezone # type: ignore[attr-defined,no-redef]
from airflow.providers.edge3.version_compat import timezone
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import pytest

from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.version_compat import timezone
from airflow.providers.edge3.worker_api.datamodels import PushLogsBody
from airflow.providers.edge3.worker_api.routes.logs import logfile_path, push_logs
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.providers.edge3.cli.worker import EdgeWorker
from airflow.providers.edge3.models.edge_worker import EdgeWorkerModel, EdgeWorkerState
from airflow.providers.edge3.version_compat import timezone
from airflow.providers.edge3.worker_api.datamodels import WorkerQueueUpdateBody, WorkerStateBody
from airflow.providers.edge3.worker_api.routes._v2_compat import HTTPException
from airflow.providers.edge3.worker_api.routes.worker import (
Expand All @@ -31,7 +32,6 @@
set_state,
update_queues,
)
from airflow.utils import timezone

if TYPE_CHECKING:
from sqlalchemy.orm import Session
Expand Down
8 changes: 5 additions & 3 deletions scripts/ci/prek/check_airflow_v_imports_in_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# ]
# ///
"""
Check that AIRFLOW_V_X_Y_PLUS constants are only imported from test_utils in provider tests.
Check that AIRFLOW_V_X_Y_PLUS constants are only imported from tests_common.test_utils in provider tests.
"""

from __future__ import annotations
Expand All @@ -48,7 +48,7 @@ def check_airflow_v_imports_and_fix(test_file: Path) -> list[str]:
console.print("Found AIRFLOW_V_*_PLUS import in test file:", test_file)
if node.module != "tests_common.test_utils.version_compat":
errors.append(
f"{test_file}: AIRFLOW_V_*_PLUS should only be imported from tests.test_utils.version_compat, "
f"{test_file}: AIRFLOW_V_*_PLUS should only be imported from tests_common.test_utils.version_compat, "
f"but found import from '{node.module}'"
)
# Replace the import line
Expand Down Expand Up @@ -79,7 +79,9 @@ def main():
console.print(f"[red]{err}")
console.print("\n[red]Some AIRFLOW_V_*_PLUS imports were incorrect![/]")
sys.exit(1)
console.print("[green]All AIRFLOW_V_*_PLUS imports in tests are from tests.test_utils.version_compat.")
console.print(
"[green]All AIRFLOW_V_*_PLUS imports in tests are from tests_common.test_utils.version_compat."
)


if __name__ == "__main__":
Expand Down
Loading