Skip to main content

mas_storage_pg/compat/
session.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
4//
5// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
6// Please see LICENSE files in the repository root for full details.
7
8use std::net::IpAddr;
9
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use mas_data_model::{
13    BrowserSession, Clock, CompatSession, CompatSessionState, CompatSsoLogin, CompatSsoLoginState,
14    Device, User,
15};
16use mas_storage::{
17    Page, Pagination,
18    compat::{CompatSessionFilter, CompatSessionRepository},
19    pagination::Node,
20};
21use rand::RngCore;
22use sea_query::{Expr, PostgresQueryBuilder, Query, enum_def};
23use sea_query_binder::SqlxBinder;
24use sqlx::PgConnection;
25use ulid::Ulid;
26use url::Url;
27use uuid::Uuid;
28
29use crate::{
30    DatabaseError, DatabaseInconsistencyError,
31    filter::{Filter, StatementExt, StatementWithJoinsExt},
32    iden::{CompatSessions, CompatSsoLogins, UserSessions},
33    pagination::QueryBuilderExt,
34    tracing::ExecuteExt,
35    ulid_at::{max_ulid_at, min_ulid_at},
36};
37
38/// An implementation of [`CompatSessionRepository`] for a PostgreSQL connection
39pub struct PgCompatSessionRepository<'c> {
40    conn: &'c mut PgConnection,
41}
42
43impl<'c> PgCompatSessionRepository<'c> {
44    /// Create a new [`PgCompatSessionRepository`] from an active PostgreSQL
45    /// connection
46    pub fn new(conn: &'c mut PgConnection) -> Self {
47        Self { conn }
48    }
49}
50
51struct CompatSessionLookup {
52    compat_session_id: Uuid,
53    device_id: Option<String>,
54    human_name: Option<String>,
55    user_id: Uuid,
56    user_session_id: Option<Uuid>,
57    created_at: DateTime<Utc>,
58    finished_at: Option<DateTime<Utc>>,
59    is_synapse_admin: bool,
60    user_agent: Option<String>,
61    last_active_at: Option<DateTime<Utc>>,
62    last_active_ip: Option<IpAddr>,
63}
64
65impl Node<Ulid> for CompatSessionLookup {
66    fn cursor(&self) -> Ulid {
67        self.compat_session_id.into()
68    }
69}
70
71impl From<CompatSessionLookup> for CompatSession {
72    fn from(value: CompatSessionLookup) -> Self {
73        let id = value.compat_session_id.into();
74
75        let state = match value.finished_at {
76            None => CompatSessionState::Valid,
77            Some(finished_at) => CompatSessionState::Finished { finished_at },
78        };
79
80        CompatSession {
81            id,
82            state,
83            user_id: value.user_id.into(),
84            user_session_id: value.user_session_id.map(Ulid::from),
85            device: value.device_id.map(Device::from),
86            human_name: value.human_name,
87            created_at: value.created_at,
88            is_synapse_admin: value.is_synapse_admin,
89            user_agent: value.user_agent,
90            last_active_at: value.last_active_at,
91            last_active_ip: value.last_active_ip,
92        }
93    }
94}
95
96#[derive(sqlx::FromRow)]
97#[enum_def]
98struct CompatSessionAndSsoLoginLookup {
99    compat_session_id: Uuid,
100    device_id: Option<String>,
101    human_name: Option<String>,
102    user_id: Uuid,
103    user_session_id: Option<Uuid>,
104    created_at: DateTime<Utc>,
105    finished_at: Option<DateTime<Utc>>,
106    is_synapse_admin: bool,
107    user_agent: Option<String>,
108    last_active_at: Option<DateTime<Utc>>,
109    last_active_ip: Option<IpAddr>,
110    compat_sso_login_id: Option<Uuid>,
111    compat_sso_login_token: Option<String>,
112    compat_sso_login_redirect_uri: Option<String>,
113    compat_sso_login_created_at: Option<DateTime<Utc>>,
114    compat_sso_login_fulfilled_at: Option<DateTime<Utc>>,
115    compat_sso_login_exchanged_at: Option<DateTime<Utc>>,
116}
117
118impl Node<Ulid> for CompatSessionAndSsoLoginLookup {
119    fn cursor(&self) -> Ulid {
120        self.compat_session_id.into()
121    }
122}
123
124impl TryFrom<CompatSessionAndSsoLoginLookup> for (CompatSession, Option<CompatSsoLogin>) {
125    type Error = DatabaseInconsistencyError;
126
127    fn try_from(value: CompatSessionAndSsoLoginLookup) -> Result<Self, Self::Error> {
128        let id = value.compat_session_id.into();
129
130        let state = match value.finished_at {
131            None => CompatSessionState::Valid,
132            Some(finished_at) => CompatSessionState::Finished { finished_at },
133        };
134
135        let session = CompatSession {
136            id,
137            state,
138            user_id: value.user_id.into(),
139            device: value.device_id.map(Device::from),
140            human_name: value.human_name,
141            user_session_id: value.user_session_id.map(Ulid::from),
142            created_at: value.created_at,
143            is_synapse_admin: value.is_synapse_admin,
144            user_agent: value.user_agent,
145            last_active_at: value.last_active_at,
146            last_active_ip: value.last_active_ip,
147        };
148
149        match (
150            value.compat_sso_login_id,
151            value.compat_sso_login_token,
152            value.compat_sso_login_redirect_uri,
153            value.compat_sso_login_created_at,
154            value.compat_sso_login_fulfilled_at,
155            value.compat_sso_login_exchanged_at,
156        ) {
157            (None, None, None, None, None, None) => Ok((session, None)),
158            (
159                Some(id),
160                Some(login_token),
161                Some(redirect_uri),
162                Some(created_at),
163                fulfilled_at,
164                exchanged_at,
165            ) => {
166                let id = id.into();
167                let redirect_uri = Url::parse(&redirect_uri).map_err(|e| {
168                    DatabaseInconsistencyError::on("compat_sso_logins")
169                        .column("redirect_uri")
170                        .row(id)
171                        .source(e)
172                })?;
173
174                let state = match (fulfilled_at, exchanged_at) {
175                    (Some(fulfilled_at), Some(exchanged_at)) => CompatSsoLoginState::Exchanged {
176                        fulfilled_at,
177                        exchanged_at,
178                        compat_session_id: session.id,
179                    },
180                    _ => return Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
181                };
182
183                let login = CompatSsoLogin {
184                    id,
185                    redirect_uri,
186                    login_token,
187                    created_at,
188                    state,
189                };
190
191                Ok((session, Some(login)))
192            }
193            _ => Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
194        }
195    }
196}
197
198impl Filter for CompatSessionFilter<'_> {
199    fn generate_condition(&self, has_joins: bool) -> impl sea_query::IntoCondition {
200        sea_query::Condition::all()
201            .add_option(self.user().map(|user| {
202                Expr::col((CompatSessions::Table, CompatSessions::UserId)).eq(Uuid::from(user.id))
203            }))
204            .add_option(self.browser_session().map(|browser_session| {
205                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId))
206                    .eq(Uuid::from(browser_session.id))
207            }))
208            .add_option(self.browser_session_filter().map(|browser_session_filter| {
209                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)).in_subquery(
210                    Query::select()
211                        .expr(Expr::col((
212                            UserSessions::Table,
213                            UserSessions::UserSessionId,
214                        )))
215                        .apply_filter(browser_session_filter)
216                        .from(UserSessions::Table)
217                        .take(),
218                )
219            }))
220            .add_option(self.state().map(|state| {
221                if state.is_active() {
222                    Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_null()
223                } else {
224                    Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_not_null()
225                }
226            }))
227            .add_option(self.auth_type().map(|auth_type| {
228                // In in the SELECT to list sessions, we can rely on the JOINed table, whereas
229                // in other queries we need to do a subquery
230                if has_joins {
231                    if auth_type.is_sso_login() {
232                        Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
233                            .is_not_null()
234                    } else {
235                        Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
236                            .is_null()
237                    }
238                } else {
239                    // This builds either a:
240                    // `WHERE compat_session_id = ANY(...)`
241                    // or a `WHERE compat_session_id <> ALL(...)`
242                    let compat_sso_logins = Query::select()
243                        .expr(Expr::col((
244                            CompatSsoLogins::Table,
245                            CompatSsoLogins::CompatSessionId,
246                        )))
247                        .from(CompatSsoLogins::Table)
248                        .take();
249
250                    if auth_type.is_sso_login() {
251                        Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
252                            .eq(Expr::any(compat_sso_logins))
253                    } else {
254                        Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
255                            .ne(Expr::all(compat_sso_logins))
256                    }
257                }
258            }))
259            .add_option(self.last_active_after().map(|last_active_after| {
260                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
261                    .gt(last_active_after)
262            }))
263            .add_option(self.last_active_before().map(|last_active_before| {
264                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
265                    .lt(last_active_before)
266            }))
267            .add_option(self.created_after().map(|created_after| {
268                // ULIDs encode the creation time in their high 48 bits, so we
269                // can use the primary key index to filter on creation time
270                // without touching the `created_at` column.
271                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
272                    .gt(max_ulid_at(created_after))
273            }))
274            .add_option(self.created_before().map(|created_before| {
275                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
276                    .lt(min_ulid_at(created_before))
277            }))
278            .add_option(self.device().map(|device| {
279                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)).eq(device.as_str())
280            }))
281    }
282}
283
284#[async_trait]
285impl CompatSessionRepository for PgCompatSessionRepository<'_> {
286    type Error = DatabaseError;
287
288    #[tracing::instrument(
289        name = "db.compat_session.lookup",
290        skip_all,
291        fields(
292            db.query.text,
293            compat_session.id = %id,
294        ),
295        err,
296    )]
297    async fn lookup(&mut self, id: Ulid) -> Result<Option<CompatSession>, Self::Error> {
298        let res = sqlx::query_as!(
299            CompatSessionLookup,
300            r#"
301                SELECT compat_session_id
302                     , device_id
303                     , human_name
304                     , user_id
305                     , user_session_id
306                     , created_at
307                     , finished_at
308                     , is_synapse_admin
309                     , user_agent
310                     , last_active_at
311                     , last_active_ip as "last_active_ip: IpAddr"
312                FROM compat_sessions
313                WHERE compat_session_id = $1
314            "#,
315            Uuid::from(id),
316        )
317        .traced()
318        .fetch_optional(&mut *self.conn)
319        .await?;
320
321        let Some(res) = res else { return Ok(None) };
322
323        Ok(Some(res.into()))
324    }
325
326    #[tracing::instrument(
327        name = "db.compat_session.add",
328        skip_all,
329        fields(
330            db.query.text,
331            compat_session.id,
332            %user.id,
333            %user.username,
334            compat_session.device.id = device.as_str(),
335        ),
336        err,
337    )]
338    async fn add(
339        &mut self,
340        rng: &mut (dyn RngCore + Send),
341        clock: &dyn Clock,
342        user: &User,
343        device: Device,
344        browser_session: Option<&BrowserSession>,
345        is_synapse_admin: bool,
346        human_name: Option<String>,
347    ) -> Result<CompatSession, Self::Error> {
348        let created_at = clock.now();
349        let id = Ulid::from_datetime_with_source(created_at.into(), rng);
350        tracing::Span::current().record("compat_session.id", tracing::field::display(id));
351
352        sqlx::query!(
353            r#"
354                INSERT INTO compat_sessions
355                    (compat_session_id, user_id, device_id,
356                     user_session_id, created_at, is_synapse_admin,
357                     human_name)
358                VALUES ($1, $2, $3, $4, $5, $6, $7)
359            "#,
360            Uuid::from(id),
361            Uuid::from(user.id),
362            device.as_str(),
363            browser_session.map(|s| Uuid::from(s.id)),
364            created_at,
365            is_synapse_admin,
366            human_name.as_deref(),
367        )
368        .traced()
369        .execute(&mut *self.conn)
370        .await?;
371
372        Ok(CompatSession {
373            id,
374            state: CompatSessionState::default(),
375            user_id: user.id,
376            device: Some(device),
377            human_name,
378            user_session_id: browser_session.map(|s| s.id),
379            created_at,
380            is_synapse_admin,
381            user_agent: None,
382            last_active_at: None,
383            last_active_ip: None,
384        })
385    }
386
387    #[tracing::instrument(
388        name = "db.compat_session.finish",
389        skip_all,
390        fields(
391            db.query.text,
392            %compat_session.id,
393            user.id = %compat_session.user_id,
394            compat_session.device.id = compat_session.device.as_ref().map(mas_data_model::Device::as_str),
395        ),
396        err,
397    )]
398    async fn finish(
399        &mut self,
400        clock: &dyn Clock,
401        compat_session: CompatSession,
402    ) -> Result<CompatSession, Self::Error> {
403        let finished_at = clock.now();
404
405        let res = sqlx::query!(
406            r#"
407                UPDATE compat_sessions cs
408                SET finished_at = $2
409                WHERE compat_session_id = $1
410            "#,
411            Uuid::from(compat_session.id),
412            finished_at,
413        )
414        .traced()
415        .execute(&mut *self.conn)
416        .await?;
417
418        DatabaseError::ensure_affected_rows(&res, 1)?;
419
420        let compat_session = compat_session
421            .finish(finished_at)
422            .map_err(DatabaseError::to_invalid_operation)?;
423
424        Ok(compat_session)
425    }
426
427    #[tracing::instrument(
428        name = "db.compat_session.finish_bulk",
429        skip_all,
430        fields(db.query.text),
431        err,
432    )]
433    async fn finish_bulk(
434        &mut self,
435        clock: &dyn Clock,
436        filter: CompatSessionFilter<'_>,
437    ) -> Result<usize, Self::Error> {
438        let finished_at = clock.now();
439        let (sql, arguments) = Query::update()
440            .table(CompatSessions::Table)
441            .value(CompatSessions::FinishedAt, finished_at)
442            .apply_filter(filter)
443            .build_sqlx(PostgresQueryBuilder);
444
445        let res = sqlx::query_with(&sql, arguments)
446            .traced()
447            .execute(&mut *self.conn)
448            .await?;
449
450        Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
451    }
452
453    #[tracing::instrument(
454        name = "db.compat_session.list",
455        skip_all,
456        fields(
457            db.query.text,
458        ),
459        err,
460    )]
461    async fn list(
462        &mut self,
463        filter: CompatSessionFilter<'_>,
464        pagination: Pagination,
465    ) -> Result<Page<(CompatSession, Option<CompatSsoLogin>)>, Self::Error> {
466        let (sql, arguments) = Query::select()
467            .expr_as(
468                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
469                CompatSessionAndSsoLoginLookupIden::CompatSessionId,
470            )
471            .expr_as(
472                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
473                CompatSessionAndSsoLoginLookupIden::DeviceId,
474            )
475            .expr_as(
476                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
477                CompatSessionAndSsoLoginLookupIden::HumanName,
478            )
479            .expr_as(
480                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
481                CompatSessionAndSsoLoginLookupIden::UserId,
482            )
483            .expr_as(
484                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
485                CompatSessionAndSsoLoginLookupIden::UserSessionId,
486            )
487            .expr_as(
488                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
489                CompatSessionAndSsoLoginLookupIden::CreatedAt,
490            )
491            .expr_as(
492                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
493                CompatSessionAndSsoLoginLookupIden::FinishedAt,
494            )
495            .expr_as(
496                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
497                CompatSessionAndSsoLoginLookupIden::IsSynapseAdmin,
498            )
499            .expr_as(
500                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
501                CompatSessionAndSsoLoginLookupIden::UserAgent,
502            )
503            .expr_as(
504                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
505                CompatSessionAndSsoLoginLookupIden::LastActiveAt,
506            )
507            .expr_as(
508                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
509                CompatSessionAndSsoLoginLookupIden::LastActiveIp,
510            )
511            .expr_as(
512                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId)),
513                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginId,
514            )
515            .expr_as(
516                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::LoginToken)),
517                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginToken,
518            )
519            .expr_as(
520                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::RedirectUri)),
521                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginRedirectUri,
522            )
523            .expr_as(
524                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CreatedAt)),
525                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginCreatedAt,
526            )
527            .expr_as(
528                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::FulfilledAt)),
529                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginFulfilledAt,
530            )
531            .expr_as(
532                Expr::col((CompatSsoLogins::Table, CompatSsoLogins::ExchangedAt)),
533                CompatSessionAndSsoLoginLookupIden::CompatSsoLoginExchangedAt,
534            )
535            .from(CompatSessions::Table)
536            .left_join(
537                CompatSsoLogins::Table,
538                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
539                    .equals((CompatSsoLogins::Table, CompatSsoLogins::CompatSessionId)),
540            )
541            .apply_filter_with_joins(filter)
542            .generate_pagination(
543                (CompatSessions::Table, CompatSessions::CompatSessionId),
544                pagination,
545            )
546            .build_sqlx(PostgresQueryBuilder);
547
548        let edges: Vec<CompatSessionAndSsoLoginLookup> = sqlx::query_as_with(&sql, arguments)
549            .traced()
550            .fetch_all(&mut *self.conn)
551            .await?;
552
553        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
554
555        Ok(page)
556    }
557
558    #[tracing::instrument(
559        name = "db.compat_session.count",
560        skip_all,
561        fields(
562            db.query.text,
563        ),
564        err,
565    )]
566    async fn count(&mut self, filter: CompatSessionFilter<'_>) -> Result<usize, Self::Error> {
567        let (sql, arguments) = sea_query::Query::select()
568            .expr(Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)).count())
569            .from(CompatSessions::Table)
570            .apply_filter(filter)
571            .build_sqlx(PostgresQueryBuilder);
572
573        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
574            .traced()
575            .fetch_one(&mut *self.conn)
576            .await?;
577
578        count
579            .try_into()
580            .map_err(DatabaseError::to_invalid_operation)
581    }
582
583    #[tracing::instrument(
584        name = "db.compat_session.record_batch_activity",
585        skip_all,
586        fields(
587            db.query.text,
588        ),
589        err,
590    )]
591    async fn record_batch_activity(
592        &mut self,
593        mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
594    ) -> Result<(), Self::Error> {
595        // Sort the activity by ID, so that when batching the updates, Postgres
596        // locks the rows in a stable order, preventing deadlocks
597        activities.sort_unstable();
598        let mut ids = Vec::with_capacity(activities.len());
599        let mut last_activities = Vec::with_capacity(activities.len());
600        let mut ips = Vec::with_capacity(activities.len());
601
602        for (id, last_activity, ip) in activities {
603            ids.push(Uuid::from(id));
604            last_activities.push(last_activity);
605            ips.push(ip);
606        }
607
608        let res = sqlx::query!(
609            r#"
610                UPDATE compat_sessions
611                SET last_active_at = GREATEST(t.last_active_at, compat_sessions.last_active_at)
612                  , last_active_ip = COALESCE(t.last_active_ip, compat_sessions.last_active_ip)
613                FROM (
614                    SELECT *
615                    FROM UNNEST($1::uuid[], $2::timestamptz[], $3::inet[])
616                        AS t(compat_session_id, last_active_at, last_active_ip)
617                ) AS t
618                WHERE compat_sessions.compat_session_id = t.compat_session_id
619            "#,
620            &ids,
621            &last_activities,
622            &ips as &[Option<IpAddr>],
623        )
624        .traced()
625        .execute(&mut *self.conn)
626        .await?;
627
628        DatabaseError::ensure_affected_rows(&res, ids.len().try_into().unwrap_or(u64::MAX))?;
629
630        Ok(())
631    }
632
633    #[tracing::instrument(
634        name = "db.compat_session.record_user_agent",
635        skip_all,
636        fields(
637            db.query.text,
638            %compat_session.id,
639        ),
640        err,
641    )]
642    async fn record_user_agent(
643        &mut self,
644        mut compat_session: CompatSession,
645        user_agent: String,
646    ) -> Result<CompatSession, Self::Error> {
647        let res = sqlx::query!(
648            r#"
649            UPDATE compat_sessions
650            SET user_agent = $2
651            WHERE compat_session_id = $1
652        "#,
653            Uuid::from(compat_session.id),
654            &*user_agent,
655        )
656        .traced()
657        .execute(&mut *self.conn)
658        .await?;
659
660        compat_session.user_agent = Some(user_agent);
661
662        DatabaseError::ensure_affected_rows(&res, 1)?;
663
664        Ok(compat_session)
665    }
666
667    #[tracing::instrument(
668        name = "repository.compat_session.set_human_name",
669        skip(self),
670        fields(
671            compat_session.id = %compat_session.id,
672            compat_session.human_name = ?human_name,
673        ),
674        err,
675    )]
676    async fn set_human_name(
677        &mut self,
678        mut compat_session: CompatSession,
679        human_name: Option<String>,
680    ) -> Result<CompatSession, Self::Error> {
681        let res = sqlx::query!(
682            r#"
683            UPDATE compat_sessions
684            SET human_name = $2
685            WHERE compat_session_id = $1
686        "#,
687            Uuid::from(compat_session.id),
688            human_name.as_deref(),
689        )
690        .traced()
691        .execute(&mut *self.conn)
692        .await?;
693
694        compat_session.human_name = human_name;
695
696        DatabaseError::ensure_affected_rows(&res, 1)?;
697
698        Ok(compat_session)
699    }
700
701    #[tracing::instrument(
702        name = "db.compat_session.cleanup_finished",
703        skip_all,
704        fields(
705            db.query.text,
706        ),
707        err,
708    )]
709    async fn cleanup_finished(
710        &mut self,
711        since: Option<DateTime<Utc>>,
712        until: DateTime<Utc>,
713        limit: usize,
714    ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
715        let res = sqlx::query!(
716            r#"
717                WITH
718                    to_delete AS (
719                        SELECT compat_session_id, finished_at
720                        FROM compat_sessions
721                        WHERE finished_at IS NOT NULL
722                          AND ($1::timestamptz IS NULL OR finished_at >= $1)
723                          AND finished_at < $2
724                        ORDER BY finished_at ASC
725                        LIMIT $3
726                        FOR UPDATE
727                    ),
728
729                    -- Delete refresh tokens first because they reference access tokens
730                    deleted_refresh_tokens AS (
731                        DELETE FROM compat_refresh_tokens
732                        USING to_delete
733                        WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id
734                    ),
735
736                    deleted_access_tokens AS (
737                        DELETE FROM compat_access_tokens
738                        USING to_delete
739                        WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id
740                    ),
741
742                    deleted_sso_logins AS (
743                        DELETE FROM compat_sso_logins
744                        USING to_delete
745                        WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id
746                    ),
747
748                    deleted_sessions AS (
749                        DELETE FROM compat_sessions
750                        USING to_delete
751                        WHERE compat_sessions.compat_session_id = to_delete.compat_session_id
752                        RETURNING compat_sessions.finished_at
753                    )
754
755                SELECT
756                    COUNT(*) as "count!",
757                    MAX(finished_at) as last_finished_at
758                FROM deleted_sessions
759            "#,
760            since,
761            until,
762            i64::try_from(limit).unwrap_or(i64::MAX),
763        )
764        .traced()
765        .fetch_one(&mut *self.conn)
766        .await?;
767
768        Ok((
769            res.count.try_into().unwrap_or(usize::MAX),
770            res.last_finished_at,
771        ))
772    }
773
774    #[tracing::instrument(
775        name = "db.compat_session.cleanup_inactive_ips",
776        skip_all,
777        fields(
778            db.query.text,
779            since = since.map(tracing::field::display),
780            threshold = %threshold,
781            limit = limit,
782        ),
783        err,
784    )]
785    async fn cleanup_inactive_ips(
786        &mut self,
787        since: Option<DateTime<Utc>>,
788        threshold: DateTime<Utc>,
789        limit: usize,
790    ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
791        let res = sqlx::query!(
792            r#"
793                WITH to_update AS (
794                    SELECT compat_session_id, last_active_at
795                    FROM compat_sessions
796                    WHERE last_active_ip IS NOT NULL
797                      AND last_active_at IS NOT NULL
798                      AND ($1::timestamptz IS NULL OR last_active_at >= $1)
799                      AND last_active_at < $2
800                    ORDER BY last_active_at ASC
801                    LIMIT $3
802                    FOR UPDATE
803                ),
804                updated AS (
805                    UPDATE compat_sessions
806                    SET last_active_ip = NULL
807                    FROM to_update
808                    WHERE compat_sessions.compat_session_id = to_update.compat_session_id
809                    RETURNING compat_sessions.last_active_at
810                )
811                SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
812            "#,
813            since,
814            threshold,
815            i64::try_from(limit).unwrap_or(i64::MAX),
816        )
817        .traced()
818        .fetch_one(&mut *self.conn)
819        .await?;
820
821        Ok((
822            res.count.try_into().unwrap_or(usize::MAX),
823            res.last_active_at,
824        ))
825    }
826}