1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
67use anyhow::Context;
8use async_trait::async_trait;
9use mas_storage::{
10 RepositoryAccess,
11 compat::CompatSessionFilter,
12 oauth2::OAuth2SessionFilter,
13 queue::{DeactivateUserJob, ReactivateUserJob},
14 user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
15};
16use tracing::info;
1718use crate::{
19 State,
20 new_queue::{JobContext, JobError, RunnableJob},
21};
2223/// Job to deactivate a user, both locally and on the Matrix homeserver.
24#[async_trait]
25impl RunnableJob for DeactivateUserJob {
26#[tracing::instrument(
27 name = "job.deactivate_user"
28fields(user.id = %self.user_id(), erase = %self.hs_erase()),
29 skip_all,
30 )]
31async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
32let clock = state.clock();
33let matrix = state.matrix_connection();
34let mut repo = state.repository().await.map_err(JobError::retry)?;
3536let user = repo
37 .user()
38 .lookup(self.user_id())
39 .await
40.map_err(JobError::retry)?
41.context("User not found")
42 .map_err(JobError::fail)?;
4344// Let's first lock & deactivate the user
45let user = repo
46 .user()
47 .lock(&clock, user)
48 .await
49.context("Failed to lock user")
50 .map_err(JobError::retry)?;
5152let user = repo
53 .user()
54 .deactivate(&clock, user)
55 .await
56.context("Failed to deactivate user")
57 .map_err(JobError::retry)?;
5859// Kill all sessions for the user
60let n = repo
61 .browser_session()
62 .finish_bulk(
63&clock,
64 BrowserSessionFilter::new().for_user(&user).active_only(),
65 )
66 .await
67.map_err(JobError::retry)?;
68info!(affected = n, "Killed all browser sessions for user");
6970let n = repo
71 .oauth2_session()
72 .finish_bulk(
73&clock,
74 OAuth2SessionFilter::new().for_user(&user).active_only(),
75 )
76 .await
77.map_err(JobError::retry)?;
78info!(affected = n, "Killed all OAuth 2.0 sessions for user");
7980let n = repo
81 .compat_session()
82 .finish_bulk(
83&clock,
84 CompatSessionFilter::new().for_user(&user).active_only(),
85 )
86 .await
87.map_err(JobError::retry)?;
88info!(affected = n, "Killed all compatibility sessions for user");
8990// Delete all the email addresses for the user
91let n = repo
92 .user_email()
93 .remove_bulk(UserEmailFilter::new().for_user(&user))
94 .await
95.map_err(JobError::retry)?;
96info!(affected = n, "Removed all email addresses for user");
9798// Before calling back to the homeserver, commit the changes to the database, as
99 // we want the user to be locked out as soon as possible
100repo.save().await.map_err(JobError::retry)?;
101102let mxid = matrix.mxid(&user.username);
103info!("Deactivating user {} on homeserver", mxid);
104 matrix
105 .delete_user(&mxid, self.hs_erase())
106 .await
107.map_err(JobError::retry)?;
108109Ok(())
110 }
111}
112113/// Job to reactivate a user, both locally and on the Matrix homeserver.
114#[async_trait]
115impl RunnableJob for ReactivateUserJob {
116#[tracing::instrument(
117 name = "job.reactivate_user",
118 fields(user.id = %self.user_id()),
119 skip_all,
120 )]
121async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
122let matrix = state.matrix_connection();
123let mut repo = state.repository().await.map_err(JobError::retry)?;
124125let user = repo
126 .user()
127 .lookup(self.user_id())
128 .await
129.map_err(JobError::retry)?
130.context("User not found")
131 .map_err(JobError::fail)?;
132133let mxid = matrix.mxid(&user.username);
134info!("Reactivating user {} on homeserver", mxid);
135 matrix
136 .reactivate_user(&mxid)
137 .await
138.map_err(JobError::retry)?;
139140// We want to unlock the user from our side only once it has been reactivated on
141 // the homeserver
142let _user = repo.user().unlock(user).await.map_err(JobError::retry)?;
143 repo.save().await.map_err(JobError::retry)?;
144145Ok(())
146 }
147}