mas_matrix_synapse/
lib.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use std::collections::HashSet;
8
9use anyhow::{Context, bail};
10use error::SynapseResponseExt;
11use http::{Method, StatusCode};
12use mas_http::RequestBuilderExt as _;
13use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest};
14use serde::{Deserialize, Serialize};
15use tracing::debug;
16use url::Url;
17
18static SYNAPSE_AUTH_PROVIDER: &str = "oauth-delegated";
19
20/// Encountered when trying to register a user ID which has been taken.
21/// — <https://spec.matrix.org/v1.10/client-server-api/#other-error-codes>
22const M_USER_IN_USE: &str = "M_USER_IN_USE";
23/// Encountered when trying to register a user ID which is not valid.
24/// — <https://spec.matrix.org/v1.10/client-server-api/#other-error-codes>
25const M_INVALID_USERNAME: &str = "M_INVALID_USERNAME";
26
27mod error;
28
29#[derive(Clone)]
30pub struct SynapseConnection {
31    homeserver: String,
32    endpoint: Url,
33    access_token: String,
34    http_client: reqwest::Client,
35}
36
37impl SynapseConnection {
38    #[must_use]
39    pub fn new(
40        homeserver: String,
41        endpoint: Url,
42        access_token: String,
43        http_client: reqwest::Client,
44    ) -> Self {
45        Self {
46            homeserver,
47            endpoint,
48            access_token,
49            http_client,
50        }
51    }
52
53    fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
54        self.http_client
55            .request(
56                method,
57                self.endpoint
58                    .join(url)
59                    .map(String::from)
60                    .unwrap_or_default(),
61            )
62            .bearer_auth(&self.access_token)
63    }
64
65    fn post(&self, url: &str) -> reqwest::RequestBuilder {
66        self.builder(Method::POST, url)
67    }
68
69    fn get(&self, url: &str) -> reqwest::RequestBuilder {
70        self.builder(Method::GET, url)
71    }
72
73    fn put(&self, url: &str) -> reqwest::RequestBuilder {
74        self.builder(Method::PUT, url)
75    }
76
77    fn delete(&self, url: &str) -> reqwest::RequestBuilder {
78        self.builder(Method::DELETE, url)
79    }
80}
81
82#[derive(Serialize, Deserialize)]
83struct ExternalID {
84    auth_provider: String,
85    external_id: String,
86}
87
88#[derive(Serialize, Deserialize)]
89#[serde(rename_all = "lowercase")]
90enum ThreePIDMedium {
91    Email,
92    Msisdn,
93}
94
95#[derive(Serialize, Deserialize)]
96struct ThreePID {
97    medium: ThreePIDMedium,
98    address: String,
99}
100
101#[derive(Default, Serialize, Deserialize)]
102struct SynapseUser {
103    #[serde(
104        default,
105        rename = "displayname",
106        skip_serializing_if = "Option::is_none"
107    )]
108    display_name: Option<String>,
109
110    #[serde(default, skip_serializing_if = "Option::is_none")]
111    avatar_url: Option<String>,
112
113    #[serde(default, rename = "threepids", skip_serializing_if = "Option::is_none")]
114    three_pids: Option<Vec<ThreePID>>,
115
116    #[serde(default, skip_serializing_if = "Option::is_none")]
117    external_ids: Option<Vec<ExternalID>>,
118
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    deactivated: Option<bool>,
121}
122
123#[derive(Deserialize)]
124struct SynapseDeviceListResponse {
125    devices: Vec<SynapseDevice>,
126}
127
128#[derive(Serialize, Deserialize)]
129struct SynapseDevice {
130    device_id: String,
131
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    dehydrated: Option<bool>,
134}
135
136#[derive(Serialize)]
137struct SynapseDeleteDevicesRequest {
138    devices: Vec<String>,
139}
140
141#[derive(Serialize)]
142struct SetDisplayNameRequest<'a> {
143    displayname: &'a str,
144}
145
146#[derive(Serialize)]
147struct SynapseDeactivateUserRequest {
148    erase: bool,
149}
150
151#[derive(Serialize)]
152struct SynapseAllowCrossSigningResetRequest {}
153
154/// Response body of
155/// `/_synapse/admin/v1/username_available?username={localpart}`
156#[derive(Deserialize)]
157struct UsernameAvailableResponse {
158    available: bool,
159}
160
161#[async_trait::async_trait]
162impl HomeserverConnection for SynapseConnection {
163    fn homeserver(&self) -> &str {
164        &self.homeserver
165    }
166
167    #[tracing::instrument(
168        name = "homeserver.query_user",
169        skip_all,
170        fields(
171            matrix.homeserver = self.homeserver,
172            matrix.mxid = mxid,
173        ),
174        err(Debug),
175    )]
176    async fn query_user(&self, mxid: &str) -> Result<MatrixUser, anyhow::Error> {
177        let mxid = urlencoding::encode(mxid);
178
179        let response = self
180            .get(&format!("_synapse/admin/v2/users/{mxid}"))
181            .send_traced()
182            .await
183            .context("Failed to query user from Synapse")?;
184
185        let response = response
186            .error_for_synapse_error()
187            .await
188            .context("Unexpected HTTP response while querying user from Synapse")?;
189
190        let body: SynapseUser = response
191            .json()
192            .await
193            .context("Failed to deserialize response while querying user from Synapse")?;
194
195        Ok(MatrixUser {
196            displayname: body.display_name,
197            avatar_url: body.avatar_url,
198            deactivated: body.deactivated.unwrap_or(false),
199        })
200    }
201
202    #[tracing::instrument(
203        name = "homeserver.is_localpart_available",
204        skip_all,
205        fields(
206            matrix.homeserver = self.homeserver,
207            matrix.localpart = localpart,
208        ),
209        err(Debug),
210    )]
211    async fn is_localpart_available(&self, localpart: &str) -> Result<bool, anyhow::Error> {
212        let localpart = urlencoding::encode(localpart);
213
214        let response = self
215            .get(&format!(
216                "_synapse/admin/v1/username_available?username={localpart}"
217            ))
218            .send_traced()
219            .await
220            .context("Failed to query localpart availability from Synapse")?;
221
222        match response.error_for_synapse_error().await {
223            Ok(resp) => {
224                let response: UsernameAvailableResponse = resp.json().await.context(
225                    "Unexpected response while querying localpart availability from Synapse",
226                )?;
227
228                Ok(response.available)
229            }
230
231            Err(err)
232                if err.errcode() == Some(M_INVALID_USERNAME)
233                    || err.errcode() == Some(M_USER_IN_USE) =>
234            {
235                debug!(
236                    error = &err as &dyn std::error::Error,
237                    "Localpart is not available"
238                );
239                Ok(false)
240            }
241
242            Err(err) => Err(err).context("Failed to query localpart availability from Synapse"),
243        }
244    }
245
246    #[tracing::instrument(
247        name = "homeserver.provision_user",
248        skip_all,
249        fields(
250            matrix.homeserver = self.homeserver,
251            matrix.mxid = request.mxid(),
252            user.id = request.sub(),
253        ),
254        err(Debug),
255    )]
256    async fn provision_user(&self, request: &ProvisionRequest) -> Result<bool, anyhow::Error> {
257        let mut body = SynapseUser {
258            external_ids: Some(vec![ExternalID {
259                auth_provider: SYNAPSE_AUTH_PROVIDER.to_owned(),
260                external_id: request.sub().to_owned(),
261            }]),
262            ..SynapseUser::default()
263        };
264
265        request
266            .on_displayname(|displayname| {
267                body.display_name = Some(displayname.unwrap_or_default().to_owned());
268            })
269            .on_avatar_url(|avatar_url| {
270                body.avatar_url = Some(avatar_url.unwrap_or_default().to_owned());
271            })
272            .on_emails(|emails| {
273                body.three_pids = Some(
274                    emails
275                        .unwrap_or_default()
276                        .iter()
277                        .map(|email| ThreePID {
278                            medium: ThreePIDMedium::Email,
279                            address: email.clone(),
280                        })
281                        .collect(),
282                );
283            });
284
285        let mxid = urlencoding::encode(request.mxid());
286        let response = self
287            .put(&format!("_synapse/admin/v2/users/{mxid}"))
288            .json(&body)
289            .send_traced()
290            .await
291            .context("Failed to provision user in Synapse")?;
292
293        let response = response
294            .error_for_synapse_error()
295            .await
296            .context("Unexpected HTTP response while provisioning user in Synapse")?;
297
298        match response.status() {
299            StatusCode::CREATED => Ok(true),
300            StatusCode::OK => Ok(false),
301            code => bail!("Unexpected HTTP code while provisioning user in Synapse: {code}"),
302        }
303    }
304
305    #[tracing::instrument(
306        name = "homeserver.create_device",
307        skip_all,
308        fields(
309            matrix.homeserver = self.homeserver,
310            matrix.mxid = mxid,
311            matrix.device_id = device_id,
312        ),
313        err(Debug),
314    )]
315    async fn create_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> {
316        let mxid = urlencoding::encode(mxid);
317
318        let response = self
319            .post(&format!("_synapse/admin/v2/users/{mxid}/devices"))
320            .json(&SynapseDevice {
321                device_id: device_id.to_owned(),
322                dehydrated: None,
323            })
324            .send_traced()
325            .await
326            .context("Failed to create device in Synapse")?;
327
328        let response = response
329            .error_for_synapse_error()
330            .await
331            .context("Unexpected HTTP response while creating device in Synapse")?;
332
333        if response.status() != StatusCode::CREATED {
334            bail!(
335                "Unexpected HTTP code while creating device in Synapse: {}",
336                response.status()
337            );
338        }
339
340        Ok(())
341    }
342
343    #[tracing::instrument(
344        name = "homeserver.delete_device",
345        skip_all,
346        fields(
347            matrix.homeserver = self.homeserver,
348            matrix.mxid = mxid,
349            matrix.device_id = device_id,
350        ),
351        err(Debug),
352    )]
353    async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> {
354        let mxid = urlencoding::encode(mxid);
355        let device_id = urlencoding::encode(device_id);
356
357        let response = self
358            .delete(&format!(
359                "_synapse/admin/v2/users/{mxid}/devices/{device_id}"
360            ))
361            .send_traced()
362            .await
363            .context("Failed to delete device in Synapse")?;
364
365        let response = response
366            .error_for_synapse_error()
367            .await
368            .context("Unexpected HTTP response while deleting device in Synapse")?;
369
370        if response.status() != StatusCode::OK {
371            bail!(
372                "Unexpected HTTP code while deleting device in Synapse: {}",
373                response.status()
374            );
375        }
376
377        Ok(())
378    }
379
380    #[tracing::instrument(
381        name = "homeserver.sync_devices",
382        skip_all,
383        fields(
384            matrix.homeserver = self.homeserver,
385            matrix.mxid = mxid,
386        ),
387        err(Debug),
388    )]
389    async fn sync_devices(
390        &self,
391        mxid: &str,
392        devices: HashSet<String>,
393    ) -> Result<(), anyhow::Error> {
394        // Get the list of current devices
395        let mxid_url = urlencoding::encode(mxid);
396
397        let response = self
398            .get(&format!("_synapse/admin/v2/users/{mxid_url}/devices"))
399            .send_traced()
400            .await
401            .context("Failed to query devices from Synapse")?;
402
403        let response = response.error_for_synapse_error().await?;
404
405        if response.status() != StatusCode::OK {
406            bail!(
407                "Unexpected HTTP code while querying devices from Synapse: {}",
408                response.status()
409            );
410        }
411
412        let body: SynapseDeviceListResponse = response
413            .json()
414            .await
415            .context("Failed to parse response while querying devices from Synapse")?;
416
417        let existing_devices: HashSet<String> = body
418            .devices
419            .into_iter()
420            .filter(|d| d.dehydrated != Some(true))
421            .map(|d| d.device_id)
422            .collect();
423
424        // First, delete all the devices that are not needed anymore
425        let to_delete = existing_devices.difference(&devices).cloned().collect();
426
427        let response = self
428            .post(&format!(
429                "_synapse/admin/v2/users/{mxid_url}/delete_devices"
430            ))
431            .json(&SynapseDeleteDevicesRequest { devices: to_delete })
432            .send_traced()
433            .await
434            .context("Failed to delete devices from Synapse")?;
435
436        let response = response
437            .error_for_synapse_error()
438            .await
439            .context("Unexpected HTTP response while deleting devices from Synapse")?;
440
441        if response.status() != StatusCode::OK {
442            bail!(
443                "Unexpected HTTP code while deleting devices from Synapse: {}",
444                response.status()
445            );
446        }
447
448        // Then, create the devices that are missing. There is no batching API to do
449        // this, so we do this sequentially, which is fine as the API is idempotent.
450        for device_id in devices.difference(&existing_devices) {
451            self.create_device(mxid, device_id).await?;
452        }
453
454        Ok(())
455    }
456
457    #[tracing::instrument(
458        name = "homeserver.delete_user",
459        skip_all,
460        fields(
461            matrix.homeserver = self.homeserver,
462            matrix.mxid = mxid,
463            erase = erase,
464        ),
465        err(Debug),
466    )]
467    async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> {
468        let mxid = urlencoding::encode(mxid);
469
470        let response = self
471            .post(&format!("_synapse/admin/v1/deactivate/{mxid}"))
472            .json(&SynapseDeactivateUserRequest { erase })
473            .send_traced()
474            .await
475            .context("Failed to deactivate user in Synapse")?;
476
477        let response = response
478            .error_for_synapse_error()
479            .await
480            .context("Unexpected HTTP response while deactivating user in Synapse")?;
481
482        if response.status() != StatusCode::OK {
483            bail!(
484                "Unexpected HTTP code while deactivating user in Synapse: {}",
485                response.status()
486            );
487        }
488
489        Ok(())
490    }
491
492    #[tracing::instrument(
493        name = "homeserver.reactivate_user",
494        skip_all,
495        fields(
496            matrix.homeserver = self.homeserver,
497            matrix.mxid = mxid,
498        ),
499        err(Debug),
500    )]
501    async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> {
502        let mxid = urlencoding::encode(mxid);
503        let response = self
504            .put(&format!("_synapse/admin/v2/users/{mxid}"))
505            .json(&SynapseUser {
506                deactivated: Some(false),
507                ..SynapseUser::default()
508            })
509            .send_traced()
510            .await
511            .context("Failed to reactivate user in Synapse")?;
512
513        let response = response
514            .error_for_synapse_error()
515            .await
516            .context("Unexpected HTTP response while reactivating user in Synapse")?;
517
518        match response.status() {
519            StatusCode::CREATED | StatusCode::OK => Ok(()),
520            code => bail!("Unexpected HTTP code while reactivating user in Synapse: {code}",),
521        }
522    }
523
524    #[tracing::instrument(
525        name = "homeserver.set_displayname",
526        skip_all,
527        fields(
528            matrix.homeserver = self.homeserver,
529            matrix.mxid = mxid,
530            matrix.displayname = displayname,
531        ),
532        err(Debug),
533    )]
534    async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> {
535        let mxid = urlencoding::encode(mxid);
536        let response = self
537            .put(&format!("_matrix/client/v3/profile/{mxid}/displayname"))
538            .json(&SetDisplayNameRequest { displayname })
539            .send_traced()
540            .await
541            .context("Failed to set displayname in Synapse")?;
542
543        let response = response
544            .error_for_synapse_error()
545            .await
546            .context("Unexpected HTTP response while setting displayname in Synapse")?;
547
548        if response.status() != StatusCode::OK {
549            bail!(
550                "Unexpected HTTP code while setting displayname in Synapse: {}",
551                response.status()
552            );
553        }
554
555        Ok(())
556    }
557
558    #[tracing::instrument(
559        name = "homeserver.unset_displayname",
560        skip_all,
561        fields(
562            matrix.homeserver = self.homeserver,
563            matrix.mxid = mxid,
564        ),
565        err(Display),
566    )]
567    async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> {
568        self.set_displayname(mxid, "").await
569    }
570
571    #[tracing::instrument(
572        name = "homeserver.allow_cross_signing_reset",
573        skip_all,
574        fields(
575            matrix.homeserver = self.homeserver,
576            matrix.mxid = mxid,
577        ),
578        err(Debug),
579    )]
580    async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> {
581        let mxid = urlencoding::encode(mxid);
582
583        let response = self
584            .post(&format!(
585                "_synapse/admin/v1/users/{mxid}/_allow_cross_signing_replacement_without_uia"
586            ))
587            .json(&SynapseAllowCrossSigningResetRequest {})
588            .send_traced()
589            .await
590            .context("Failed to allow cross-signing reset in Synapse")?;
591
592        let response = response
593            .error_for_synapse_error()
594            .await
595            .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?;
596
597        if response.status() != StatusCode::OK {
598            bail!(
599                "Unexpected HTTP code while allowing cross-signing reset in Synapse: {}",
600                response.status(),
601            );
602        }
603
604        Ok(())
605    }
606}