A better Rust ATProto crate
1//!
2//! Helpers for the local loopback server method of atproto OAuth.
3//!
4//! `OAuthClient::login_with_local_server()` is the nice helper. Here is where
5//! it and its components live. Below is what it does, so you can have more
6//! granular control without having to make your own loopback server.
7//!
8//! ```ignore
9//! let input = "your_handle_here";
10//! let cfg = LoopbackConfig::default();
11//! let opts = AuthorizeOptions::default();
12//! let port = match cfg.port {
13//! LoopbackPort::Fixed(p) => p,
14//! LoopbackPort::Ephemeral => 0,
15//! };
16//! // TODO: fix this to it also accepts ipv6 and properly finds a free port
17//! let bind_addr: SocketAddr = format!("0.0.0.0:{}", port)
18//! .parse()
19//! .expect("invalid loopback host/port");
20//! let oauth = OAuthClient::with_default_config(FileAuthStore::new(&args.store));
21//!
22//! let (local_addr, handle) = one_shot_server(bind_addr);
23//! println!("Listening on {}", local_addr);
24//!
25//! let client_data = oauth.build_localhost_client_data(&cfg, &opts, local_addr);
26//! // Build client using store and resolver
27//! let flow_client = OAuthClient::new_with_shared(
28//! self.registry.store.clone(),
29//! self.client.clone(),
30//! client_data,
31//! );
32//!
33//! // Start auth and get authorization URL
34//! let auth_url = flow_client.start_auth(input.as_ref(), opts).await?;
35//! // Print URL for copy/paste
36//! println!("To authenticate with your PDS, visit:\n{}\n", auth_url);
37//! // Optionally open browser
38//! if cfg.open_browser {
39//! let _ = try_open_in_browser(&auth_url);
40//! }
41//!
42//! handle_localhost_callback(handle, &flow_client, &cfg).await
43//! ```
44//!
45//!
46#![cfg(all(
47 feature = "loopback",
48 not(all(target_arch = "wasm32", target_os = "unknown"))
49))]
50use crate::{
51 atproto::AtprotoClientMetadata,
52 authstore::{ClientAuthStore, OAuthSessionMatch, OAuthSessionSelector},
53 client::{OAuthClient, OAuthSession},
54 dpop::DpopExt,
55 error::{CallbackError, OAuthError},
56 resolver::OAuthResolver,
57 types::{AuthorizeOptions, CallbackParams},
58};
59use jacquard_common::deps::fluent_uri::Uri;
60use jacquard_common::session::{SessionHint, SessionSelector, SessionStoreError};
61use jacquard_common::{IntoStatic, bos::BosStr};
62use smol_str::SmolStr;
63use std::net::SocketAddr;
64use tokio::{
65 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
66 net::{TcpListener, TcpStream, ToSocketAddrs},
67 sync::{mpsc, oneshot},
68};
69
70/// Port selection strategy for the loopback OAuth callback server.
71#[derive(Clone, Debug)]
72pub enum LoopbackPort {
73 /// Bind to a specific port number.
74 Fixed(u16),
75 /// Let the OS assign an available port.
76 Ephemeral,
77}
78
79/// Configuration for the loopback OAuth callback server.
80#[derive(Clone, Debug)]
81pub struct LoopbackConfig {
82 /// The host address to bind to (e.g., `"127.0.0.1"`).
83 pub host: String,
84 /// Port selection strategy.
85 pub port: LoopbackPort,
86 /// Whether to attempt opening the authorization URL in the user's browser.
87 pub open_browser: bool,
88 /// How long to wait for the callback before timing out, in milliseconds.
89 pub timeout_ms: u64,
90}
91
92impl Default for LoopbackConfig {
93 fn default() -> Self {
94 Self {
95 host: "127.0.0.1".into(),
96 port: LoopbackPort::Fixed(4000),
97 open_browser: true,
98 timeout_ms: 5 * 60 * 1000,
99 }
100 }
101}
102
103/// Attempts to open the given URL in the user's default browser.
104///
105/// Returns `true` if the browser was opened successfully, `false` otherwise.
106#[cfg(feature = "browser-open")]
107pub fn try_open_in_browser(url: &str) -> bool {
108 webbrowser::open(url).is_ok()
109}
110/// Stub for when the `browser-open` feature is disabled. Always returns `false`.
111#[cfg(not(feature = "browser-open"))]
112pub fn try_open_in_browser(_url: &str) -> bool {
113 false
114}
115
116async fn handle_callback_connection(mut stream: TcpStream, tx: mpsc::Sender<CallbackRequest>) {
117 let Some(Some(params)) = read_callback_params(&mut stream).await else {
118 let _ = write_http_response(&mut stream, 404, "Not found").await;
119 return;
120 };
121
122 let (response_tx, response_rx) = oneshot::channel();
123 match tx.try_send(CallbackRequest {
124 params,
125 response_tx,
126 }) {
127 Ok(()) => match response_rx.await {
128 Ok(response) => {
129 let _ = write_http_response(&mut stream, response.status, response.body).await;
130 }
131 Err(_) => {
132 let _ = write_http_response(&mut stream, 500, OAUTH_CALLBACK_FAILURE_BODY).await;
133 }
134 },
135 Err(_) => {
136 let _ = write_http_response(&mut stream, 500, "Could not deliver OAuth callback").await;
137 }
138 }
139}
140
141struct CallbackRequest {
142 params: CallbackParams,
143 response_tx: oneshot::Sender<CallbackResponse>,
144}
145
146struct CallbackResponse {
147 status: u16,
148 body: &'static str,
149}
150
151const OAUTH_CALLBACK_SUCCESS_BODY: &str = r#"<!doctype html>
152<html lang="en">
153<head>
154 <meta charset="utf-8">
155 <title>Jacquard OAuth login complete</title>
156</head>
157<body>
158 <h1>Jacquard OAuth login complete.</h1>
159 <p>You can close this tab and return to the application.</p>
160</body>
161</html>
162"#;
163
164const OAUTH_CALLBACK_FAILURE_BODY: &str = r#"<!doctype html>
165<html lang="en">
166<head>
167 <meta charset="utf-8">
168 <title>Jacquard OAuth login failed</title>
169</head>
170<body>
171 <h1>Jacquard OAuth login failed.</h1>
172 <p>Return to the application for details.</p>
173</body>
174</html>
175"#;
176
177async fn read_callback_params(stream: &mut TcpStream) -> Option<Option<CallbackParams>> {
178 let mut reader = BufReader::new(stream);
179 let mut request_line = String::new();
180 reader.read_line(&mut request_line).await.ok()?;
181 let mut parts = request_line.split_whitespace();
182 let method = parts.next()?;
183 let target = parts.next()?;
184 if method != "GET" {
185 return Some(None);
186 }
187 let (path, query) = target.split_once('?').unwrap_or((target, ""));
188 if path != "/oauth/callback" {
189 return Some(None);
190 }
191 serde_html_form::from_str(query).ok().map(Some)
192}
193
194async fn write_http_response(
195 stream: &mut TcpStream,
196 status: u16,
197 body: &str,
198) -> std::io::Result<()> {
199 let reason = match status {
200 200 => "OK",
201 404 => "Not Found",
202 500 => "Internal Server Error",
203 _ => "OK",
204 };
205 let content_type = if body.trim_start().starts_with("<!doctype html>") {
206 "text/html; charset=utf-8"
207 } else {
208 "text/plain; charset=utf-8"
209 };
210 let response = format!(
211 "HTTP/1.1 {status} {reason}\r\ncontent-type: {content_type}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
212 body.len(),
213 body
214 );
215 stream.write_all(response.as_bytes()).await
216}
217
218/// Handle to a running loopback callback server, used to await the OAuth redirect.
219pub struct CallbackHandle {
220 server_handle: tokio::task::JoinHandle<()>,
221 server_stop: oneshot::Sender<()>,
222 callback_rx: mpsc::Receiver<CallbackRequest>,
223}
224
225/// One-shot OAuth callback server.
226///
227/// Starts an ephemeral in-process web server that listens for the OAuth
228/// callback redirect. Returns the server address and a [`CallbackHandle`]
229/// that can be used to wait for the callback and stop the server.
230///
231/// Use in combination with [`handle_localhost_callback`] to handle the
232/// callback for the localhost loopback server.
233pub async fn one_shot_server(
234 addr: impl ToSocketAddrs,
235) -> std::io::Result<(SocketAddr, CallbackHandle)> {
236 let (tx, callback_rx) = mpsc::channel(5);
237 let listener = TcpListener::bind(addr).await?;
238 let local_addr = listener.local_addr()?;
239 let (server_stop, mut stop_rx) = oneshot::channel();
240 let server_handle = tokio::spawn(async move {
241 loop {
242 tokio::select! {
243 _ = &mut stop_rx => break,
244 accepted = listener.accept() => {
245 match accepted {
246 Ok((stream, _)) => {
247 tokio::spawn(handle_callback_connection(stream, tx.clone()));
248 }
249 Err(_) => break,
250 }
251 }
252 }
253 }
254 });
255 let handle = CallbackHandle {
256 server_handle,
257 server_stop,
258 callback_rx,
259 };
260 Ok((local_addr, handle))
261}
262
263async fn wait_for_callback(
264 handle: CallbackHandle,
265 timeout_ms: u64,
266) -> Result<CallbackRequest, OAuthError> {
267 let CallbackHandle {
268 server_handle,
269 server_stop,
270 mut callback_rx,
271 } = handle;
272 let cb = tokio::time::timeout(
273 std::time::Duration::from_millis(timeout_ms),
274 callback_rx.recv(),
275 )
276 .await;
277 let _ = server_stop.send(());
278 let _ = server_handle.await;
279 if let Ok(Some(cb)) = cb {
280 Ok(cb)
281 } else {
282 Err(OAuthError::Callback(CallbackError::Timeout))
283 }
284}
285
286#[cfg(not(feature = "scope-check"))]
287async fn complete_callback<T, S>(
288 flow_client: &super::client::OAuthClient<T, S>,
289 request: CallbackRequest,
290) -> crate::error::Result<super::client::OAuthSession<T, S>>
291where
292 T: OAuthResolver + DpopExt + Send + Sync + 'static,
293 S: ClientAuthStore + Send + Sync + 'static,
294{
295 match flow_client.callback(request.params).await {
296 Ok(session) => {
297 let _ = request.response_tx.send(CallbackResponse {
298 status: 200,
299 body: OAUTH_CALLBACK_SUCCESS_BODY,
300 });
301 Ok(session)
302 }
303 Err(err) => {
304 let _ = request.response_tx.send(CallbackResponse {
305 status: 500,
306 body: OAUTH_CALLBACK_FAILURE_BODY,
307 });
308 Err(err)
309 }
310 }
311}
312
313#[cfg(feature = "scope-check")]
314async fn complete_callback<T, S>(
315 flow_client: &super::client::OAuthClient<T, S>,
316 request: CallbackRequest,
317) -> crate::error::Result<super::client::OAuthSession<T, S>>
318where
319 T: OAuthResolver
320 + DpopExt
321 + jacquard_identity::lexicon_resolver::LexiconSchemaResolver
322 + Send
323 + Sync
324 + 'static,
325 S: ClientAuthStore + Send + Sync + 'static,
326{
327 match flow_client.callback(request.params).await {
328 Ok(session) => {
329 let _ = request.response_tx.send(CallbackResponse {
330 status: 200,
331 body: OAUTH_CALLBACK_SUCCESS_BODY,
332 });
333 Ok(session)
334 }
335 Err(err) => {
336 let _ = request.response_tx.send(CallbackResponse {
337 status: 500,
338 body: OAUTH_CALLBACK_FAILURE_BODY,
339 });
340 Err(err)
341 }
342 }
343}
344
345/// Handles the OAuth callback for the localhost loopback server.
346///
347/// Returns a session if the callback succeeds within the configured timeout
348/// and shuts down the server.
349///
350/// When the `scope-check` feature is enabled, `T` must also implement `LexiconSchemaResolver`
351/// for eager resolution of include scopes.
352#[cfg(not(feature = "scope-check"))]
353pub async fn handle_localhost_callback<T, S>(
354 handle: CallbackHandle,
355 flow_client: &super::client::OAuthClient<T, S>,
356 cfg: &LoopbackConfig,
357) -> crate::error::Result<super::client::OAuthSession<T, S>>
358where
359 T: OAuthResolver + DpopExt + Send + Sync + 'static,
360 S: ClientAuthStore + Send + Sync + 'static,
361{
362 complete_callback(
363 flow_client,
364 wait_for_callback(handle, cfg.timeout_ms).await?,
365 )
366 .await
367}
368
369/// Handles the OAuth callback for the localhost loopback server.
370///
371/// Returns a session if the callback succeeds within the configured timeout
372/// and shuts down the server.
373///
374/// When the `scope-check` feature is enabled, `T` must also implement `LexiconSchemaResolver`
375/// for eager resolution of include scopes.
376#[cfg(feature = "scope-check")]
377pub async fn handle_localhost_callback<T, S>(
378 handle: CallbackHandle,
379 flow_client: &super::client::OAuthClient<T, S>,
380 cfg: &LoopbackConfig,
381) -> crate::error::Result<super::client::OAuthSession<T, S>>
382where
383 T: OAuthResolver
384 + DpopExt
385 + jacquard_identity::lexicon_resolver::LexiconSchemaResolver
386 + Send
387 + Sync
388 + 'static,
389 S: ClientAuthStore + Send + Sync + 'static,
390{
391 complete_callback(
392 flow_client,
393 wait_for_callback(handle, cfg.timeout_ms).await?,
394 )
395 .await
396}
397
398fn loopback_port(cfg: &LoopbackConfig) -> u16 {
399 match cfg.port {
400 LoopbackPort::Fixed(port) => port,
401 LoopbackPort::Ephemeral => 0,
402 }
403}
404
405fn redirect_host(host: &str) -> String {
406 if host.contains(':') && !host.starts_with('[') {
407 format!("[{host}]")
408 } else {
409 host.to_owned()
410 }
411}
412
413impl<T, S> OAuthClient<T, S>
414where
415 T: OAuthResolver + DpopExt + Send + Sync + 'static,
416 S: ClientAuthStore + Send + Sync + 'static,
417{
418 async fn start_loopback_flow(
419 &self,
420 input: &str,
421 opts: AuthorizeOptions<SmolStr>,
422 cfg: &LoopbackConfig,
423 ) -> crate::error::Result<(OAuthClient<T, S>, CallbackHandle)> {
424 let (local_addr, handle) = one_shot_server((cfg.host.as_str(), loopback_port(cfg)))
425 .await
426 .map_err(|err| OAuthError::Callback(CallbackError::LoopbackServer(err.to_string())))?;
427 println!("Listening on {}", local_addr);
428
429 let client_data = self.build_localhost_client_data(cfg, &opts, local_addr);
430 let flow_client = OAuthClient::new_with_shared(
431 self.registry.store.clone(),
432 self.client.clone(),
433 client_data,
434 );
435
436 let auth_url = flow_client.start_auth(input, opts).await?;
437 println!("To authenticate with your PDS, visit:\n{}\n", auth_url);
438 if cfg.open_browser {
439 let _ = try_open_in_browser(&auth_url);
440 }
441
442 Ok((flow_client, handle))
443 }
444
445 /// Builds a [`crate::session::ClientData`] for use with the local loopback server method of OAuth.
446 pub fn build_localhost_client_data(
447 &self,
448 cfg: &LoopbackConfig,
449 opts: &AuthorizeOptions<SmolStr>,
450 local_addr: SocketAddr,
451 ) -> crate::session::ClientData<SmolStr> {
452 let redirect_uri = format!(
453 "http://{}:{}/oauth/callback",
454 redirect_host(&cfg.host),
455 local_addr.port(),
456 );
457 let redirect = Uri::parse(redirect_uri).unwrap();
458
459 let scopes = if opts.scopes.is_empty() {
460 Some(self.registry.client_data.config.scopes.clone())
461 } else {
462 Some(opts.scopes.clone())
463 };
464
465 crate::session::ClientData {
466 keyset: self.registry.client_data.keyset.clone(),
467 config: AtprotoClientMetadata::new_localhost(Some(vec![redirect]), scopes),
468 }
469 .into_static()
470 }
471
472 async fn restore_matching_session<HintStr: BosStr + Send + Sync>(
473 &self,
474 hint: &SessionHint<HintStr>,
475 ) -> crate::error::Result<Option<super::client::OAuthSession<T, S>>>
476 where
477 S: SessionSelector<OAuthSessionMatch, Error = SessionStoreError>,
478 {
479 if let Some(matched) =
480 OAuthSessionSelector::new(self.registry.store.as_ref(), self.client.as_ref())
481 .select_session(hint)
482 .await?
483 {
484 Ok(Some(
485 self.restore(&matched.key.did, matched.key.session_id.as_str())
486 .await?,
487 ))
488 } else {
489 Ok(None)
490 }
491 }
492}
493
494fn loopback_start_auth_input_from_hint<S: BosStr>(hint: &SessionHint<S>) -> Option<SmolStr> {
495 match hint {
496 SessionHint::Did(did) => Some(SmolStr::new(did.as_ref())),
497 SessionHint::Handle(handle) => Some(SmolStr::new(handle.as_ref())),
498 SessionHint::Key(key) => Some(key.did.as_str().into()),
499 SessionHint::Identifier(identifier) => Some(SmolStr::new(identifier.as_ref())),
500 SessionHint::Any => None,
501 }
502}
503
504fn should_start_login_after_restore_error(err: &OAuthError) -> bool {
505 matches!(err, OAuthError::Session(session_err) if session_err.is_permanent())
506}
507
508#[cfg(not(feature = "scope-check"))]
509impl<T, S> OAuthClient<T, S>
510where
511 T: OAuthResolver + DpopExt + Send + Sync + 'static,
512 S: ClientAuthStore + Send + Sync + 'static,
513{
514 /// Drive the full OAuth flow using a local loopback server.
515 ///
516 /// This uses localhost OAuth and an ephemeral in-process web server to
517 /// handle the OAuth callback redirect. It has friendly defaults to drive
518 /// the entire callback flow for development and small CLI applications.
519 pub async fn login_with_local_server(
520 &self,
521 input: impl AsRef<str>,
522 opts: AuthorizeOptions<SmolStr>,
523 cfg: LoopbackConfig,
524 ) -> crate::error::Result<OAuthSession<T, S>> {
525 let (flow_client, handle) = self.start_loopback_flow(input.as_ref(), opts, &cfg).await?;
526 handle_localhost_callback(handle, &flow_client, &cfg).await
527 }
528
529 /// Resume a stored session, or drive the full OAuth flow using a local loopback server.
530 ///
531 /// Returns `Ok(None)` when no stored session matches and `hint` does not contain enough
532 /// information to start a new loopback OAuth flow.
533 pub async fn resume_or_login_with_local_server<HintStr: BosStr + Send + Sync>(
534 &self,
535 hint: &SessionHint<HintStr>,
536 opts: AuthorizeOptions<SmolStr>,
537 cfg: LoopbackConfig,
538 ) -> crate::error::Result<Option<OAuthSession<T, S>>>
539 where
540 S: SessionSelector<OAuthSessionMatch, Error = SessionStoreError>,
541 {
542 let input = loopback_start_auth_input_from_hint(hint);
543 match self.restore_matching_session(hint).await {
544 Ok(Some(session)) => return Ok(Some(session)),
545 Ok(None) => {}
546 Err(err) if input.is_some() && should_start_login_after_restore_error(&err) => {}
547 Err(err) => return Err(err),
548 }
549 let Some(input) = input else {
550 return Ok(None);
551 };
552 self.login_with_local_server(input.as_str(), opts, cfg)
553 .await
554 .map(Some)
555 }
556}
557
558#[cfg(feature = "scope-check")]
559impl<T, S> OAuthClient<T, S>
560where
561 T: OAuthResolver
562 + DpopExt
563 + jacquard_identity::lexicon_resolver::LexiconSchemaResolver
564 + Send
565 + Sync
566 + 'static,
567 S: ClientAuthStore + Send + Sync + 'static,
568{
569 /// Drive the full OAuth flow using a local loopback server.
570 ///
571 /// This uses localhost OAuth and an ephemeral in-process web server to
572 /// handle the OAuth callback redirect. It has friendly defaults to drive
573 /// the entire callback flow for development and small CLI applications.
574 pub async fn login_with_local_server(
575 &self,
576 input: impl AsRef<str>,
577 opts: AuthorizeOptions<SmolStr>,
578 cfg: LoopbackConfig,
579 ) -> crate::error::Result<OAuthSession<T, S>> {
580 let (flow_client, handle) = self.start_loopback_flow(input.as_ref(), opts, &cfg).await?;
581 handle_localhost_callback(handle, &flow_client, &cfg).await
582 }
583
584 /// Resume a stored session, or drive the full OAuth flow using a local loopback server.
585 ///
586 /// Returns `Ok(None)` when no stored session matches and `hint` does not contain enough
587 /// information to start a new loopback OAuth flow.
588 pub async fn resume_or_login_with_local_server<HintStr: BosStr + Send + Sync>(
589 &self,
590 hint: &SessionHint<HintStr>,
591 opts: AuthorizeOptions<SmolStr>,
592 cfg: LoopbackConfig,
593 ) -> crate::error::Result<Option<OAuthSession<T, S>>>
594 where
595 S: SessionSelector<OAuthSessionMatch, Error = SessionStoreError>,
596 {
597 let input = loopback_start_auth_input_from_hint(hint);
598 match self.restore_matching_session(hint).await {
599 Ok(Some(session)) => return Ok(Some(session)),
600 Ok(None) => {}
601 Err(err) if input.is_some() && should_start_login_after_restore_error(&err) => {}
602 Err(err) => return Err(err),
603 }
604 let Some(input) = input else {
605 return Ok(None);
606 };
607 self.login_with_local_server(input.as_str(), opts, cfg)
608 .await
609 .map(Some)
610 }
611}