syn2mas/mas_writer/
constraint_pausing.rs1use std::time::Instant;
7
8use sqlx::PgConnection;
9use tracing::{debug, info};
10
11use super::{Error, IntoDatabase};
12
13pub struct ConstraintDescription {
15 pub name: String,
16 pub table_name: String,
17 pub definition: String,
18}
19
20pub struct IndexDescription {
21 pub name: String,
22 pub table_name: String,
23 pub definition: String,
24}
25
26pub async fn describe_constraints_on_table(
28 conn: &mut PgConnection,
29 table_name: &str,
30) -> Result<Vec<ConstraintDescription>, Error> {
31 sqlx::query_as!(
32 ConstraintDescription,
33 r#"
34 SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
35 FROM pg_constraint c
36 JOIN pg_namespace n ON n.oid = c.connamespace
37 WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1
38 AND n.nspname = current_schema;
39 "#,
40 table_name
41 ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read constraint definitions of {table_name}"))
42}
43
44pub async fn describe_foreign_key_constraints_to_table(
47 conn: &mut PgConnection,
48 target_table_name: &str,
49) -> Result<Vec<ConstraintDescription>, Error> {
50 sqlx::query_as!(
51 ConstraintDescription,
52 r#"
53 SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
54 FROM pg_constraint c
55 JOIN pg_namespace n ON n.oid = c.connamespace
56 WHERE contype = 'f' AND confrelid::regclass::text = $1
57 AND n.nspname = current_schema;
58 "#,
59 target_table_name
60 ).fetch_all(&mut *conn).await.into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}"))
61}
62
63pub async fn describe_indices_on_table(
65 conn: &mut PgConnection,
66 table_name: &str,
67) -> Result<Vec<IndexDescription>, Error> {
68 sqlx::query_as!(
69 IndexDescription,
70 r#"
71 SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!"
72 FROM pg_indexes
73 WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL
74 "#,
75 table_name
76 ).fetch_all(&mut *conn).await.into_database("cannot search for indices")
77}
78
79pub async fn drop_constraint(
83 conn: &mut PgConnection,
84 constraint: &ConstraintDescription,
85) -> Result<(), Error> {
86 let name = &constraint.name;
87 let table_name = &constraint.table_name;
88 debug!("dropping constraint {name} on table {table_name}");
89 sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};"))
90 .execute(&mut *conn)
91 .await
92 .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?;
93
94 Ok(())
95}
96
97pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
101 let index_name = &index.name;
102 debug!("dropping index {index_name}");
103 sqlx::query(&format!("DROP INDEX {index_name};"))
104 .execute(&mut *conn)
105 .await
106 .into_database_with(|| format!("failed to temporarily drop {index_name}"))?;
107
108 Ok(())
109}
110
111#[tracing::instrument(name = "syn2mas.restore_constraint", skip_all, fields(constraint.name = constraint.name))]
115pub async fn restore_constraint(
116 conn: &mut PgConnection,
117 constraint: &ConstraintDescription,
118) -> Result<(), Error> {
119 let start = Instant::now();
120
121 let ConstraintDescription {
122 name,
123 table_name,
124 definition,
125 } = &constraint;
126
127 sqlx::query(&format!(
128 "ALTER TABLE {table_name} ADD CONSTRAINT {name} {definition};"
129 ))
130 .execute(conn)
131 .await
132 .into_database_with(|| {
133 format!("failed to recreate constraint {name} on {table_name} with {definition}")
134 })?;
135
136 info!(
137 "constraint {name} rebuilt in {:.1}s",
138 Instant::now().duration_since(start).as_secs_f64()
139 );
140
141 Ok(())
142}
143
144#[tracing::instrument(name = "syn2mas.restore_index", skip_all, fields(index.name = index.name))]
148pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
149 let start = Instant::now();
150
151 let IndexDescription {
152 name,
153 table_name,
154 definition,
155 } = &index;
156
157 sqlx::query(&format!("{definition};"))
158 .execute(conn)
159 .await
160 .into_database_with(|| {
161 format!("failed to recreate index {name} on {table_name} with {definition}")
162 })?;
163
164 info!(
165 "index {name} rebuilt in {:.1}s",
166 Instant::now().duration_since(start).as_secs_f64()
167 );
168
169 Ok(())
170}