1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
67use anyhow::Context;
8use async_trait::async_trait;
9use mas_storage::{
10 RepositoryAccess,
11 compat::CompatSessionFilter,
12 oauth2::OAuth2SessionFilter,
13 queue::{DeactivateUserJob, ReactivateUserJob},
14 user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
15};
16use tracing::info;
1718use crate::{
19 State,
20 new_queue::{JobContext, JobError, RunnableJob},
21};
2223/// Job to deactivate a user, both locally and on the Matrix homeserver.
24#[async_trait]
25impl RunnableJob for DeactivateUserJob {
26#[tracing::instrument(
27 name = "job.deactivate_user"
28fields(user.id = %self.user_id(), erase = %self.hs_erase()),
29 skip_all,
30 err,
31 )]
32async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
33let clock = state.clock();
34let matrix = state.matrix_connection();
35let mut repo = state.repository().await.map_err(JobError::retry)?;
3637let user = repo
38 .user()
39 .lookup(self.user_id())
40 .await
41.map_err(JobError::retry)?
42.context("User not found")
43 .map_err(JobError::fail)?;
4445// Let's first lock & deactivate the user
46let user = repo
47 .user()
48 .lock(&clock, user)
49 .await
50.context("Failed to lock user")
51 .map_err(JobError::retry)?;
5253let user = repo
54 .user()
55 .deactivate(&clock, user)
56 .await
57.context("Failed to deactivate user")
58 .map_err(JobError::retry)?;
5960// Kill all sessions for the user
61let n = repo
62 .browser_session()
63 .finish_bulk(
64&clock,
65 BrowserSessionFilter::new().for_user(&user).active_only(),
66 )
67 .await
68.map_err(JobError::retry)?;
69info!(affected = n, "Killed all browser sessions for user");
7071let n = repo
72 .oauth2_session()
73 .finish_bulk(
74&clock,
75 OAuth2SessionFilter::new().for_user(&user).active_only(),
76 )
77 .await
78.map_err(JobError::retry)?;
79info!(affected = n, "Killed all OAuth 2.0 sessions for user");
8081let n = repo
82 .compat_session()
83 .finish_bulk(
84&clock,
85 CompatSessionFilter::new().for_user(&user).active_only(),
86 )
87 .await
88.map_err(JobError::retry)?;
89info!(affected = n, "Killed all compatibility sessions for user");
9091// Delete all the email addresses for the user
92let n = repo
93 .user_email()
94 .remove_bulk(UserEmailFilter::new().for_user(&user))
95 .await
96.map_err(JobError::retry)?;
97info!(affected = n, "Removed all email addresses for user");
9899// Before calling back to the homeserver, commit the changes to the database, as
100 // we want the user to be locked out as soon as possible
101repo.save().await.map_err(JobError::retry)?;
102103let mxid = matrix.mxid(&user.username);
104info!("Deactivating user {} on homeserver", mxid);
105 matrix
106 .delete_user(&mxid, self.hs_erase())
107 .await
108.map_err(JobError::retry)?;
109110Ok(())
111 }
112}
113114/// Job to reactivate a user, both locally and on the Matrix homeserver.
115#[async_trait]
116impl RunnableJob for ReactivateUserJob {
117#[tracing::instrument(
118 name = "job.reactivate_user",
119 fields(user.id = %self.user_id()),
120 skip_all,
121 err,
122 )]
123async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
124let matrix = state.matrix_connection();
125let mut repo = state.repository().await.map_err(JobError::retry)?;
126127let user = repo
128 .user()
129 .lookup(self.user_id())
130 .await
131.map_err(JobError::retry)?
132.context("User not found")
133 .map_err(JobError::fail)?;
134135let mxid = matrix.mxid(&user.username);
136info!("Reactivating user {} on homeserver", mxid);
137 matrix
138 .reactivate_user(&mxid)
139 .await
140.map_err(JobError::retry)?;
141142// We want to unlock the user from our side only once it has been reactivated on
143 // the homeserver
144let _user = repo.user().unlock(user).await.map_err(JobError::retry)?;
145 repo.save().await.map_err(JobError::retry)?;
146147Ok(())
148 }
149}