1use anyhow::Context;
8use async_trait::async_trait;
9use mas_storage::{
10 RepositoryAccess,
11 compat::CompatSessionFilter,
12 oauth2::OAuth2SessionFilter,
13 queue::{DeactivateUserJob, ReactivateUserJob},
14 user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
15};
16use tracing::info;
17
18use crate::{
19 State,
20 new_queue::{JobContext, JobError, RunnableJob},
21};
22
23#[async_trait]
25impl RunnableJob for DeactivateUserJob {
26 #[tracing::instrument(
27 name = "job.deactivate_user"
28 fields(user.id = %self.user_id(), erase = %self.hs_erase()),
29 skip_all,
30 )]
31 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
32 let clock = state.clock();
33 let matrix = state.matrix_connection();
34 let mut repo = state.repository().await.map_err(JobError::retry)?;
35
36 let user = repo
37 .user()
38 .lookup(self.user_id())
39 .await
40 .map_err(JobError::retry)?
41 .context("User not found")
42 .map_err(JobError::fail)?;
43
44 let user = repo
46 .user()
47 .lock(&clock, user)
48 .await
49 .context("Failed to lock user")
50 .map_err(JobError::retry)?;
51
52 let user = repo
53 .user()
54 .deactivate(&clock, user)
55 .await
56 .context("Failed to deactivate user")
57 .map_err(JobError::retry)?;
58
59 let n = repo
61 .browser_session()
62 .finish_bulk(
63 &clock,
64 BrowserSessionFilter::new().for_user(&user).active_only(),
65 )
66 .await
67 .map_err(JobError::retry)?;
68 info!(affected = n, "Killed all browser sessions for user");
69
70 let n = repo
71 .oauth2_session()
72 .finish_bulk(
73 &clock,
74 OAuth2SessionFilter::new().for_user(&user).active_only(),
75 )
76 .await
77 .map_err(JobError::retry)?;
78 info!(affected = n, "Killed all OAuth 2.0 sessions for user");
79
80 let n = repo
81 .compat_session()
82 .finish_bulk(
83 &clock,
84 CompatSessionFilter::new().for_user(&user).active_only(),
85 )
86 .await
87 .map_err(JobError::retry)?;
88 info!(affected = n, "Killed all compatibility sessions for user");
89
90 let n = repo
92 .user_email()
93 .remove_bulk(UserEmailFilter::new().for_user(&user))
94 .await
95 .map_err(JobError::retry)?;
96 info!(affected = n, "Removed all email addresses for user");
97
98 repo.save().await.map_err(JobError::retry)?;
101
102 let mxid = matrix.mxid(&user.username);
103 info!("Deactivating user {} on homeserver", mxid);
104 matrix
105 .delete_user(&mxid, self.hs_erase())
106 .await
107 .map_err(JobError::retry)?;
108
109 Ok(())
110 }
111}
112
113#[async_trait]
115impl RunnableJob for ReactivateUserJob {
116 #[tracing::instrument(
117 name = "job.reactivate_user",
118 fields(user.id = %self.user_id()),
119 skip_all,
120 )]
121 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
122 let matrix = state.matrix_connection();
123 let mut repo = state.repository().await.map_err(JobError::retry)?;
124
125 let user = repo
126 .user()
127 .lookup(self.user_id())
128 .await
129 .map_err(JobError::retry)?
130 .context("User not found")
131 .map_err(JobError::fail)?;
132
133 let mxid = matrix.mxid(&user.username);
134 info!("Reactivating user {} on homeserver", mxid);
135 matrix
136 .reactivate_user(&mxid)
137 .await
138 .map_err(JobError::retry)?;
139
140 let _user = repo.user().unlock(user).await.map_err(JobError::retry)?;
143 repo.save().await.map_err(JobError::retry)?;
144
145 Ok(())
146 }
147}