|
17 | 17 | // You should have received a copy of the GNU Affero General Public License
|
18 | 18 | // along with this program. If not, see <http://www.gnu.org/licenses/>.
|
19 | 19 |
|
| 20 | +use std::collections::BTreeMap; |
| 21 | + |
20 | 22 | use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
|
21 |
| -use sqlx::migrate::Migrator; |
22 |
| -use sqlx::{Acquire, Postgres}; |
| 23 | +use sqlx::migrate::{Migrate, Migrator}; |
| 24 | +use sqlx::{Acquire, PgConnection, Postgres}; |
23 | 25 | use tracing::{error, instrument};
|
24 | 26 |
|
25 | 27 | use super::pool::TrackedPool;
|
26 | 28 |
|
27 |
| -static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); |
| 29 | +fn get_migrations() -> Migrator { |
| 30 | + sqlx::migrate!("migrations/postgresql") |
| 31 | +} |
28 | 32 |
|
29 | 33 | /// Initializes the database and runs the SQL migrations stored in the
|
30 | 34 | /// `quickwit-metastore/migrations` directory.
|
31 | 35 | #[instrument(skip_all)]
|
32 |
| -pub(super) async fn run_migrations(pool: &TrackedPool<Postgres>) -> MetastoreResult<()> { |
| 36 | +pub(super) async fn run_migrations( |
| 37 | + pool: &TrackedPool<Postgres>, |
| 38 | + skip_migrations: bool, |
| 39 | + skip_locking: bool, |
| 40 | +) -> MetastoreResult<()> { |
33 | 41 | let mut tx = pool.begin().await?;
|
34 | 42 | let conn = tx.acquire().await?;
|
35 |
| - // this is an hidden function, made to get "around the annoying "implementation of `Acquire` |
36 |
| - // is not general enough" error", which is the error we get otherwise. |
37 |
| - let migrate_result = MIGRATOR.run_direct(conn).await; |
38 | 43 |
|
39 |
| - let Err(migrate_error) = migrate_result else { |
40 |
| - tx.commit().await?; |
41 |
| - return Ok(()); |
| 44 | + let mut migrator = get_migrations(); |
| 45 | + |
| 46 | + if skip_locking { |
| 47 | + migrator.set_locking(false); |
| 48 | + } |
| 49 | + |
| 50 | + if !skip_migrations { |
| 51 | + // this is an hidden function, made to get "around the annoying "implementation of `Acquire` |
| 52 | + // is not general enough" error", which is the error we get otherwise. |
| 53 | + let migrate_result = migrator.run_direct(conn).await; |
| 54 | + |
| 55 | + let Err(migrate_error) = migrate_result else { |
| 56 | + tx.commit().await?; |
| 57 | + return Ok(()); |
| 58 | + }; |
| 59 | + tx.rollback().await?; |
| 60 | + error!(error=%migrate_error, "failed to run PostgreSQL migrations"); |
| 61 | + |
| 62 | + Err(MetastoreError::Internal { |
| 63 | + message: "failed to run PostgreSQL migrations".to_string(), |
| 64 | + cause: migrate_error.to_string(), |
| 65 | + }) |
| 66 | + } else { |
| 67 | + check_migrations(migrator, conn).await |
| 68 | + } |
| 69 | +} |
| 70 | + |
| 71 | +async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { |
| 72 | + let dirty = match conn.dirty_version().await { |
| 73 | + Ok(dirty) => dirty, |
| 74 | + Err(migrate_error) => { |
| 75 | + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); |
| 76 | + |
| 77 | + return Err(MetastoreError::Internal { |
| 78 | + message: "failed to validate PostgreSQL migrations".to_string(), |
| 79 | + cause: migrate_error.to_string(), |
| 80 | + }); |
| 81 | + } |
| 82 | + }; |
| 83 | + if let Some(dirty) = dirty { |
| 84 | + error!("migration {dirty} is dirty"); |
| 85 | + |
| 86 | + return Err(MetastoreError::Internal { |
| 87 | + message: "failed to validate PostgreSQL migrations".to_string(), |
| 88 | + cause: format!("migration {dirty} is dirty"), |
| 89 | + }); |
42 | 90 | };
|
43 |
| - tx.rollback().await?; |
44 |
| - error!(error=%migrate_error, "failed to run PostgreSQL migrations"); |
| 91 | + let applied_migrations = match conn.list_applied_migrations().await { |
| 92 | + Ok(applied_migrations) => applied_migrations, |
| 93 | + Err(migrate_error) => { |
| 94 | + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); |
| 95 | + |
| 96 | + return Err(MetastoreError::Internal { |
| 97 | + message: "failed to validate PostgreSQL migrations".to_string(), |
| 98 | + cause: migrate_error.to_string(), |
| 99 | + }); |
| 100 | + } |
| 101 | + }; |
| 102 | + let expected_migrations: BTreeMap<_, _> = migrator |
| 103 | + .iter() |
| 104 | + .filter(|migration| migration.migration_type.is_up_migration()) |
| 105 | + .map(|migration| (migration.version, migration)) |
| 106 | + .collect(); |
| 107 | + if applied_migrations.len() < expected_migrations.len() { |
| 108 | + error!( |
| 109 | + "missing migrations, expected {} migrations, only {} present in database", |
| 110 | + expected_migrations.len(), |
| 111 | + applied_migrations.len() |
| 112 | + ); |
| 113 | + |
| 114 | + return Err(MetastoreError::Internal { |
| 115 | + message: "failed to validate PostgreSQL migrations".to_string(), |
| 116 | + cause: format!( |
| 117 | + "missing migrations, expected {} migrations, only {} present in database", |
| 118 | + expected_migrations.len(), |
| 119 | + applied_migrations.len() |
| 120 | + ), |
| 121 | + }); |
| 122 | + } |
| 123 | + for applied_migration in applied_migrations { |
| 124 | + let Some(migration) = expected_migrations.get(&applied_migration.version) else { |
| 125 | + error!( |
| 126 | + "found unknown migration {} in database", |
| 127 | + applied_migration.version |
| 128 | + ); |
| 129 | + |
| 130 | + return Err(MetastoreError::Internal { |
| 131 | + message: "failed to validate PostgreSQL migrations".to_string(), |
| 132 | + cause: format!( |
| 133 | + "found unknown migration {} in database", |
| 134 | + applied_migration.version |
| 135 | + ), |
| 136 | + }); |
| 137 | + }; |
| 138 | + if migration.checksum != applied_migration.checksum { |
| 139 | + error!( |
| 140 | + "migration {} differ between database and expected value", |
| 141 | + applied_migration.version |
| 142 | + ); |
| 143 | + |
| 144 | + return Err(MetastoreError::Internal { |
| 145 | + message: "failed to validate PostgreSQL migrations".to_string(), |
| 146 | + cause: format!( |
| 147 | + "migration {} differ between database and expected value", |
| 148 | + applied_migration.version |
| 149 | + ), |
| 150 | + }); |
| 151 | + } |
| 152 | + } |
| 153 | + Ok(()) |
| 154 | +} |
| 155 | + |
| 156 | +#[cfg(test)] |
| 157 | +mod tests { |
| 158 | + use std::time::Duration; |
| 159 | + |
| 160 | + use quickwit_common::uri::Uri; |
| 161 | + use sqlx::migrate::Migrate; |
| 162 | + use sqlx::Acquire; |
| 163 | + |
| 164 | + use super::{get_migrations, run_migrations}; |
| 165 | + use crate::metastore::postgres::utils::establish_connection; |
| 166 | + |
| 167 | + #[tokio::test] |
| 168 | + #[serial_test::file_serial] |
| 169 | + async fn test_metastore_check_migration() { |
| 170 | + let _ = tracing_subscriber::fmt::try_init(); |
| 171 | + |
| 172 | + dotenvy::dotenv().ok(); |
| 173 | + let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") |
| 174 | + .expect("environment variable `QW_TEST_DATABASE_URL` should be set") |
| 175 | + .parse() |
| 176 | + .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); |
| 177 | + |
| 178 | + { |
| 179 | + let connection_pool = |
| 180 | + establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, false) |
| 181 | + .await |
| 182 | + .unwrap(); |
| 183 | + // make sure migrations are run |
| 184 | + run_migrations(&connection_pool, false, false) |
| 185 | + .await |
| 186 | + .unwrap(); |
| 187 | + |
| 188 | + // we just ran migration, nothing else to run |
| 189 | + run_migrations(&connection_pool, true, false).await.unwrap(); |
| 190 | + |
| 191 | + let migrations = get_migrations(); |
| 192 | + let last_migration = migrations |
| 193 | + .iter() |
| 194 | + .map(|migration| migration.version) |
| 195 | + .max() |
| 196 | + .expect("no migration exists?"); |
| 197 | + let up_migration = migrations |
| 198 | + .iter() |
| 199 | + .find(|migration| { |
| 200 | + migration.version == last_migration |
| 201 | + && migration.migration_type.is_up_migration() |
| 202 | + }) |
| 203 | + .unwrap(); |
| 204 | + let down_migration = migrations |
| 205 | + .iter() |
| 206 | + .find(|migration| { |
| 207 | + migration.version == last_migration |
| 208 | + && migration.migration_type.is_down_migration() |
| 209 | + }) |
| 210 | + .unwrap(); |
| 211 | + let mut conn = connection_pool.acquire().await.unwrap(); |
| 212 | + |
| 213 | + conn.revert(down_migration).await.unwrap(); |
| 214 | + |
| 215 | + run_migrations(&connection_pool, true, false) |
| 216 | + .await |
| 217 | + .unwrap_err(); |
| 218 | + |
| 219 | + conn.apply(up_migration).await.unwrap(); |
| 220 | + } |
45 | 221 |
|
46 |
| - Err(MetastoreError::Internal { |
47 |
| - message: "failed to run PostgreSQL migrations".to_string(), |
48 |
| - cause: migrate_error.to_string(), |
49 |
| - }) |
| 222 | + { |
| 223 | + let connection_pool = |
| 224 | + establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, true) |
| 225 | + .await |
| 226 | + .unwrap(); |
| 227 | + // error because we are in read only mode, and we try to run migrations |
| 228 | + run_migrations(&connection_pool, false, false) |
| 229 | + .await |
| 230 | + .unwrap_err(); |
| 231 | + // okay because all migrations were already run before |
| 232 | + run_migrations(&connection_pool, true, false).await.unwrap(); |
| 233 | + } |
| 234 | + } |
50 | 235 | }
|
0 commit comments