|
| 1 | +import atexit |
| 2 | +import json |
1 | 3 | import logging |
2 | 4 | import random |
| 5 | +import tempfile |
3 | 6 | import time |
4 | | -from multiprocessing import Manager |
| 7 | +from pathlib import Path |
| 8 | +from typing import Optional |
5 | 9 |
|
6 | 10 | import pytest |
7 | 11 | from testcontainers.keycloak import KeycloakContainer |
|
14 | 18 | logger = logging.getLogger(__name__) |
15 | 19 | logger.setLevel(logging.INFO) |
16 | 20 |
|
17 | | -shared_state = Manager().dict() |
| 21 | +# Shared Keycloak state |
| 22 | +_keycloak_container: Optional[KeycloakContainer] = None |
| 23 | +_keycloak_info_file = Path(tempfile.gettempdir()) / "feast_keycloak_info.json" |
| 24 | + |
| 25 | + |
| 26 | +def _is_keycloak_healthy(url: str) -> bool: |
| 27 | + """Health check for Keycloak.""" |
| 28 | + try: |
| 29 | + import requests |
| 30 | + |
| 31 | + response = requests.get(f"{url}/health/ready", timeout=3) |
| 32 | + return response.status_code == 200 |
| 33 | + except Exception: |
| 34 | + try: |
| 35 | + import requests |
| 36 | + |
| 37 | + response = requests.get(f"{url}/auth/realms/master", timeout=3) |
| 38 | + return response.status_code in [200, 404] |
| 39 | + except Exception: |
| 40 | + return False |
| 41 | + |
| 42 | + |
| 43 | +def _get_shared_keycloak_url() -> Optional[str]: |
| 44 | + """Get URL of existing Keycloak instance if available.""" |
| 45 | + try: |
| 46 | + if _keycloak_info_file.exists(): |
| 47 | + with open(_keycloak_info_file, "r") as f: |
| 48 | + info = json.load(f) |
| 49 | + |
| 50 | + url = info.get("url") |
| 51 | + if url and _is_keycloak_healthy(url): |
| 52 | + return url |
| 53 | + else: |
| 54 | + _keycloak_info_file.unlink() |
| 55 | + except Exception as e: |
| 56 | + logger.debug(f"Error reading Keycloak info: {e}") |
| 57 | + try: |
| 58 | + _keycloak_info_file.unlink() |
| 59 | + except Exception: |
| 60 | + pass |
| 61 | + return None |
| 62 | + |
| 63 | + |
| 64 | +def _save_keycloak_info(url: str): |
| 65 | + """Save Keycloak info to shared file.""" |
| 66 | + try: |
| 67 | + info = {"url": url, "timestamp": time.time()} |
| 68 | + with open(_keycloak_info_file, "w") as f: |
| 69 | + json.dump(info, f) |
| 70 | + except Exception as e: |
| 71 | + logger.warning(f"Failed to save Keycloak info: {e}") |
| 72 | + |
| 73 | + |
| 74 | +def _cleanup_keycloak(): |
| 75 | + """Cleanup Keycloak container on exit.""" |
| 76 | + global _keycloak_container |
| 77 | + if _keycloak_container: |
| 78 | + try: |
| 79 | + logger.info("Stopping Keycloak container") |
| 80 | + _keycloak_container.stop() |
| 81 | + except Exception as e: |
| 82 | + logger.warning(f"Error stopping Keycloak: {e}") |
| 83 | + finally: |
| 84 | + _keycloak_container = None |
| 85 | + try: |
| 86 | + _keycloak_info_file.unlink() |
| 87 | + except Exception: |
| 88 | + pass |
18 | 89 |
|
19 | 90 |
|
20 | 91 | @pytest.fixture(scope="session") |
21 | 92 | def start_keycloak_server(): |
22 | | - # Add random sleep between 0 and 2 before checking the state to avoid concurrency issues. |
23 | | - random_sleep_time = random.uniform(0, 2) |
24 | | - time.sleep(random_sleep_time) |
25 | | - |
26 | | - # If the Keycloak instance is already started (in any worker), reuse it |
27 | | - if shared_state.get("keycloak_started", False): |
28 | | - return shared_state["keycloak_url"] |
29 | | - logger.info("Starting keycloak instance") |
30 | | - with KeycloakContainer("quay.io/keycloak/keycloak:24.0.1") as keycloak_container: |
31 | | - setup_permissions_on_keycloak(keycloak_container.get_client()) |
32 | | - shared_state["keycloak_started"] = True |
33 | | - shared_state["keycloak_url"] = keycloak_container.get_url() |
34 | | - yield shared_state["keycloak_url"] |
35 | | - |
36 | | - # After the fixture is done, cleanup the shared state |
37 | | - del shared_state["keycloak_started"] |
38 | | - del shared_state["keycloak_url"] |
| 93 | + global _keycloak_container |
| 94 | + |
| 95 | + existing_url = _get_shared_keycloak_url() |
| 96 | + if existing_url: |
| 97 | + logger.info(f"Reusing existing Keycloak at {existing_url}") |
| 98 | + yield existing_url |
| 99 | + return |
| 100 | + |
| 101 | + time.sleep(random.uniform(0, 0.5)) |
| 102 | + |
| 103 | + existing_url = _get_shared_keycloak_url() |
| 104 | + if existing_url: |
| 105 | + logger.info(f"Found Keycloak started by another process: {existing_url}") |
| 106 | + yield existing_url |
| 107 | + return |
| 108 | + |
| 109 | + try: |
| 110 | + logger.info("Starting new Keycloak instance") |
| 111 | + _keycloak_container = KeycloakContainer("quay.io/keycloak/keycloak:24.0.1") |
| 112 | + _keycloak_container.start() |
| 113 | + |
| 114 | + setup_permissions_on_keycloak(_keycloak_container.get_client()) |
| 115 | + |
| 116 | + keycloak_url = _keycloak_container.get_url() |
| 117 | + |
| 118 | + _save_keycloak_info(keycloak_url) |
| 119 | + atexit.register(_cleanup_keycloak) |
| 120 | + |
| 121 | + logger.info(f"Keycloak ready at {keycloak_url}") |
| 122 | + yield keycloak_url |
| 123 | + |
| 124 | + except Exception as e: |
| 125 | + logger.error(f"Failed to start Keycloak: {e}") |
| 126 | + if _keycloak_container: |
| 127 | + try: |
| 128 | + _keycloak_container.stop() |
| 129 | + except Exception: |
| 130 | + pass |
| 131 | + _keycloak_container = None |
| 132 | + raise |
39 | 133 |
|
40 | 134 |
|
41 | 135 | @pytest.fixture(scope="session") |
|
0 commit comments