Skip to main content

mas_storage_pg/compat/
mod.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3// Copyright 2022-2024 The Matrix.org Foundation C.I.C.
4//
5// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
6// Please see LICENSE files in the repository root for full details.
7
8//! A module containing PostgreSQL implementation of repositories for the
9//! compatibility layer
10
11mod access_token;
12mod refresh_token;
13mod session;
14mod sso_login;
15
16pub use self::{
17    access_token::PgCompatAccessTokenRepository, refresh_token::PgCompatRefreshTokenRepository,
18    session::PgCompatSessionRepository, sso_login::PgCompatSsoLoginRepository,
19};
20
21#[cfg(test)]
22mod tests {
23    use chrono::Duration;
24    use mas_data_model::{Clock, Device, clock::MockClock};
25    use mas_storage::{
26        Pagination, RepositoryAccess,
27        compat::{
28            CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionFilter,
29            CompatSessionRepository, CompatSsoLoginFilter,
30        },
31        user::UserRepository,
32    };
33    use rand::SeedableRng;
34    use rand_chacha::ChaChaRng;
35    use sqlx::PgPool;
36    use ulid::Ulid;
37
38    use crate::PgRepository;
39
40    #[sqlx::test(migrator = "crate::MIGRATOR")]
41    async fn test_session_repository(pool: PgPool) {
42        let mut rng = ChaChaRng::seed_from_u64(42);
43        let clock = MockClock::default();
44        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
45
46        // Create a user
47        let user = repo
48            .user()
49            .add(&mut rng, &clock, "john".to_owned())
50            .await
51            .unwrap();
52
53        let all = CompatSessionFilter::new().for_user(&user);
54        let active = all.active_only();
55        let finished = all.finished_only();
56        let pagination = Pagination::first(10);
57
58        assert_eq!(repo.compat_session().count(all).await.unwrap(), 0);
59        assert_eq!(repo.compat_session().count(active).await.unwrap(), 0);
60        assert_eq!(repo.compat_session().count(finished).await.unwrap(), 0);
61
62        let full_list = repo.compat_session().list(all, pagination).await.unwrap();
63        assert!(full_list.edges.is_empty());
64        let active_list = repo
65            .compat_session()
66            .list(active, pagination)
67            .await
68            .unwrap();
69        assert!(active_list.edges.is_empty());
70        let finished_list = repo
71            .compat_session()
72            .list(finished, pagination)
73            .await
74            .unwrap();
75        assert!(finished_list.edges.is_empty());
76
77        // Start a compat session for that user
78        let device = Device::generate(&mut rng);
79        let device_str = device.as_str().to_owned();
80        let session = repo
81            .compat_session()
82            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
83            .await
84            .unwrap();
85        assert_eq!(session.user_id, user.id);
86        assert_eq!(session.device.as_ref().unwrap().as_str(), device_str);
87        assert!(session.is_valid());
88        assert!(!session.is_finished());
89
90        assert_eq!(repo.compat_session().count(all).await.unwrap(), 1);
91        assert_eq!(repo.compat_session().count(active).await.unwrap(), 1);
92        assert_eq!(repo.compat_session().count(finished).await.unwrap(), 0);
93
94        let full_list = repo.compat_session().list(all, pagination).await.unwrap();
95        assert_eq!(full_list.edges.len(), 1);
96        assert_eq!(full_list.edges[0].node.0.id, session.id);
97        let active_list = repo
98            .compat_session()
99            .list(active, pagination)
100            .await
101            .unwrap();
102        assert_eq!(active_list.edges.len(), 1);
103        assert_eq!(active_list.edges[0].node.0.id, session.id);
104        let finished_list = repo
105            .compat_session()
106            .list(finished, pagination)
107            .await
108            .unwrap();
109        assert!(finished_list.edges.is_empty());
110
111        // Lookup the session and check it didn't change
112        let session_lookup = repo
113            .compat_session()
114            .lookup(session.id)
115            .await
116            .unwrap()
117            .expect("compat session not found");
118        assert_eq!(session_lookup.id, session.id);
119        assert_eq!(session_lookup.user_id, user.id);
120        assert_eq!(session.device.as_ref().unwrap().as_str(), device_str);
121        assert!(session_lookup.is_valid());
122        assert!(!session_lookup.is_finished());
123
124        // Record a user-agent for the session
125        assert!(session_lookup.user_agent.is_none());
126        let session = repo
127            .compat_session()
128            .record_user_agent(session_lookup, "Mozilla/5.0".to_owned())
129            .await
130            .unwrap();
131        assert_eq!(session.user_agent.as_deref(), Some("Mozilla/5.0"));
132
133        // Reload the session and check again
134        let session_lookup = repo
135            .compat_session()
136            .lookup(session.id)
137            .await
138            .unwrap()
139            .expect("compat session not found");
140        assert_eq!(session_lookup.user_agent.as_deref(), Some("Mozilla/5.0"));
141
142        // Look up the session by device
143        let list = repo
144            .compat_session()
145            .list(
146                CompatSessionFilter::new()
147                    .for_user(&user)
148                    .for_device(&device),
149                pagination,
150            )
151            .await
152            .unwrap();
153        assert_eq!(list.edges.len(), 1);
154        let session_lookup = &list.edges[0].node.0;
155        assert_eq!(session_lookup.id, session.id);
156        assert_eq!(session_lookup.user_id, user.id);
157        assert_eq!(session.device.as_ref().unwrap().as_str(), device_str);
158        assert!(session_lookup.is_valid());
159        assert!(!session_lookup.is_finished());
160
161        // Finish the session
162        let session = repo.compat_session().finish(&clock, session).await.unwrap();
163        assert!(!session.is_valid());
164        assert!(session.is_finished());
165
166        assert_eq!(repo.compat_session().count(all).await.unwrap(), 1);
167        assert_eq!(repo.compat_session().count(active).await.unwrap(), 0);
168        assert_eq!(repo.compat_session().count(finished).await.unwrap(), 1);
169
170        let full_list = repo.compat_session().list(all, pagination).await.unwrap();
171        assert_eq!(full_list.edges.len(), 1);
172        assert_eq!(full_list.edges[0].node.0.id, session.id);
173        let active_list = repo
174            .compat_session()
175            .list(active, pagination)
176            .await
177            .unwrap();
178        assert!(active_list.edges.is_empty());
179        let finished_list = repo
180            .compat_session()
181            .list(finished, pagination)
182            .await
183            .unwrap();
184        assert_eq!(finished_list.edges.len(), 1);
185        assert_eq!(finished_list.edges[0].node.0.id, session.id);
186
187        // Reload the session and check again
188        let session_lookup = repo
189            .compat_session()
190            .lookup(session.id)
191            .await
192            .unwrap()
193            .expect("compat session not found");
194        assert!(!session_lookup.is_valid());
195        assert!(session_lookup.is_finished());
196
197        // Now add another session, with an SSO login this time
198        let unknown_session = session;
199        // Start a new SSO login
200        let login = repo
201            .compat_sso_login()
202            .add(
203                &mut rng,
204                &clock,
205                "login-token".to_owned(),
206                "https://example.com/callback".parse().unwrap(),
207            )
208            .await
209            .unwrap();
210        assert!(login.is_pending());
211
212        // Start a browser session for the user
213        let browser_session = repo
214            .browser_session()
215            .add(&mut rng, &clock, &user, None)
216            .await
217            .unwrap();
218
219        // Start a compat session for that user
220        let device = Device::generate(&mut rng);
221        let sso_login_session = repo
222            .compat_session()
223            .add(
224                &mut rng,
225                &clock,
226                &user,
227                device,
228                Some(&browser_session),
229                false,
230                None,
231            )
232            .await
233            .unwrap();
234
235        // Associate the login with the session
236        let login = repo
237            .compat_sso_login()
238            .fulfill(&clock, login, &browser_session)
239            .await
240            .unwrap();
241        assert!(login.is_fulfilled());
242        let login = repo
243            .compat_sso_login()
244            .exchange(&clock, login, &sso_login_session)
245            .await
246            .unwrap();
247        assert!(login.is_exchanged());
248
249        // Now query the session list with both the unknown and SSO login session type
250        // filter
251        let all = CompatSessionFilter::new().for_user(&user);
252        let sso_login = all.sso_login_only();
253        let unknown = all.unknown_only();
254        assert_eq!(repo.compat_session().count(all).await.unwrap(), 2);
255        assert_eq!(repo.compat_session().count(sso_login).await.unwrap(), 1);
256        assert_eq!(repo.compat_session().count(unknown).await.unwrap(), 1);
257
258        let list = repo
259            .compat_session()
260            .list(sso_login, pagination)
261            .await
262            .unwrap();
263        assert_eq!(list.edges.len(), 1);
264        assert_eq!(list.edges[0].node.0.id, sso_login_session.id);
265        let list = repo
266            .compat_session()
267            .list(unknown, pagination)
268            .await
269            .unwrap();
270        assert_eq!(list.edges.len(), 1);
271        assert_eq!(list.edges[0].node.0.id, unknown_session.id);
272
273        // Check that combining the two filters works
274        // At this point, there is one active SSO login session and one finished unknown
275        // session
276        assert_eq!(
277            repo.compat_session()
278                .count(all.sso_login_only().active_only())
279                .await
280                .unwrap(),
281            1
282        );
283        assert_eq!(
284            repo.compat_session()
285                .count(all.sso_login_only().finished_only())
286                .await
287                .unwrap(),
288            0
289        );
290        assert_eq!(
291            repo.compat_session()
292                .count(all.unknown_only().active_only())
293                .await
294                .unwrap(),
295            0
296        );
297        assert_eq!(
298            repo.compat_session()
299                .count(all.unknown_only().finished_only())
300                .await
301                .unwrap(),
302            1
303        );
304
305        // Check that we can batch finish sessions
306        let affected = repo
307            .compat_session()
308            .finish_bulk(&clock, all.sso_login_only().active_only())
309            .await
310            .unwrap();
311        assert_eq!(affected, 1);
312        assert_eq!(repo.compat_session().count(finished).await.unwrap(), 2);
313        assert_eq!(repo.compat_session().count(active).await.unwrap(), 0);
314    }
315
316    /// Test the created-at filters on [`CompatSessionFilter`].
317    #[sqlx::test(migrator = "crate::MIGRATOR")]
318    async fn test_list_compat_sessions_by_created_at(pool: PgPool) {
319        let mut rng = ChaChaRng::seed_from_u64(42);
320        let clock = MockClock::default();
321        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
322
323        let user = repo
324            .user()
325            .add(&mut rng, &clock, "alice".to_owned())
326            .await
327            .unwrap();
328
329        // Three sessions created one minute apart, with a cutoff captured
330        // between the second and the third.
331        let device = Device::generate(&mut rng);
332        let session1 = repo
333            .compat_session()
334            .add(&mut rng, &clock, &user, device, None, false, None)
335            .await
336            .unwrap();
337        clock.advance(Duration::try_minutes(1).unwrap());
338
339        let device = Device::generate(&mut rng);
340        let session2 = repo
341            .compat_session()
342            .add(&mut rng, &clock, &user, device, None, false, None)
343            .await
344            .unwrap();
345        clock.advance(Duration::try_minutes(1).unwrap());
346
347        let cutoff = clock.now();
348
349        clock.advance(Duration::try_minutes(1).unwrap());
350        let device = Device::generate(&mut rng);
351        let session3 = repo
352            .compat_session()
353            .add(&mut rng, &clock, &user, device, None, false, None)
354            .await
355            .unwrap();
356
357        let pagination = Pagination::first(10);
358
359        // Sessions created before the cutoff
360        let filter = CompatSessionFilter::new().with_created_before(cutoff);
361        let list = repo
362            .compat_session()
363            .list(filter, pagination)
364            .await
365            .unwrap();
366        assert_eq!(list.edges.len(), 2);
367        assert_eq!(list.edges[0].node.0, session1);
368        assert_eq!(list.edges[1].node.0, session2);
369        assert_eq!(repo.compat_session().count(filter).await.unwrap(), 2);
370
371        // Sessions created after the cutoff
372        let filter = CompatSessionFilter::new().with_created_after(cutoff);
373        let list = repo
374            .compat_session()
375            .list(filter, pagination)
376            .await
377            .unwrap();
378        assert_eq!(list.edges.len(), 1);
379        assert_eq!(list.edges[0].node.0, session3);
380        assert_eq!(repo.compat_session().count(filter).await.unwrap(), 1);
381    }
382
383    #[sqlx::test(migrator = "crate::MIGRATOR")]
384    async fn test_access_token_repository(pool: PgPool) {
385        const FIRST_TOKEN: &str = "first_access_token";
386        const SECOND_TOKEN: &str = "second_access_token";
387        let mut rng = ChaChaRng::seed_from_u64(42);
388        let clock = MockClock::default();
389        let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed();
390
391        // Create a user
392        let user = repo
393            .user()
394            .add(&mut rng, &clock, "john".to_owned())
395            .await
396            .unwrap();
397
398        // Start a compat session for that user
399        let device = Device::generate(&mut rng);
400        let session = repo
401            .compat_session()
402            .add(&mut rng, &clock, &user, device, None, false, None)
403            .await
404            .unwrap();
405
406        // Add an access token to that session
407        let token = repo
408            .compat_access_token()
409            .add(
410                &mut rng,
411                &clock,
412                &session,
413                FIRST_TOKEN.to_owned(),
414                Some(Duration::try_minutes(1).unwrap()),
415            )
416            .await
417            .unwrap();
418        assert_eq!(token.session_id, session.id);
419        assert_eq!(token.token, FIRST_TOKEN);
420
421        // Commit the txn and grab a new transaction, to test a conflict
422        repo.save().await.unwrap();
423
424        {
425            let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed();
426            // Adding the same token a second time should conflict
427            assert!(
428                repo.compat_access_token()
429                    .add(
430                        &mut rng,
431                        &clock,
432                        &session,
433                        FIRST_TOKEN.to_owned(),
434                        Some(Duration::try_minutes(1).unwrap()),
435                    )
436                    .await
437                    .is_err()
438            );
439            repo.cancel().await.unwrap();
440        }
441
442        // Grab a new repo
443        let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed();
444
445        // Looking up via ID works
446        let token_lookup = repo
447            .compat_access_token()
448            .lookup(token.id)
449            .await
450            .unwrap()
451            .expect("compat access token not found");
452        assert_eq!(token.id, token_lookup.id);
453        assert_eq!(token_lookup.session_id, session.id);
454
455        // Looking up via the token value works
456        let token_lookup = repo
457            .compat_access_token()
458            .find_by_token(FIRST_TOKEN)
459            .await
460            .unwrap()
461            .expect("compat access token not found");
462        assert_eq!(token.id, token_lookup.id);
463        assert_eq!(token_lookup.session_id, session.id);
464
465        // Token is currently valid
466        assert!(token.is_valid(clock.now()));
467
468        clock.advance(Duration::try_minutes(1).unwrap());
469        // Token should have expired
470        assert!(!token.is_valid(clock.now()));
471
472        // Add a second access token, this time without expiration
473        let token = repo
474            .compat_access_token()
475            .add(&mut rng, &clock, &session, SECOND_TOKEN.to_owned(), None)
476            .await
477            .unwrap();
478        assert_eq!(token.session_id, session.id);
479        assert_eq!(token.token, SECOND_TOKEN);
480
481        // Token is currently valid
482        assert!(token.is_valid(clock.now()));
483
484        // Make it expire
485        repo.compat_access_token()
486            .expire(&clock, token)
487            .await
488            .unwrap();
489
490        // Reload it
491        let token = repo
492            .compat_access_token()
493            .find_by_token(SECOND_TOKEN)
494            .await
495            .unwrap()
496            .expect("compat access token not found");
497
498        // Token is not valid anymore
499        assert!(!token.is_valid(clock.now()));
500
501        repo.save().await.unwrap();
502    }
503
504    #[sqlx::test(migrator = "crate::MIGRATOR")]
505    async fn test_refresh_token_repository(pool: PgPool) {
506        const ACCESS_TOKEN: &str = "access_token";
507        const REFRESH_TOKEN: &str = "refresh_token";
508        const REFRESH_TOKEN2: &str = "refresh_token2";
509        let mut rng = ChaChaRng::seed_from_u64(42);
510        let clock = MockClock::default();
511        let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed();
512
513        // Create a user
514        let user = repo
515            .user()
516            .add(&mut rng, &clock, "john".to_owned())
517            .await
518            .unwrap();
519
520        // Start a compat session for that user
521        let device = Device::generate(&mut rng);
522        let session = repo
523            .compat_session()
524            .add(&mut rng, &clock, &user, device, None, false, None)
525            .await
526            .unwrap();
527
528        // Add an access token to that session
529        let access_token = repo
530            .compat_access_token()
531            .add(&mut rng, &clock, &session, ACCESS_TOKEN.to_owned(), None)
532            .await
533            .unwrap();
534
535        let refresh_token = repo
536            .compat_refresh_token()
537            .add(
538                &mut rng,
539                &clock,
540                &session,
541                &access_token,
542                REFRESH_TOKEN.to_owned(),
543            )
544            .await
545            .unwrap();
546        assert_eq!(refresh_token.session_id, session.id);
547        assert_eq!(refresh_token.access_token_id, access_token.id);
548        assert_eq!(refresh_token.token, REFRESH_TOKEN);
549        assert!(refresh_token.is_valid());
550        assert!(!refresh_token.is_consumed());
551
552        // Look it up by ID and check everything matches
553        let refresh_token_lookup = repo
554            .compat_refresh_token()
555            .lookup(refresh_token.id)
556            .await
557            .unwrap()
558            .expect("refresh token not found");
559        assert_eq!(refresh_token_lookup.id, refresh_token.id);
560        assert_eq!(refresh_token_lookup.session_id, session.id);
561        assert_eq!(refresh_token_lookup.access_token_id, access_token.id);
562        assert_eq!(refresh_token_lookup.token, REFRESH_TOKEN);
563        assert!(refresh_token_lookup.is_valid());
564        assert!(!refresh_token_lookup.is_consumed());
565
566        // Look it up by token and check everything matches
567        let refresh_token_lookup = repo
568            .compat_refresh_token()
569            .find_by_token(REFRESH_TOKEN)
570            .await
571            .unwrap()
572            .expect("refresh token not found");
573        assert_eq!(refresh_token_lookup.id, refresh_token.id);
574        assert_eq!(refresh_token_lookup.session_id, session.id);
575        assert_eq!(refresh_token_lookup.access_token_id, access_token.id);
576        assert_eq!(refresh_token_lookup.token, REFRESH_TOKEN);
577        assert!(refresh_token_lookup.is_valid());
578        assert!(!refresh_token_lookup.is_consumed());
579
580        // Consume the first token, but to do so we need a 2nd to replace it with
581        let refresh_token2 = repo
582            .compat_refresh_token()
583            .add(
584                &mut rng,
585                &clock,
586                &session,
587                &access_token,
588                REFRESH_TOKEN2.to_owned(),
589            )
590            .await
591            .unwrap();
592
593        let refresh_token = repo
594            .compat_refresh_token()
595            .consume_and_replace(&clock, refresh_token, &refresh_token2)
596            .await
597            .unwrap();
598        assert!(!refresh_token.is_valid());
599        assert!(refresh_token.is_consumed());
600
601        // Reload the first token and check again
602        let refresh_token_lookup = repo
603            .compat_refresh_token()
604            .find_by_token(REFRESH_TOKEN)
605            .await
606            .unwrap()
607            .expect("refresh token not found");
608        assert!(!refresh_token_lookup.is_valid());
609        assert!(refresh_token_lookup.is_consumed());
610
611        // Consuming it again should not work
612        assert!(
613            repo.compat_refresh_token()
614                .consume_and_replace(&clock, refresh_token, &refresh_token2)
615                .await
616                .is_err()
617        );
618
619        repo.save().await.unwrap();
620    }
621
622    #[sqlx::test(migrator = "crate::MIGRATOR")]
623    async fn test_compat_sso_login_repository(pool: PgPool) {
624        let mut rng = ChaChaRng::seed_from_u64(42);
625        let clock = MockClock::default();
626        let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed();
627
628        // Create a user
629        let user = repo
630            .user()
631            .add(&mut rng, &clock, "john".to_owned())
632            .await
633            .unwrap();
634
635        // Lookup an unknown SSO login
636        let login = repo.compat_sso_login().lookup(Ulid::nil()).await.unwrap();
637        assert_eq!(login, None);
638
639        let all = CompatSsoLoginFilter::new();
640        let for_user = all.for_user(&user);
641        let pending = all.pending_only();
642        let fulfilled = all.fulfilled_only();
643        let exchanged = all.exchanged_only();
644
645        // Check the initial counts
646        assert_eq!(repo.compat_sso_login().count(all).await.unwrap(), 0);
647        assert_eq!(repo.compat_sso_login().count(for_user).await.unwrap(), 0);
648        assert_eq!(repo.compat_sso_login().count(pending).await.unwrap(), 0);
649        assert_eq!(repo.compat_sso_login().count(fulfilled).await.unwrap(), 0);
650        assert_eq!(repo.compat_sso_login().count(exchanged).await.unwrap(), 0);
651
652        // Lookup an unknown login token
653        let login = repo
654            .compat_sso_login()
655            .find_by_token("login-token")
656            .await
657            .unwrap();
658        assert_eq!(login, None);
659
660        // Start a new SSO login
661        let login = repo
662            .compat_sso_login()
663            .add(
664                &mut rng,
665                &clock,
666                "login-token".to_owned(),
667                "https://example.com/callback".parse().unwrap(),
668            )
669            .await
670            .unwrap();
671        assert!(login.is_pending());
672
673        // Check the counts
674        assert_eq!(repo.compat_sso_login().count(all).await.unwrap(), 1);
675        assert_eq!(repo.compat_sso_login().count(for_user).await.unwrap(), 0);
676        assert_eq!(repo.compat_sso_login().count(pending).await.unwrap(), 1);
677        assert_eq!(repo.compat_sso_login().count(fulfilled).await.unwrap(), 0);
678        assert_eq!(repo.compat_sso_login().count(exchanged).await.unwrap(), 0);
679
680        // Lookup the login by ID
681        let login_lookup = repo
682            .compat_sso_login()
683            .lookup(login.id)
684            .await
685            .unwrap()
686            .expect("login not found");
687        assert_eq!(login_lookup, login);
688
689        // Find the login by token
690        let login_lookup = repo
691            .compat_sso_login()
692            .find_by_token("login-token")
693            .await
694            .unwrap()
695            .expect("login not found");
696        assert_eq!(login_lookup, login);
697
698        // Start a compat session for that user
699        let device = Device::generate(&mut rng);
700        let compat_session = repo
701            .compat_session()
702            .add(&mut rng, &clock, &user, device, None, false, None)
703            .await
704            .unwrap();
705
706        // Exchanging before fulfilling should not work
707        // Note: It should also not poison the SQL transaction
708        let res = repo
709            .compat_sso_login()
710            .exchange(&clock, login.clone(), &compat_session)
711            .await;
712        assert!(res.is_err());
713
714        // Start a browser session for that user
715        let browser_session = repo
716            .browser_session()
717            .add(&mut rng, &clock, &user, None)
718            .await
719            .unwrap();
720
721        // Associate the login with the session
722        let login = repo
723            .compat_sso_login()
724            .fulfill(&clock, login, &browser_session)
725            .await
726            .unwrap();
727        assert!(login.is_fulfilled());
728
729        // Check the counts
730        assert_eq!(repo.compat_sso_login().count(all).await.unwrap(), 1);
731        assert_eq!(repo.compat_sso_login().count(for_user).await.unwrap(), 1);
732        assert_eq!(repo.compat_sso_login().count(pending).await.unwrap(), 0);
733        assert_eq!(repo.compat_sso_login().count(fulfilled).await.unwrap(), 1);
734        assert_eq!(repo.compat_sso_login().count(exchanged).await.unwrap(), 0);
735
736        // Fulfilling again should not work
737        // Note: It should also not poison the SQL transaction
738        let res = repo
739            .compat_sso_login()
740            .fulfill(&clock, login.clone(), &browser_session)
741            .await;
742        assert!(res.is_err());
743
744        // Exchange that login
745        let login = repo
746            .compat_sso_login()
747            .exchange(&clock, login, &compat_session)
748            .await
749            .unwrap();
750        assert!(login.is_exchanged());
751
752        // Check the counts
753        assert_eq!(repo.compat_sso_login().count(all).await.unwrap(), 1);
754        assert_eq!(repo.compat_sso_login().count(for_user).await.unwrap(), 1);
755        assert_eq!(repo.compat_sso_login().count(pending).await.unwrap(), 0);
756        assert_eq!(repo.compat_sso_login().count(fulfilled).await.unwrap(), 0);
757        assert_eq!(repo.compat_sso_login().count(exchanged).await.unwrap(), 1);
758
759        // Exchange again should not work
760        // Note: It should also not poison the SQL transaction
761        let res = repo
762            .compat_sso_login()
763            .exchange(&clock, login.clone(), &compat_session)
764            .await;
765        assert!(res.is_err());
766
767        // Fulfilling after exchanging should not work
768        // Note: It should also not poison the SQL transaction
769        let res = repo
770            .compat_sso_login()
771            .fulfill(&clock, login.clone(), &browser_session)
772            .await;
773        assert!(res.is_err());
774
775        let pagination = Pagination::first(10);
776
777        // List all logins
778        let logins = repo.compat_sso_login().list(all, pagination).await.unwrap();
779        assert!(!logins.has_next_page);
780        assert_eq!(logins.edges.len(), 1);
781        assert_eq!(logins.edges[0].node, login);
782
783        // List the logins for the user
784        let logins = repo
785            .compat_sso_login()
786            .list(for_user, pagination)
787            .await
788            .unwrap();
789        assert!(!logins.has_next_page);
790        assert_eq!(logins.edges.len(), 1);
791        assert_eq!(logins.edges[0].node, login);
792
793        // List only the pending logins for the user
794        let logins = repo
795            .compat_sso_login()
796            .list(for_user.pending_only(), pagination)
797            .await
798            .unwrap();
799        assert!(!logins.has_next_page);
800        assert!(logins.edges.is_empty());
801
802        // List only the fulfilled logins for the user
803        let logins = repo
804            .compat_sso_login()
805            .list(for_user.fulfilled_only(), pagination)
806            .await
807            .unwrap();
808        assert!(!logins.has_next_page);
809        assert!(logins.edges.is_empty());
810
811        // List only the exchanged logins for the user
812        let logins = repo
813            .compat_sso_login()
814            .list(for_user.exchanged_only(), pagination)
815            .await
816            .unwrap();
817        assert!(!logins.has_next_page);
818        assert_eq!(logins.edges.len(), 1);
819        assert_eq!(logins.edges[0].node, login);
820    }
821}