mas_matrix_synapse/
lib.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use std::{collections::HashSet, time::Duration};
8
9use anyhow::{Context, bail};
10use error::SynapseResponseExt;
11use http::{Method, StatusCode};
12use mas_http::RequestBuilderExt as _;
13use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest};
14use serde::{Deserialize, Serialize};
15use tracing::debug;
16use url::Url;
17
/// Value sent as the `auth_provider` of the external ID attached to users
/// provisioned through this connection.
static SYNAPSE_AUTH_PROVIDER: &str = "oauth-delegated";

/// Encountered when trying to register a user ID which has been taken.
/// — <https://spec.matrix.org/v1.10/client-server-api/#other-error-codes>
const M_USER_IN_USE: &str = "M_USER_IN_USE";
/// Encountered when trying to register a user ID which is not valid.
/// — <https://spec.matrix.org/v1.10/client-server-api/#other-error-codes>
const M_INVALID_USERNAME: &str = "M_INVALID_USERNAME";
26
27mod error;
28
/// A connection to a Synapse homeserver, used to call its HTTP APIs with a
/// bearer access token.
#[derive(Clone)]
pub struct SynapseConnection {
    /// Homeserver name, returned as-is by `HomeserverConnection::homeserver`
    /// and recorded in the tracing spans.
    homeserver: String,
    /// Base URL every request path is joined onto.
    endpoint: Url,
    /// Token sent as the bearer authorization of every request.
    access_token: String,
    /// HTTP client used to build and send the requests.
    http_client: reqwest::Client,
}
36
37impl SynapseConnection {
38    #[must_use]
39    pub fn new(
40        homeserver: String,
41        endpoint: Url,
42        access_token: String,
43        http_client: reqwest::Client,
44    ) -> Self {
45        Self {
46            homeserver,
47            endpoint,
48            access_token,
49            http_client,
50        }
51    }
52
53    fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
54        self.http_client
55            .request(
56                method,
57                self.endpoint
58                    .join(url)
59                    .map(String::from)
60                    .unwrap_or_default(),
61            )
62            .bearer_auth(&self.access_token)
63    }
64
65    fn post(&self, url: &str) -> reqwest::RequestBuilder {
66        self.builder(Method::POST, url)
67    }
68
69    fn get(&self, url: &str) -> reqwest::RequestBuilder {
70        self.builder(Method::GET, url)
71    }
72
73    fn put(&self, url: &str) -> reqwest::RequestBuilder {
74        self.builder(Method::PUT, url)
75    }
76
77    fn delete(&self, url: &str) -> reqwest::RequestBuilder {
78        self.builder(Method::DELETE, url)
79    }
80}
81
/// One entry of the `external_ids` list of a [`SynapseUser`].
#[derive(Serialize, Deserialize)]
struct ExternalID {
    /// Identifier of the authentication provider the ID belongs to.
    auth_provider: String,
    /// Identifier of the user at that provider.
    external_id: String,
}
87
/// Medium of a [`ThreePID`]; variants are (de)serialized in lowercase
/// (`email`, `msisdn`).
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
enum ThreePIDMedium {
    Email,
    Msisdn,
}
94
/// One entry of the `threepids` list of a [`SynapseUser`].
#[derive(Serialize, Deserialize)]
struct ThreePID {
    /// Kind of identifier stored in `address`.
    medium: ThreePIDMedium,
    /// The identifier itself (an email address when `medium` is `Email`).
    address: String,
}
100
/// User object exchanged with the `_synapse/admin/v2/users/{mxid}` endpoint.
///
/// It is deserialized from the response when querying a user and serialized
/// as the request body when provisioning or reactivating one. Every field is
/// optional: a `None` field is left out of the serialized body, and a field
/// missing from a response deserializes to `None`.
#[derive(Default, Serialize, Deserialize)]
struct SynapseUser {
    /// Display name, carried on the wire as `displayname`.
    #[serde(
        default,
        rename = "displayname",
        skip_serializing_if = "Option::is_none"
    )]
    display_name: Option<String>,

    /// Avatar URL of the user.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    avatar_url: Option<String>,

    /// Third-party identifiers, carried on the wire as `threepids`.
    #[serde(default, rename = "threepids", skip_serializing_if = "Option::is_none")]
    three_pids: Option<Vec<ThreePID>>,

    /// External identifiers linking the user to authentication providers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    external_ids: Option<Vec<ExternalID>>,

    /// Whether the user is deactivated.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    deactivated: Option<bool>,
}
122
/// Response body of `GET _synapse/admin/v2/users/{mxid}/devices`.
#[derive(Deserialize)]
struct SynapseDeviceListResponse {
    /// Devices currently known for the user.
    devices: Vec<SynapseDevice>,
}
127
/// A device, used both as an entry of [`SynapseDeviceListResponse`] and as
/// the request body when creating a device.
#[derive(Serialize, Deserialize)]
struct SynapseDevice {
    /// Identifier of the device.
    device_id: String,

    /// Whether the device is flagged as dehydrated; left out of the serialized
    /// body when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    dehydrated: Option<bool>,
}
135
/// Request body of `POST _synapse/admin/v2/users/{mxid}/delete_devices`.
#[derive(Serialize)]
struct SynapseDeleteDevicesRequest {
    /// Identifiers of the devices to delete.
    devices: Vec<String>,
}
140
/// Request body of `PUT _matrix/client/v3/profile/{mxid}/displayname`.
#[derive(Serialize)]
struct SetDisplayNameRequest<'a> {
    /// The display name to set, borrowed from the caller.
    displayname: &'a str,
}
145
/// Request body of `POST _synapse/admin/v1/deactivate/{mxid}`.
#[derive(Serialize)]
struct SynapseDeactivateUserRequest {
    /// Forwarded from the `erase` argument of `delete_user`.
    erase: bool,
}
150
/// Request body of
/// `POST _synapse/admin/v1/users/{mxid}/_allow_cross_signing_replacement_without_uia`;
/// it carries no fields.
#[derive(Serialize)]
struct SynapseAllowCrossSigningResetRequest {}
153
/// Response body of
/// `/_synapse/admin/v1/username_available?username={localpart}`
#[derive(Deserialize)]
struct UsernameAvailableResponse {
    /// Availability flag, returned unchanged by `is_localpart_available` on a
    /// successful response.
    available: bool,
}
160
161#[async_trait::async_trait]
162impl HomeserverConnection for SynapseConnection {
163    fn homeserver(&self) -> &str {
164        &self.homeserver
165    }
166
167    #[tracing::instrument(
168        name = "homeserver.query_user",
169        skip_all,
170        fields(
171            matrix.homeserver = self.homeserver,
172            matrix.mxid = mxid,
173        ),
174        err(Debug),
175    )]
176    async fn query_user(&self, mxid: &str) -> Result<MatrixUser, anyhow::Error> {
177        let mxid = urlencoding::encode(mxid);
178
179        let response = self
180            .get(&format!("_synapse/admin/v2/users/{mxid}"))
181            .send_traced()
182            .await
183            .context("Failed to query user from Synapse")?;
184
185        let response = response
186            .error_for_synapse_error()
187            .await
188            .context("Unexpected HTTP response while querying user from Synapse")?;
189
190        let body: SynapseUser = response
191            .json()
192            .await
193            .context("Failed to deserialize response while querying user from Synapse")?;
194
195        Ok(MatrixUser {
196            displayname: body.display_name,
197            avatar_url: body.avatar_url,
198            deactivated: body.deactivated.unwrap_or(false),
199        })
200    }
201
202    #[tracing::instrument(
203        name = "homeserver.is_localpart_available",
204        skip_all,
205        fields(
206            matrix.homeserver = self.homeserver,
207            matrix.localpart = localpart,
208        ),
209        err(Debug),
210    )]
211    async fn is_localpart_available(&self, localpart: &str) -> Result<bool, anyhow::Error> {
212        // Synapse will give us a M_UNKNOWN error if the localpart is not ASCII,
213        // so we bail out early
214        if !localpart.is_ascii() {
215            return Ok(false);
216        }
217
218        let localpart = urlencoding::encode(localpart);
219
220        let response = self
221            .get(&format!(
222                "_synapse/admin/v1/username_available?username={localpart}"
223            ))
224            .send_traced()
225            .await
226            .context("Failed to query localpart availability from Synapse")?;
227
228        match response.error_for_synapse_error().await {
229            Ok(resp) => {
230                let response: UsernameAvailableResponse = resp.json().await.context(
231                    "Unexpected response while querying localpart availability from Synapse",
232                )?;
233
234                Ok(response.available)
235            }
236
237            Err(err)
238                if err.errcode() == Some(M_INVALID_USERNAME)
239                    || err.errcode() == Some(M_USER_IN_USE) =>
240            {
241                debug!(
242                    error = &err as &dyn std::error::Error,
243                    "Localpart is not available"
244                );
245                Ok(false)
246            }
247
248            Err(err) => Err(err).context("Failed to query localpart availability from Synapse"),
249        }
250    }
251
252    #[tracing::instrument(
253        name = "homeserver.provision_user",
254        skip_all,
255        fields(
256            matrix.homeserver = self.homeserver,
257            matrix.mxid = request.mxid(),
258            user.id = request.sub(),
259        ),
260        err(Debug),
261    )]
262    async fn provision_user(&self, request: &ProvisionRequest) -> Result<bool, anyhow::Error> {
263        let mut body = SynapseUser {
264            external_ids: Some(vec![ExternalID {
265                auth_provider: SYNAPSE_AUTH_PROVIDER.to_owned(),
266                external_id: request.sub().to_owned(),
267            }]),
268            ..SynapseUser::default()
269        };
270
271        request
272            .on_displayname(|displayname| {
273                body.display_name = Some(displayname.unwrap_or_default().to_owned());
274            })
275            .on_avatar_url(|avatar_url| {
276                body.avatar_url = Some(avatar_url.unwrap_or_default().to_owned());
277            })
278            .on_emails(|emails| {
279                body.three_pids = Some(
280                    emails
281                        .unwrap_or_default()
282                        .iter()
283                        .map(|email| ThreePID {
284                            medium: ThreePIDMedium::Email,
285                            address: email.clone(),
286                        })
287                        .collect(),
288                );
289            });
290
291        let mxid = urlencoding::encode(request.mxid());
292        let response = self
293            .put(&format!("_synapse/admin/v2/users/{mxid}"))
294            .json(&body)
295            .send_traced()
296            .await
297            .context("Failed to provision user in Synapse")?;
298
299        let response = response
300            .error_for_synapse_error()
301            .await
302            .context("Unexpected HTTP response while provisioning user in Synapse")?;
303
304        match response.status() {
305            StatusCode::CREATED => Ok(true),
306            StatusCode::OK => Ok(false),
307            code => bail!("Unexpected HTTP code while provisioning user in Synapse: {code}"),
308        }
309    }
310
311    #[tracing::instrument(
312        name = "homeserver.create_device",
313        skip_all,
314        fields(
315            matrix.homeserver = self.homeserver,
316            matrix.mxid = mxid,
317            matrix.device_id = device_id,
318        ),
319        err(Debug),
320    )]
321    async fn create_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> {
322        let mxid = urlencoding::encode(mxid);
323
324        let response = self
325            .post(&format!("_synapse/admin/v2/users/{mxid}/devices"))
326            .json(&SynapseDevice {
327                device_id: device_id.to_owned(),
328                dehydrated: None,
329            })
330            .send_traced()
331            .await
332            .context("Failed to create device in Synapse")?;
333
334        let response = response
335            .error_for_synapse_error()
336            .await
337            .context("Unexpected HTTP response while creating device in Synapse")?;
338
339        if response.status() != StatusCode::CREATED {
340            bail!(
341                "Unexpected HTTP code while creating device in Synapse: {}",
342                response.status()
343            );
344        }
345
346        Ok(())
347    }
348
349    #[tracing::instrument(
350        name = "homeserver.delete_device",
351        skip_all,
352        fields(
353            matrix.homeserver = self.homeserver,
354            matrix.mxid = mxid,
355            matrix.device_id = device_id,
356        ),
357        err(Debug),
358    )]
359    async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> {
360        let mxid = urlencoding::encode(mxid);
361        let device_id = urlencoding::encode(device_id);
362
363        let response = self
364            .delete(&format!(
365                "_synapse/admin/v2/users/{mxid}/devices/{device_id}"
366            ))
367            .send_traced()
368            .await
369            .context("Failed to delete device in Synapse")?;
370
371        let response = response
372            .error_for_synapse_error()
373            .await
374            .context("Unexpected HTTP response while deleting device in Synapse")?;
375
376        if response.status() != StatusCode::OK {
377            bail!(
378                "Unexpected HTTP code while deleting device in Synapse: {}",
379                response.status()
380            );
381        }
382
383        Ok(())
384    }
385
386    #[tracing::instrument(
387        name = "homeserver.sync_devices",
388        skip_all,
389        fields(
390            matrix.homeserver = self.homeserver,
391            matrix.mxid = mxid,
392        ),
393        err(Debug),
394    )]
395    async fn sync_devices(
396        &self,
397        mxid: &str,
398        devices: HashSet<String>,
399    ) -> Result<(), anyhow::Error> {
400        // Get the list of current devices
401        let mxid_url = urlencoding::encode(mxid);
402
403        let response = self
404            .get(&format!("_synapse/admin/v2/users/{mxid_url}/devices"))
405            .send_traced()
406            .await
407            .context("Failed to query devices from Synapse")?;
408
409        let response = response.error_for_synapse_error().await?;
410
411        if response.status() != StatusCode::OK {
412            bail!(
413                "Unexpected HTTP code while querying devices from Synapse: {}",
414                response.status()
415            );
416        }
417
418        let body: SynapseDeviceListResponse = response
419            .json()
420            .await
421            .context("Failed to parse response while querying devices from Synapse")?;
422
423        let existing_devices: HashSet<String> = body
424            .devices
425            .into_iter()
426            .filter(|d| d.dehydrated != Some(true))
427            .map(|d| d.device_id)
428            .collect();
429
430        // First, delete all the devices that are not needed anymore
431        let to_delete = existing_devices.difference(&devices).cloned().collect();
432
433        let response = self
434            .post(&format!(
435                "_synapse/admin/v2/users/{mxid_url}/delete_devices"
436            ))
437            .json(&SynapseDeleteDevicesRequest { devices: to_delete })
438            .send_traced()
439            .await
440            .context("Failed to delete devices from Synapse")?;
441
442        let response = response
443            .error_for_synapse_error()
444            .await
445            .context("Unexpected HTTP response while deleting devices from Synapse")?;
446
447        if response.status() != StatusCode::OK {
448            bail!(
449                "Unexpected HTTP code while deleting devices from Synapse: {}",
450                response.status()
451            );
452        }
453
454        // Then, create the devices that are missing. There is no batching API to do
455        // this, so we do this sequentially, which is fine as the API is idempotent.
456        for device_id in devices.difference(&existing_devices) {
457            self.create_device(mxid, device_id).await?;
458        }
459
460        Ok(())
461    }
462
463    #[tracing::instrument(
464        name = "homeserver.delete_user",
465        skip_all,
466        fields(
467            matrix.homeserver = self.homeserver,
468            matrix.mxid = mxid,
469            erase = erase,
470        ),
471        err(Debug),
472    )]
473    async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> {
474        let mxid = urlencoding::encode(mxid);
475
476        let response = self
477            .post(&format!("_synapse/admin/v1/deactivate/{mxid}"))
478            .json(&SynapseDeactivateUserRequest { erase })
479            // Deactivation can take a while, so we set a longer timeout
480            .timeout(Duration::from_secs(60 * 5))
481            .send_traced()
482            .await
483            .context("Failed to deactivate user in Synapse")?;
484
485        let response = response
486            .error_for_synapse_error()
487            .await
488            .context("Unexpected HTTP response while deactivating user in Synapse")?;
489
490        if response.status() != StatusCode::OK {
491            bail!(
492                "Unexpected HTTP code while deactivating user in Synapse: {}",
493                response.status()
494            );
495        }
496
497        Ok(())
498    }
499
500    #[tracing::instrument(
501        name = "homeserver.reactivate_user",
502        skip_all,
503        fields(
504            matrix.homeserver = self.homeserver,
505            matrix.mxid = mxid,
506        ),
507        err(Debug),
508    )]
509    async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> {
510        let mxid = urlencoding::encode(mxid);
511        let response = self
512            .put(&format!("_synapse/admin/v2/users/{mxid}"))
513            .json(&SynapseUser {
514                deactivated: Some(false),
515                ..SynapseUser::default()
516            })
517            .send_traced()
518            .await
519            .context("Failed to reactivate user in Synapse")?;
520
521        let response = response
522            .error_for_synapse_error()
523            .await
524            .context("Unexpected HTTP response while reactivating user in Synapse")?;
525
526        match response.status() {
527            StatusCode::CREATED | StatusCode::OK => Ok(()),
528            code => bail!("Unexpected HTTP code while reactivating user in Synapse: {code}",),
529        }
530    }
531
532    #[tracing::instrument(
533        name = "homeserver.set_displayname",
534        skip_all,
535        fields(
536            matrix.homeserver = self.homeserver,
537            matrix.mxid = mxid,
538            matrix.displayname = displayname,
539        ),
540        err(Debug),
541    )]
542    async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> {
543        let mxid = urlencoding::encode(mxid);
544        let response = self
545            .put(&format!("_matrix/client/v3/profile/{mxid}/displayname"))
546            .json(&SetDisplayNameRequest { displayname })
547            .send_traced()
548            .await
549            .context("Failed to set displayname in Synapse")?;
550
551        let response = response
552            .error_for_synapse_error()
553            .await
554            .context("Unexpected HTTP response while setting displayname in Synapse")?;
555
556        if response.status() != StatusCode::OK {
557            bail!(
558                "Unexpected HTTP code while setting displayname in Synapse: {}",
559                response.status()
560            );
561        }
562
563        Ok(())
564    }
565
566    #[tracing::instrument(
567        name = "homeserver.unset_displayname",
568        skip_all,
569        fields(
570            matrix.homeserver = self.homeserver,
571            matrix.mxid = mxid,
572        ),
573        err(Display),
574    )]
575    async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> {
576        self.set_displayname(mxid, "").await
577    }
578
579    #[tracing::instrument(
580        name = "homeserver.allow_cross_signing_reset",
581        skip_all,
582        fields(
583            matrix.homeserver = self.homeserver,
584            matrix.mxid = mxid,
585        ),
586        err(Debug),
587    )]
588    async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> {
589        let mxid = urlencoding::encode(mxid);
590
591        let response = self
592            .post(&format!(
593                "_synapse/admin/v1/users/{mxid}/_allow_cross_signing_replacement_without_uia"
594            ))
595            .json(&SynapseAllowCrossSigningResetRequest {})
596            .send_traced()
597            .await
598            .context("Failed to allow cross-signing reset in Synapse")?;
599
600        let response = response
601            .error_for_synapse_error()
602            .await
603            .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?;
604
605        if response.status() != StatusCode::OK {
606            bail!(
607                "Unexpected HTTP code while allowing cross-signing reset in Synapse: {}",
608                response.status(),
609            );
610        }
611
612        Ok(())
613    }
614}