1use std::net::IpAddr;
9
10use async_trait::async_trait;
11use chrono::{DateTime, Utc};
12use mas_data_model::{
13 BrowserSession, Clock, CompatSession, CompatSessionState, CompatSsoLogin, CompatSsoLoginState,
14 Device, User,
15};
16use mas_storage::{
17 Page, Pagination,
18 compat::{CompatSessionFilter, CompatSessionRepository},
19 pagination::Node,
20};
21use rand::RngCore;
22use sea_query::{Expr, PostgresQueryBuilder, Query, enum_def};
23use sea_query_binder::SqlxBinder;
24use sqlx::PgConnection;
25use ulid::Ulid;
26use url::Url;
27use uuid::Uuid;
28
29use crate::{
30 DatabaseError, DatabaseInconsistencyError,
31 filter::{Filter, StatementExt, StatementWithJoinsExt},
32 iden::{CompatSessions, CompatSsoLogins, UserSessions},
33 pagination::QueryBuilderExt,
34 tracing::ExecuteExt,
35 ulid_at::{max_ulid_at, min_ulid_at},
36};
37
/// An implementation of [`CompatSessionRepository`] that runs its queries on
/// a borrowed PostgreSQL connection.
pub struct PgCompatSessionRepository<'c> {
    /// Exclusive borrow of the connection every query in this repository is
    /// executed on.
    conn: &'c mut PgConnection,
}
42
43impl<'c> PgCompatSessionRepository<'c> {
44 pub fn new(conn: &'c mut PgConnection) -> Self {
47 Self { conn }
48 }
49}
50
/// Raw row shape of a `compat_sessions` record, as fetched by the single-row
/// lookup query, before it is converted into a [`CompatSession`].
struct CompatSessionLookup {
    /// Primary identifier of the session, stored as a UUID.
    compat_session_id: Uuid,
    /// Device identifier attached to the session, if any.
    device_id: Option<String>,
    /// Optional human-readable name given to the session.
    human_name: Option<String>,
    /// Identifier of the user owning the session.
    user_id: Uuid,
    /// Identifier of the browser session this compat session is tied to, if
    /// any.
    user_session_id: Option<Uuid>,
    /// When the session was created.
    created_at: DateTime<Utc>,
    /// When the session was finished; `None` while it is still valid.
    finished_at: Option<DateTime<Utc>>,
    /// Value of the `is_synapse_admin` column.
    is_synapse_admin: bool,
    /// Last user agent recorded for the session, if any.
    user_agent: Option<String>,
    /// Timestamp of the last recorded activity, if any.
    last_active_at: Option<DateTime<Utc>>,
    /// IP address of the last recorded activity, if any.
    last_active_ip: Option<IpAddr>,
}
64
65impl Node<Ulid> for CompatSessionLookup {
66 fn cursor(&self) -> Ulid {
67 self.compat_session_id.into()
68 }
69}
70
71impl From<CompatSessionLookup> for CompatSession {
72 fn from(value: CompatSessionLookup) -> Self {
73 let id = value.compat_session_id.into();
74
75 let state = match value.finished_at {
76 None => CompatSessionState::Valid,
77 Some(finished_at) => CompatSessionState::Finished { finished_at },
78 };
79
80 CompatSession {
81 id,
82 state,
83 user_id: value.user_id.into(),
84 user_session_id: value.user_session_id.map(Ulid::from),
85 device: value.device_id.map(Device::from),
86 human_name: value.human_name,
87 created_at: value.created_at,
88 is_synapse_admin: value.is_synapse_admin,
89 user_agent: value.user_agent,
90 last_active_at: value.last_active_at,
91 last_active_ip: value.last_active_ip,
92 }
93 }
94}
95
/// Raw row shape produced by the session listing query, which selects from
/// `compat_sessions` LEFT JOINed with `compat_sso_logins`.
///
/// The `compat_sso_login_*` fields come from the joined table and are
/// therefore all optional. `#[enum_def]` generates the
/// `CompatSessionAndSsoLoginLookupIden` enum that the listing query uses as
/// column aliases, so that the selected columns line up with these field
/// names.
#[derive(sqlx::FromRow)]
#[enum_def]
struct CompatSessionAndSsoLoginLookup {
    /// Primary identifier of the session, stored as a UUID.
    compat_session_id: Uuid,
    /// Device identifier attached to the session, if any.
    device_id: Option<String>,
    /// Optional human-readable name given to the session.
    human_name: Option<String>,
    /// Identifier of the user owning the session.
    user_id: Uuid,
    /// Identifier of the browser session this compat session is tied to, if
    /// any.
    user_session_id: Option<Uuid>,
    /// When the session was created.
    created_at: DateTime<Utc>,
    /// When the session was finished; `None` while it is still valid.
    finished_at: Option<DateTime<Utc>>,
    /// Value of the `is_synapse_admin` column.
    is_synapse_admin: bool,
    /// Last user agent recorded for the session, if any.
    user_agent: Option<String>,
    /// Timestamp of the last recorded activity, if any.
    last_active_at: Option<DateTime<Utc>>,
    /// IP address of the last recorded activity, if any.
    last_active_ip: Option<IpAddr>,
    /// Identifier of the joined SSO login, if there is one.
    compat_sso_login_id: Option<Uuid>,
    /// Login token of the joined SSO login.
    compat_sso_login_token: Option<String>,
    /// Redirect URI of the joined SSO login, stored as text and parsed into a
    /// URL during conversion.
    compat_sso_login_redirect_uri: Option<String>,
    /// Creation time of the joined SSO login.
    compat_sso_login_created_at: Option<DateTime<Utc>>,
    /// When the joined SSO login was fulfilled, if it was.
    compat_sso_login_fulfilled_at: Option<DateTime<Utc>>,
    /// When the joined SSO login was exchanged, if it was.
    compat_sso_login_exchanged_at: Option<DateTime<Utc>>,
}
117
118impl Node<Ulid> for CompatSessionAndSsoLoginLookup {
119 fn cursor(&self) -> Ulid {
120 self.compat_session_id.into()
121 }
122}
123
124impl TryFrom<CompatSessionAndSsoLoginLookup> for (CompatSession, Option<CompatSsoLogin>) {
125 type Error = DatabaseInconsistencyError;
126
127 fn try_from(value: CompatSessionAndSsoLoginLookup) -> Result<Self, Self::Error> {
128 let id = value.compat_session_id.into();
129
130 let state = match value.finished_at {
131 None => CompatSessionState::Valid,
132 Some(finished_at) => CompatSessionState::Finished { finished_at },
133 };
134
135 let session = CompatSession {
136 id,
137 state,
138 user_id: value.user_id.into(),
139 device: value.device_id.map(Device::from),
140 human_name: value.human_name,
141 user_session_id: value.user_session_id.map(Ulid::from),
142 created_at: value.created_at,
143 is_synapse_admin: value.is_synapse_admin,
144 user_agent: value.user_agent,
145 last_active_at: value.last_active_at,
146 last_active_ip: value.last_active_ip,
147 };
148
149 match (
150 value.compat_sso_login_id,
151 value.compat_sso_login_token,
152 value.compat_sso_login_redirect_uri,
153 value.compat_sso_login_created_at,
154 value.compat_sso_login_fulfilled_at,
155 value.compat_sso_login_exchanged_at,
156 ) {
157 (None, None, None, None, None, None) => Ok((session, None)),
158 (
159 Some(id),
160 Some(login_token),
161 Some(redirect_uri),
162 Some(created_at),
163 fulfilled_at,
164 exchanged_at,
165 ) => {
166 let id = id.into();
167 let redirect_uri = Url::parse(&redirect_uri).map_err(|e| {
168 DatabaseInconsistencyError::on("compat_sso_logins")
169 .column("redirect_uri")
170 .row(id)
171 .source(e)
172 })?;
173
174 let state = match (fulfilled_at, exchanged_at) {
175 (Some(fulfilled_at), Some(exchanged_at)) => CompatSsoLoginState::Exchanged {
176 fulfilled_at,
177 exchanged_at,
178 compat_session_id: session.id,
179 },
180 _ => return Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
181 };
182
183 let login = CompatSsoLogin {
184 id,
185 redirect_uri,
186 login_token,
187 created_at,
188 state,
189 };
190
191 Ok((session, Some(login)))
192 }
193 _ => Err(DatabaseInconsistencyError::on("compat_sso_logins").row(id)),
194 }
195 }
196}
197
198impl Filter for CompatSessionFilter<'_> {
199 fn generate_condition(&self, has_joins: bool) -> impl sea_query::IntoCondition {
200 sea_query::Condition::all()
201 .add_option(self.user().map(|user| {
202 Expr::col((CompatSessions::Table, CompatSessions::UserId)).eq(Uuid::from(user.id))
203 }))
204 .add_option(self.browser_session().map(|browser_session| {
205 Expr::col((CompatSessions::Table, CompatSessions::UserSessionId))
206 .eq(Uuid::from(browser_session.id))
207 }))
208 .add_option(self.browser_session_filter().map(|browser_session_filter| {
209 Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)).in_subquery(
210 Query::select()
211 .expr(Expr::col((
212 UserSessions::Table,
213 UserSessions::UserSessionId,
214 )))
215 .apply_filter(browser_session_filter)
216 .from(UserSessions::Table)
217 .take(),
218 )
219 }))
220 .add_option(self.state().map(|state| {
221 if state.is_active() {
222 Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_null()
223 } else {
224 Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)).is_not_null()
225 }
226 }))
227 .add_option(self.auth_type().map(|auth_type| {
228 if has_joins {
231 if auth_type.is_sso_login() {
232 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
233 .is_not_null()
234 } else {
235 Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId))
236 .is_null()
237 }
238 } else {
239 let compat_sso_logins = Query::select()
243 .expr(Expr::col((
244 CompatSsoLogins::Table,
245 CompatSsoLogins::CompatSessionId,
246 )))
247 .from(CompatSsoLogins::Table)
248 .take();
249
250 if auth_type.is_sso_login() {
251 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
252 .eq(Expr::any(compat_sso_logins))
253 } else {
254 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
255 .ne(Expr::all(compat_sso_logins))
256 }
257 }
258 }))
259 .add_option(self.last_active_after().map(|last_active_after| {
260 Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
261 .gt(last_active_after)
262 }))
263 .add_option(self.last_active_before().map(|last_active_before| {
264 Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt))
265 .lt(last_active_before)
266 }))
267 .add_option(self.created_after().map(|created_after| {
268 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
272 .gt(max_ulid_at(created_after))
273 }))
274 .add_option(self.created_before().map(|created_before| {
275 Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
276 .lt(min_ulid_at(created_before))
277 }))
278 .add_option(self.device().map(|device| {
279 Expr::col((CompatSessions::Table, CompatSessions::DeviceId)).eq(device.as_str())
280 }))
281 }
282}
283
284#[async_trait]
285impl CompatSessionRepository for PgCompatSessionRepository<'_> {
286 type Error = DatabaseError;
287
288 #[tracing::instrument(
289 name = "db.compat_session.lookup",
290 skip_all,
291 fields(
292 db.query.text,
293 compat_session.id = %id,
294 ),
295 err,
296 )]
297 async fn lookup(&mut self, id: Ulid) -> Result<Option<CompatSession>, Self::Error> {
298 let res = sqlx::query_as!(
299 CompatSessionLookup,
300 r#"
301 SELECT compat_session_id
302 , device_id
303 , human_name
304 , user_id
305 , user_session_id
306 , created_at
307 , finished_at
308 , is_synapse_admin
309 , user_agent
310 , last_active_at
311 , last_active_ip as "last_active_ip: IpAddr"
312 FROM compat_sessions
313 WHERE compat_session_id = $1
314 "#,
315 Uuid::from(id),
316 )
317 .traced()
318 .fetch_optional(&mut *self.conn)
319 .await?;
320
321 let Some(res) = res else { return Ok(None) };
322
323 Ok(Some(res.into()))
324 }
325
326 #[tracing::instrument(
327 name = "db.compat_session.add",
328 skip_all,
329 fields(
330 db.query.text,
331 compat_session.id,
332 %user.id,
333 %user.username,
334 compat_session.device.id = device.as_str(),
335 ),
336 err,
337 )]
338 async fn add(
339 &mut self,
340 rng: &mut (dyn RngCore + Send),
341 clock: &dyn Clock,
342 user: &User,
343 device: Device,
344 browser_session: Option<&BrowserSession>,
345 is_synapse_admin: bool,
346 human_name: Option<String>,
347 ) -> Result<CompatSession, Self::Error> {
348 let created_at = clock.now();
349 let id = Ulid::from_datetime_with_source(created_at.into(), rng);
350 tracing::Span::current().record("compat_session.id", tracing::field::display(id));
351
352 sqlx::query!(
353 r#"
354 INSERT INTO compat_sessions
355 (compat_session_id, user_id, device_id,
356 user_session_id, created_at, is_synapse_admin,
357 human_name)
358 VALUES ($1, $2, $3, $4, $5, $6, $7)
359 "#,
360 Uuid::from(id),
361 Uuid::from(user.id),
362 device.as_str(),
363 browser_session.map(|s| Uuid::from(s.id)),
364 created_at,
365 is_synapse_admin,
366 human_name.as_deref(),
367 )
368 .traced()
369 .execute(&mut *self.conn)
370 .await?;
371
372 Ok(CompatSession {
373 id,
374 state: CompatSessionState::default(),
375 user_id: user.id,
376 device: Some(device),
377 human_name,
378 user_session_id: browser_session.map(|s| s.id),
379 created_at,
380 is_synapse_admin,
381 user_agent: None,
382 last_active_at: None,
383 last_active_ip: None,
384 })
385 }
386
387 #[tracing::instrument(
388 name = "db.compat_session.finish",
389 skip_all,
390 fields(
391 db.query.text,
392 %compat_session.id,
393 user.id = %compat_session.user_id,
394 compat_session.device.id = compat_session.device.as_ref().map(mas_data_model::Device::as_str),
395 ),
396 err,
397 )]
398 async fn finish(
399 &mut self,
400 clock: &dyn Clock,
401 compat_session: CompatSession,
402 ) -> Result<CompatSession, Self::Error> {
403 let finished_at = clock.now();
404
405 let res = sqlx::query!(
406 r#"
407 UPDATE compat_sessions cs
408 SET finished_at = $2
409 WHERE compat_session_id = $1
410 "#,
411 Uuid::from(compat_session.id),
412 finished_at,
413 )
414 .traced()
415 .execute(&mut *self.conn)
416 .await?;
417
418 DatabaseError::ensure_affected_rows(&res, 1)?;
419
420 let compat_session = compat_session
421 .finish(finished_at)
422 .map_err(DatabaseError::to_invalid_operation)?;
423
424 Ok(compat_session)
425 }
426
427 #[tracing::instrument(
428 name = "db.compat_session.finish_bulk",
429 skip_all,
430 fields(db.query.text),
431 err,
432 )]
433 async fn finish_bulk(
434 &mut self,
435 clock: &dyn Clock,
436 filter: CompatSessionFilter<'_>,
437 ) -> Result<usize, Self::Error> {
438 let finished_at = clock.now();
439 let (sql, arguments) = Query::update()
440 .table(CompatSessions::Table)
441 .value(CompatSessions::FinishedAt, finished_at)
442 .apply_filter(filter)
443 .build_sqlx(PostgresQueryBuilder);
444
445 let res = sqlx::query_with(&sql, arguments)
446 .traced()
447 .execute(&mut *self.conn)
448 .await?;
449
450 Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
451 }
452
/// Lists the compat sessions matching `filter`, one page at a time, each
/// with the SSO login it is attached to, if any.
///
/// # Errors
///
/// Fails if the query fails, or if a returned row cannot be converted into
/// a `(CompatSession, Option<CompatSsoLogin>)` pair.
#[tracing::instrument(
    name = "db.compat_session.list",
    skip_all,
    fields(
        db.query.text,
    ),
    err,
)]
async fn list(
    &mut self,
    filter: CompatSessionFilter<'_>,
    pagination: Pagination,
) -> Result<Page<(CompatSession, Option<CompatSsoLogin>)>, Self::Error> {
    // Each selected column is aliased with the identifier generated from
    // the matching `CompatSessionAndSsoLoginLookup` field, so that the
    // rows can be decoded into that struct
    let (sql, arguments) = Query::select()
        // Columns coming from `compat_sessions`
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
            CompatSessionAndSsoLoginLookupIden::CompatSessionId,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
            CompatSessionAndSsoLoginLookupIden::DeviceId,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
            CompatSessionAndSsoLoginLookupIden::HumanName,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::UserId)),
            CompatSessionAndSsoLoginLookupIden::UserId,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
            CompatSessionAndSsoLoginLookupIden::UserSessionId,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
            CompatSessionAndSsoLoginLookupIden::CreatedAt,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
            CompatSessionAndSsoLoginLookupIden::FinishedAt,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
            CompatSessionAndSsoLoginLookupIden::IsSynapseAdmin,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
            CompatSessionAndSsoLoginLookupIden::UserAgent,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
            CompatSessionAndSsoLoginLookupIden::LastActiveAt,
        )
        .expr_as(
            Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
            CompatSessionAndSsoLoginLookupIden::LastActiveIp,
        )
        // Columns coming from the joined `compat_sso_logins`; these are
        // NULL for sessions without an SSO login
        .expr_as(
            Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CompatSsoLoginId)),
            CompatSessionAndSsoLoginLookupIden::CompatSsoLoginId,
        )
        .expr_as(
            Expr::col((CompatSsoLogins::Table, CompatSsoLogins::LoginToken)),
            CompatSessionAndSsoLoginLookupIden::CompatSsoLoginToken,
        )
        .expr_as(
            Expr::col((CompatSsoLogins::Table, CompatSsoLogins::RedirectUri)),
            CompatSessionAndSsoLoginLookupIden::CompatSsoLoginRedirectUri,
        )
        .expr_as(
            Expr::col((CompatSsoLogins::Table, CompatSsoLogins::CreatedAt)),
            CompatSessionAndSsoLoginLookupIden::CompatSsoLoginCreatedAt,
        )
        .expr_as(
            Expr::col((CompatSsoLogins::Table, CompatSsoLogins::FulfilledAt)),
            CompatSessionAndSsoLoginLookupIden::CompatSsoLoginFulfilledAt,
        )
        .expr_as(
            Expr::col((CompatSsoLogins::Table, CompatSsoLogins::ExchangedAt)),
            CompatSessionAndSsoLoginLookupIden::CompatSsoLoginExchangedAt,
        )
        .from(CompatSessions::Table)
        // LEFT JOIN, so that sessions without an SSO login are still listed
        .left_join(
            CompatSsoLogins::Table,
            Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId))
                .equals((CompatSsoLogins::Table, CompatSsoLogins::CompatSessionId)),
        )
        .apply_filter_with_joins(filter)
        // Paginate on the session ID
        .generate_pagination(
            (CompatSessions::Table, CompatSessions::CompatSessionId),
            pagination,
        )
        .build_sqlx(PostgresQueryBuilder);

    let edges: Vec<CompatSessionAndSsoLoginLookup> = sqlx::query_as_with(&sql, arguments)
        .traced()
        .fetch_all(&mut *self.conn)
        .await?;

    // Turn the raw rows into a page, then convert each row into the
    // data-model types, bailing out on the first inconsistent row
    let page = pagination.process(edges).try_map(TryFrom::try_from)?;

    Ok(page)
}
557
558 #[tracing::instrument(
559 name = "db.compat_session.count",
560 skip_all,
561 fields(
562 db.query.text,
563 ),
564 err,
565 )]
566 async fn count(&mut self, filter: CompatSessionFilter<'_>) -> Result<usize, Self::Error> {
567 let (sql, arguments) = sea_query::Query::select()
568 .expr(Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)).count())
569 .from(CompatSessions::Table)
570 .apply_filter(filter)
571 .build_sqlx(PostgresQueryBuilder);
572
573 let count: i64 = sqlx::query_scalar_with(&sql, arguments)
574 .traced()
575 .fetch_one(&mut *self.conn)
576 .await?;
577
578 count
579 .try_into()
580 .map_err(DatabaseError::to_invalid_operation)
581 }
582
583 #[tracing::instrument(
584 name = "db.compat_session.record_batch_activity",
585 skip_all,
586 fields(
587 db.query.text,
588 ),
589 err,
590 )]
591 async fn record_batch_activity(
592 &mut self,
593 mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
594 ) -> Result<(), Self::Error> {
595 activities.sort_unstable();
598 let mut ids = Vec::with_capacity(activities.len());
599 let mut last_activities = Vec::with_capacity(activities.len());
600 let mut ips = Vec::with_capacity(activities.len());
601
602 for (id, last_activity, ip) in activities {
603 ids.push(Uuid::from(id));
604 last_activities.push(last_activity);
605 ips.push(ip);
606 }
607
608 let res = sqlx::query!(
609 r#"
610 UPDATE compat_sessions
611 SET last_active_at = GREATEST(t.last_active_at, compat_sessions.last_active_at)
612 , last_active_ip = COALESCE(t.last_active_ip, compat_sessions.last_active_ip)
613 FROM (
614 SELECT *
615 FROM UNNEST($1::uuid[], $2::timestamptz[], $3::inet[])
616 AS t(compat_session_id, last_active_at, last_active_ip)
617 ) AS t
618 WHERE compat_sessions.compat_session_id = t.compat_session_id
619 "#,
620 &ids,
621 &last_activities,
622 &ips as &[Option<IpAddr>],
623 )
624 .traced()
625 .execute(&mut *self.conn)
626 .await?;
627
628 DatabaseError::ensure_affected_rows(&res, ids.len().try_into().unwrap_or(u64::MAX))?;
629
630 Ok(())
631 }
632
633 #[tracing::instrument(
634 name = "db.compat_session.record_user_agent",
635 skip_all,
636 fields(
637 db.query.text,
638 %compat_session.id,
639 ),
640 err,
641 )]
642 async fn record_user_agent(
643 &mut self,
644 mut compat_session: CompatSession,
645 user_agent: String,
646 ) -> Result<CompatSession, Self::Error> {
647 let res = sqlx::query!(
648 r#"
649 UPDATE compat_sessions
650 SET user_agent = $2
651 WHERE compat_session_id = $1
652 "#,
653 Uuid::from(compat_session.id),
654 &*user_agent,
655 )
656 .traced()
657 .execute(&mut *self.conn)
658 .await?;
659
660 compat_session.user_agent = Some(user_agent);
661
662 DatabaseError::ensure_affected_rows(&res, 1)?;
663
664 Ok(compat_session)
665 }
666
667 #[tracing::instrument(
668 name = "repository.compat_session.set_human_name",
669 skip(self),
670 fields(
671 compat_session.id = %compat_session.id,
672 compat_session.human_name = ?human_name,
673 ),
674 err,
675 )]
676 async fn set_human_name(
677 &mut self,
678 mut compat_session: CompatSession,
679 human_name: Option<String>,
680 ) -> Result<CompatSession, Self::Error> {
681 let res = sqlx::query!(
682 r#"
683 UPDATE compat_sessions
684 SET human_name = $2
685 WHERE compat_session_id = $1
686 "#,
687 Uuid::from(compat_session.id),
688 human_name.as_deref(),
689 )
690 .traced()
691 .execute(&mut *self.conn)
692 .await?;
693
694 compat_session.human_name = human_name;
695
696 DatabaseError::ensure_affected_rows(&res, 1)?;
697
698 Ok(compat_session)
699 }
700
701 #[tracing::instrument(
702 name = "db.compat_session.cleanup_finished",
703 skip_all,
704 fields(
705 db.query.text,
706 ),
707 err,
708 )]
709 async fn cleanup_finished(
710 &mut self,
711 since: Option<DateTime<Utc>>,
712 until: DateTime<Utc>,
713 limit: usize,
714 ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
715 let res = sqlx::query!(
716 r#"
717 WITH
718 to_delete AS (
719 SELECT compat_session_id, finished_at
720 FROM compat_sessions
721 WHERE finished_at IS NOT NULL
722 AND ($1::timestamptz IS NULL OR finished_at >= $1)
723 AND finished_at < $2
724 ORDER BY finished_at ASC
725 LIMIT $3
726 FOR UPDATE
727 ),
728
729 -- Delete refresh tokens first because they reference access tokens
730 deleted_refresh_tokens AS (
731 DELETE FROM compat_refresh_tokens
732 USING to_delete
733 WHERE compat_refresh_tokens.compat_session_id = to_delete.compat_session_id
734 ),
735
736 deleted_access_tokens AS (
737 DELETE FROM compat_access_tokens
738 USING to_delete
739 WHERE compat_access_tokens.compat_session_id = to_delete.compat_session_id
740 ),
741
742 deleted_sso_logins AS (
743 DELETE FROM compat_sso_logins
744 USING to_delete
745 WHERE compat_sso_logins.compat_session_id = to_delete.compat_session_id
746 ),
747
748 deleted_sessions AS (
749 DELETE FROM compat_sessions
750 USING to_delete
751 WHERE compat_sessions.compat_session_id = to_delete.compat_session_id
752 RETURNING compat_sessions.finished_at
753 )
754
755 SELECT
756 COUNT(*) as "count!",
757 MAX(finished_at) as last_finished_at
758 FROM deleted_sessions
759 "#,
760 since,
761 until,
762 i64::try_from(limit).unwrap_or(i64::MAX),
763 )
764 .traced()
765 .fetch_one(&mut *self.conn)
766 .await?;
767
768 Ok((
769 res.count.try_into().unwrap_or(usize::MAX),
770 res.last_finished_at,
771 ))
772 }
773
774 #[tracing::instrument(
775 name = "db.compat_session.cleanup_inactive_ips",
776 skip_all,
777 fields(
778 db.query.text,
779 since = since.map(tracing::field::display),
780 threshold = %threshold,
781 limit = limit,
782 ),
783 err,
784 )]
785 async fn cleanup_inactive_ips(
786 &mut self,
787 since: Option<DateTime<Utc>>,
788 threshold: DateTime<Utc>,
789 limit: usize,
790 ) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
791 let res = sqlx::query!(
792 r#"
793 WITH to_update AS (
794 SELECT compat_session_id, last_active_at
795 FROM compat_sessions
796 WHERE last_active_ip IS NOT NULL
797 AND last_active_at IS NOT NULL
798 AND ($1::timestamptz IS NULL OR last_active_at >= $1)
799 AND last_active_at < $2
800 ORDER BY last_active_at ASC
801 LIMIT $3
802 FOR UPDATE
803 ),
804 updated AS (
805 UPDATE compat_sessions
806 SET last_active_ip = NULL
807 FROM to_update
808 WHERE compat_sessions.compat_session_id = to_update.compat_session_id
809 RETURNING compat_sessions.last_active_at
810 )
811 SELECT COUNT(*) AS "count!", MAX(last_active_at) AS last_active_at FROM updated
812 "#,
813 since,
814 threshold,
815 i64::try_from(limit).unwrap_or(i64::MAX),
816 )
817 .traced()
818 .fetch_one(&mut *self.conn)
819 .await?;
820
821 Ok((
822 res.count.try_into().unwrap_or(usize::MAX),
823 res.last_active_at,
824 ))
825 }
826}