mas_tasks/
email.rs

1// Copyright 2024, 2025 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use async_trait::async_trait;
8use chrono::Duration;
9use mas_email::{Address, EmailVerificationContext, Mailbox};
10use mas_storage::queue::{SendEmailAuthenticationCodeJob, VerifyEmailJob};
11use mas_templates::TemplateContext as _;
12use rand::{Rng, distributions::Uniform};
13use tracing::info;
14
15use crate::{
16    State,
17    new_queue::{JobContext, JobError, RunnableJob},
18};
19
20#[async_trait]
21impl RunnableJob for VerifyEmailJob {
22    #[tracing::instrument(
23        name = "job.verify_email",
24        fields(user_email.id = %self.user_email_id()),
25        skip_all,
26    )]
27    async fn run(&self, _state: &State, _context: JobContext) -> Result<(), JobError> {
28        // This job was for the old email verification flow, which has been replaced.
29        // We still want to consume existing jobs in the queue, so we just make them
30        // permanently fail.
31        Err(JobError::fail(anyhow::anyhow!("Not implemented")))
32    }
33}
34
35#[async_trait]
36impl RunnableJob for SendEmailAuthenticationCodeJob {
37    #[tracing::instrument(
38        name = "job.send_email_authentication_code",
39        fields(user_email_authentication.id = %self.user_email_authentication_id()),
40        skip_all,
41    )]
42    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
43        let clock = state.clock();
44        let mailer = state.mailer();
45        let mut rng = state.rng();
46        let mut repo = state.repository().await.map_err(JobError::retry)?;
47
48        let user_email_authentication = repo
49            .user_email()
50            .lookup_authentication(self.user_email_authentication_id())
51            .await
52            .map_err(JobError::retry)?
53            .ok_or(JobError::fail(anyhow::anyhow!(
54                "User email authentication not found"
55            )))?;
56
57        if user_email_authentication.completed_at.is_some() {
58            return Err(JobError::fail(anyhow::anyhow!(
59                "User email authentication already completed"
60            )));
61        }
62
63        // Load the browser session, if any
64        let browser_session =
65            if let Some(browser_session) = user_email_authentication.user_session_id {
66                Some(
67                    repo.browser_session()
68                        .lookup(browser_session)
69                        .await
70                        .map_err(JobError::retry)?
71                        .ok_or(JobError::fail(anyhow::anyhow!(
72                            "Failed to load browser session"
73                        )))?,
74                )
75            } else {
76                None
77            };
78
79        // Load the registration, if any
80        let registration =
81            if let Some(registration_id) = user_email_authentication.user_registration_id {
82                Some(
83                    repo.user_registration()
84                        .lookup(registration_id)
85                        .await
86                        .map_err(JobError::retry)?
87                        .ok_or(JobError::fail(anyhow::anyhow!(
88                            "Failed to load user registration"
89                        )))?,
90                )
91            } else {
92                None
93            };
94
95        // Generate a new 6-digit authentication code
96        let range = Uniform::<u32>::from(0..1_000_000);
97        let code = rng.sample(range);
98        let code = format!("{code:06}");
99        let code = repo
100            .user_email()
101            .add_authentication_code(
102                &mut rng,
103                &clock,
104                Duration::minutes(5), // TODO: make this configurable
105                &user_email_authentication,
106                code,
107            )
108            .await
109            .map_err(JobError::retry)?;
110
111        let address: Address = user_email_authentication
112            .email
113            .parse()
114            .map_err(JobError::fail)?;
115        let username_from_session = browser_session.as_ref().map(|s| s.user.username.clone());
116        let username_from_registration = registration.as_ref().map(|r| r.username.clone());
117        let username = username_from_registration.or(username_from_session);
118        let mailbox = Mailbox::new(username, address);
119
120        info!("Sending email verification code to {}", mailbox);
121
122        let language = self.language().parse().map_err(JobError::fail)?;
123
124        let context = EmailVerificationContext::new(code, browser_session, registration)
125            .with_language(language);
126        mailer
127            .send_verification_email(mailbox, &context)
128            .await
129            .map_err(JobError::fail)?;
130
131        repo.save().await.map_err(JobError::fail)?;
132
133        Ok(())
134    }
135}