A better Rust ATProto crate
1

Configure Feed

Select the types of activity you want to include in your feed.

at main 20 kB View raw
//!
//! Helpers for the local loopback server method of atproto OAuth.
//!
//! `OAuthClient::login_with_local_server()` is the nice helper. Here is where
//! it and its components live. Below is what it does, so you can have more
//! granular control without having to make your own loopback server.
//!
//! ```ignore
//! let input = "your_handle_here";
//! let cfg = LoopbackConfig::default();
//! let opts = AuthorizeOptions::default();
//! let port = match cfg.port {
//!     LoopbackPort::Fixed(p) => p,
//!     LoopbackPort::Ephemeral => 0,
//! };
//! // TODO: fix this so it also accepts ipv6 and properly finds a free port
//! let bind_addr: SocketAddr = format!("0.0.0.0:{}", port)
//!     .parse()
//!     .expect("invalid loopback host/port");
//! let oauth = OAuthClient::with_default_config(FileAuthStore::new(&args.store));
//!
//! // `one_shot_server` is async and fallible: await it and propagate the bind error.
//! let (local_addr, handle) = one_shot_server(bind_addr).await?;
//! println!("Listening on {}", local_addr);
//!
//! let client_data = oauth.build_localhost_client_data(&cfg, &opts, local_addr);
//! // Build client using store and resolver
//! let flow_client = OAuthClient::new_with_shared(
//!     self.registry.store.clone(),
//!     self.client.clone(),
//!     client_data,
//! );
//!
//! // Start auth and get authorization URL
//! let auth_url = flow_client.start_auth(input.as_ref(), opts).await?;
//! // Print URL for copy/paste
//! println!("To authenticate with your PDS, visit:\n{}\n", auth_url);
//! // Optionally open browser
//! if cfg.open_browser {
//!     let _ = try_open_in_browser(&auth_url);
//! }
//!
//! handle_localhost_callback(handle, &flow_client, &cfg).await
//! ```
//!
//!
46#![cfg(all( 47 feature = "loopback", 48 not(all(target_arch = "wasm32", target_os = "unknown")) 49))] 50use crate::{ 51 atproto::AtprotoClientMetadata, 52 authstore::{ClientAuthStore, OAuthSessionMatch, OAuthSessionSelector}, 53 client::{OAuthClient, OAuthSession}, 54 dpop::DpopExt, 55 error::{CallbackError, OAuthError}, 56 resolver::OAuthResolver, 57 types::{AuthorizeOptions, CallbackParams}, 58}; 59use jacquard_common::deps::fluent_uri::Uri; 60use jacquard_common::session::{SessionHint, SessionSelector, SessionStoreError}; 61use jacquard_common::{IntoStatic, bos::BosStr}; 62use smol_str::SmolStr; 63use std::net::SocketAddr; 64use tokio::{ 65 io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, 66 net::{TcpListener, TcpStream, ToSocketAddrs}, 67 sync::{mpsc, oneshot}, 68}; 69 70/// Port selection strategy for the loopback OAuth callback server. 71#[derive(Clone, Debug)] 72pub enum LoopbackPort { 73 /// Bind to a specific port number. 74 Fixed(u16), 75 /// Let the OS assign an available port. 76 Ephemeral, 77} 78 79/// Configuration for the loopback OAuth callback server. 80#[derive(Clone, Debug)] 81pub struct LoopbackConfig { 82 /// The host address to bind to (e.g., `"127.0.0.1"`). 83 pub host: String, 84 /// Port selection strategy. 85 pub port: LoopbackPort, 86 /// Whether to attempt opening the authorization URL in the user's browser. 87 pub open_browser: bool, 88 /// How long to wait for the callback before timing out, in milliseconds. 89 pub timeout_ms: u64, 90} 91 92impl Default for LoopbackConfig { 93 fn default() -> Self { 94 Self { 95 host: "127.0.0.1".into(), 96 port: LoopbackPort::Fixed(4000), 97 open_browser: true, 98 timeout_ms: 5 * 60 * 1000, 99 } 100 } 101} 102 103/// Attempts to open the given URL in the user's default browser. 104/// 105/// Returns `true` if the browser was opened successfully, `false` otherwise. 
106#[cfg(feature = "browser-open")] 107pub fn try_open_in_browser(url: &str) -> bool { 108 webbrowser::open(url).is_ok() 109} 110/// Stub for when the `browser-open` feature is disabled. Always returns `false`. 111#[cfg(not(feature = "browser-open"))] 112pub fn try_open_in_browser(_url: &str) -> bool { 113 false 114} 115 116async fn handle_callback_connection(mut stream: TcpStream, tx: mpsc::Sender<CallbackRequest>) { 117 let Some(Some(params)) = read_callback_params(&mut stream).await else { 118 let _ = write_http_response(&mut stream, 404, "Not found").await; 119 return; 120 }; 121 122 let (response_tx, response_rx) = oneshot::channel(); 123 match tx.try_send(CallbackRequest { 124 params, 125 response_tx, 126 }) { 127 Ok(()) => match response_rx.await { 128 Ok(response) => { 129 let _ = write_http_response(&mut stream, response.status, response.body).await; 130 } 131 Err(_) => { 132 let _ = write_http_response(&mut stream, 500, OAUTH_CALLBACK_FAILURE_BODY).await; 133 } 134 }, 135 Err(_) => { 136 let _ = write_http_response(&mut stream, 500, "Could not deliver OAuth callback").await; 137 } 138 } 139} 140 141struct CallbackRequest { 142 params: CallbackParams, 143 response_tx: oneshot::Sender<CallbackResponse>, 144} 145 146struct CallbackResponse { 147 status: u16, 148 body: &'static str, 149} 150 151const OAUTH_CALLBACK_SUCCESS_BODY: &str = r#"<!doctype html> 152<html lang="en"> 153<head> 154 <meta charset="utf-8"> 155 <title>Jacquard OAuth login complete</title> 156</head> 157<body> 158 <h1>Jacquard OAuth login complete.</h1> 159 <p>You can close this tab and return to the application.</p> 160</body> 161</html> 162"#; 163 164const OAUTH_CALLBACK_FAILURE_BODY: &str = r#"<!doctype html> 165<html lang="en"> 166<head> 167 <meta charset="utf-8"> 168 <title>Jacquard OAuth login failed</title> 169</head> 170<body> 171 <h1>Jacquard OAuth login failed.</h1> 172 <p>Return to the application for details.</p> 173</body> 174</html> 175"#; 176 177async fn 
read_callback_params(stream: &mut TcpStream) -> Option<Option<CallbackParams>> { 178 let mut reader = BufReader::new(stream); 179 let mut request_line = String::new(); 180 reader.read_line(&mut request_line).await.ok()?; 181 let mut parts = request_line.split_whitespace(); 182 let method = parts.next()?; 183 let target = parts.next()?; 184 if method != "GET" { 185 return Some(None); 186 } 187 let (path, query) = target.split_once('?').unwrap_or((target, "")); 188 if path != "/oauth/callback" { 189 return Some(None); 190 } 191 serde_html_form::from_str(query).ok().map(Some) 192} 193 194async fn write_http_response( 195 stream: &mut TcpStream, 196 status: u16, 197 body: &str, 198) -> std::io::Result<()> { 199 let reason = match status { 200 200 => "OK", 201 404 => "Not Found", 202 500 => "Internal Server Error", 203 _ => "OK", 204 }; 205 let content_type = if body.trim_start().starts_with("<!doctype html>") { 206 "text/html; charset=utf-8" 207 } else { 208 "text/plain; charset=utf-8" 209 }; 210 let response = format!( 211 "HTTP/1.1 {status} {reason}\r\ncontent-type: {content_type}\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", 212 body.len(), 213 body 214 ); 215 stream.write_all(response.as_bytes()).await 216} 217 218/// Handle to a running loopback callback server, used to await the OAuth redirect. 219pub struct CallbackHandle { 220 server_handle: tokio::task::JoinHandle<()>, 221 server_stop: oneshot::Sender<()>, 222 callback_rx: mpsc::Receiver<CallbackRequest>, 223} 224 225/// One-shot OAuth callback server. 226/// 227/// Starts an ephemeral in-process web server that listens for the OAuth 228/// callback redirect. Returns the server address and a [`CallbackHandle`] 229/// that can be used to wait for the callback and stop the server. 230/// 231/// Use in combination with [`handle_localhost_callback`] to handle the 232/// callback for the localhost loopback server. 
233pub async fn one_shot_server( 234 addr: impl ToSocketAddrs, 235) -> std::io::Result<(SocketAddr, CallbackHandle)> { 236 let (tx, callback_rx) = mpsc::channel(5); 237 let listener = TcpListener::bind(addr).await?; 238 let local_addr = listener.local_addr()?; 239 let (server_stop, mut stop_rx) = oneshot::channel(); 240 let server_handle = tokio::spawn(async move { 241 loop { 242 tokio::select! { 243 _ = &mut stop_rx => break, 244 accepted = listener.accept() => { 245 match accepted { 246 Ok((stream, _)) => { 247 tokio::spawn(handle_callback_connection(stream, tx.clone())); 248 } 249 Err(_) => break, 250 } 251 } 252 } 253 } 254 }); 255 let handle = CallbackHandle { 256 server_handle, 257 server_stop, 258 callback_rx, 259 }; 260 Ok((local_addr, handle)) 261} 262 263async fn wait_for_callback( 264 handle: CallbackHandle, 265 timeout_ms: u64, 266) -> Result<CallbackRequest, OAuthError> { 267 let CallbackHandle { 268 server_handle, 269 server_stop, 270 mut callback_rx, 271 } = handle; 272 let cb = tokio::time::timeout( 273 std::time::Duration::from_millis(timeout_ms), 274 callback_rx.recv(), 275 ) 276 .await; 277 let _ = server_stop.send(()); 278 let _ = server_handle.await; 279 if let Ok(Some(cb)) = cb { 280 Ok(cb) 281 } else { 282 Err(OAuthError::Callback(CallbackError::Timeout)) 283 } 284} 285 286#[cfg(not(feature = "scope-check"))] 287async fn complete_callback<T, S>( 288 flow_client: &super::client::OAuthClient<T, S>, 289 request: CallbackRequest, 290) -> crate::error::Result<super::client::OAuthSession<T, S>> 291where 292 T: OAuthResolver + DpopExt + Send + Sync + 'static, 293 S: ClientAuthStore + Send + Sync + 'static, 294{ 295 match flow_client.callback(request.params).await { 296 Ok(session) => { 297 let _ = request.response_tx.send(CallbackResponse { 298 status: 200, 299 body: OAUTH_CALLBACK_SUCCESS_BODY, 300 }); 301 Ok(session) 302 } 303 Err(err) => { 304 let _ = request.response_tx.send(CallbackResponse { 305 status: 500, 306 body: 
OAUTH_CALLBACK_FAILURE_BODY, 307 }); 308 Err(err) 309 } 310 } 311} 312 313#[cfg(feature = "scope-check")] 314async fn complete_callback<T, S>( 315 flow_client: &super::client::OAuthClient<T, S>, 316 request: CallbackRequest, 317) -> crate::error::Result<super::client::OAuthSession<T, S>> 318where 319 T: OAuthResolver 320 + DpopExt 321 + jacquard_identity::lexicon_resolver::LexiconSchemaResolver 322 + Send 323 + Sync 324 + 'static, 325 S: ClientAuthStore + Send + Sync + 'static, 326{ 327 match flow_client.callback(request.params).await { 328 Ok(session) => { 329 let _ = request.response_tx.send(CallbackResponse { 330 status: 200, 331 body: OAUTH_CALLBACK_SUCCESS_BODY, 332 }); 333 Ok(session) 334 } 335 Err(err) => { 336 let _ = request.response_tx.send(CallbackResponse { 337 status: 500, 338 body: OAUTH_CALLBACK_FAILURE_BODY, 339 }); 340 Err(err) 341 } 342 } 343} 344 345/// Handles the OAuth callback for the localhost loopback server. 346/// 347/// Returns a session if the callback succeeds within the configured timeout 348/// and shuts down the server. 349/// 350/// When the `scope-check` feature is enabled, `T` must also implement `LexiconSchemaResolver` 351/// for eager resolution of include scopes. 352#[cfg(not(feature = "scope-check"))] 353pub async fn handle_localhost_callback<T, S>( 354 handle: CallbackHandle, 355 flow_client: &super::client::OAuthClient<T, S>, 356 cfg: &LoopbackConfig, 357) -> crate::error::Result<super::client::OAuthSession<T, S>> 358where 359 T: OAuthResolver + DpopExt + Send + Sync + 'static, 360 S: ClientAuthStore + Send + Sync + 'static, 361{ 362 complete_callback( 363 flow_client, 364 wait_for_callback(handle, cfg.timeout_ms).await?, 365 ) 366 .await 367} 368 369/// Handles the OAuth callback for the localhost loopback server. 370/// 371/// Returns a session if the callback succeeds within the configured timeout 372/// and shuts down the server. 
373/// 374/// When the `scope-check` feature is enabled, `T` must also implement `LexiconSchemaResolver` 375/// for eager resolution of include scopes. 376#[cfg(feature = "scope-check")] 377pub async fn handle_localhost_callback<T, S>( 378 handle: CallbackHandle, 379 flow_client: &super::client::OAuthClient<T, S>, 380 cfg: &LoopbackConfig, 381) -> crate::error::Result<super::client::OAuthSession<T, S>> 382where 383 T: OAuthResolver 384 + DpopExt 385 + jacquard_identity::lexicon_resolver::LexiconSchemaResolver 386 + Send 387 + Sync 388 + 'static, 389 S: ClientAuthStore + Send + Sync + 'static, 390{ 391 complete_callback( 392 flow_client, 393 wait_for_callback(handle, cfg.timeout_ms).await?, 394 ) 395 .await 396} 397 398fn loopback_port(cfg: &LoopbackConfig) -> u16 { 399 match cfg.port { 400 LoopbackPort::Fixed(port) => port, 401 LoopbackPort::Ephemeral => 0, 402 } 403} 404 405fn redirect_host(host: &str) -> String { 406 if host.contains(':') && !host.starts_with('[') { 407 format!("[{host}]") 408 } else { 409 host.to_owned() 410 } 411} 412 413impl<T, S> OAuthClient<T, S> 414where 415 T: OAuthResolver + DpopExt + Send + Sync + 'static, 416 S: ClientAuthStore + Send + Sync + 'static, 417{ 418 async fn start_loopback_flow( 419 &self, 420 input: &str, 421 opts: AuthorizeOptions<SmolStr>, 422 cfg: &LoopbackConfig, 423 ) -> crate::error::Result<(OAuthClient<T, S>, CallbackHandle)> { 424 let (local_addr, handle) = one_shot_server((cfg.host.as_str(), loopback_port(cfg))) 425 .await 426 .map_err(|err| OAuthError::Callback(CallbackError::LoopbackServer(err.to_string())))?; 427 println!("Listening on {}", local_addr); 428 429 let client_data = self.build_localhost_client_data(cfg, &opts, local_addr); 430 let flow_client = OAuthClient::new_with_shared( 431 self.registry.store.clone(), 432 self.client.clone(), 433 client_data, 434 ); 435 436 let auth_url = flow_client.start_auth(input, opts).await?; 437 println!("To authenticate with your PDS, visit:\n{}\n", auth_url); 438 if 
cfg.open_browser { 439 let _ = try_open_in_browser(&auth_url); 440 } 441 442 Ok((flow_client, handle)) 443 } 444 445 /// Builds a [`crate::session::ClientData`] for use with the local loopback server method of OAuth. 446 pub fn build_localhost_client_data( 447 &self, 448 cfg: &LoopbackConfig, 449 opts: &AuthorizeOptions<SmolStr>, 450 local_addr: SocketAddr, 451 ) -> crate::session::ClientData<SmolStr> { 452 let redirect_uri = format!( 453 "http://{}:{}/oauth/callback", 454 redirect_host(&cfg.host), 455 local_addr.port(), 456 ); 457 let redirect = Uri::parse(redirect_uri).unwrap(); 458 459 let scopes = if opts.scopes.is_empty() { 460 Some(self.registry.client_data.config.scopes.clone()) 461 } else { 462 Some(opts.scopes.clone()) 463 }; 464 465 crate::session::ClientData { 466 keyset: self.registry.client_data.keyset.clone(), 467 config: AtprotoClientMetadata::new_localhost(Some(vec![redirect]), scopes), 468 } 469 .into_static() 470 } 471 472 async fn restore_matching_session<HintStr: BosStr + Send + Sync>( 473 &self, 474 hint: &SessionHint<HintStr>, 475 ) -> crate::error::Result<Option<super::client::OAuthSession<T, S>>> 476 where 477 S: SessionSelector<OAuthSessionMatch, Error = SessionStoreError>, 478 { 479 if let Some(matched) = 480 OAuthSessionSelector::new(self.registry.store.as_ref(), self.client.as_ref()) 481 .select_session(hint) 482 .await? 
483 { 484 Ok(Some( 485 self.restore(&matched.key.did, matched.key.session_id.as_str()) 486 .await?, 487 )) 488 } else { 489 Ok(None) 490 } 491 } 492} 493 494fn loopback_start_auth_input_from_hint<S: BosStr>(hint: &SessionHint<S>) -> Option<SmolStr> { 495 match hint { 496 SessionHint::Did(did) => Some(SmolStr::new(did.as_ref())), 497 SessionHint::Handle(handle) => Some(SmolStr::new(handle.as_ref())), 498 SessionHint::Key(key) => Some(key.did.as_str().into()), 499 SessionHint::Identifier(identifier) => Some(SmolStr::new(identifier.as_ref())), 500 SessionHint::Any => None, 501 } 502} 503 504fn should_start_login_after_restore_error(err: &OAuthError) -> bool { 505 matches!(err, OAuthError::Session(session_err) if session_err.is_permanent()) 506} 507 508#[cfg(not(feature = "scope-check"))] 509impl<T, S> OAuthClient<T, S> 510where 511 T: OAuthResolver + DpopExt + Send + Sync + 'static, 512 S: ClientAuthStore + Send + Sync + 'static, 513{ 514 /// Drive the full OAuth flow using a local loopback server. 515 /// 516 /// This uses localhost OAuth and an ephemeral in-process web server to 517 /// handle the OAuth callback redirect. It has friendly defaults to drive 518 /// the entire callback flow for development and small CLI applications. 519 pub async fn login_with_local_server( 520 &self, 521 input: impl AsRef<str>, 522 opts: AuthorizeOptions<SmolStr>, 523 cfg: LoopbackConfig, 524 ) -> crate::error::Result<OAuthSession<T, S>> { 525 let (flow_client, handle) = self.start_loopback_flow(input.as_ref(), opts, &cfg).await?; 526 handle_localhost_callback(handle, &flow_client, &cfg).await 527 } 528 529 /// Resume a stored session, or drive the full OAuth flow using a local loopback server. 530 /// 531 /// Returns `Ok(None)` when no stored session matches and `hint` does not contain enough 532 /// information to start a new loopback OAuth flow. 
533 pub async fn resume_or_login_with_local_server<HintStr: BosStr + Send + Sync>( 534 &self, 535 hint: &SessionHint<HintStr>, 536 opts: AuthorizeOptions<SmolStr>, 537 cfg: LoopbackConfig, 538 ) -> crate::error::Result<Option<OAuthSession<T, S>>> 539 where 540 S: SessionSelector<OAuthSessionMatch, Error = SessionStoreError>, 541 { 542 let input = loopback_start_auth_input_from_hint(hint); 543 match self.restore_matching_session(hint).await { 544 Ok(Some(session)) => return Ok(Some(session)), 545 Ok(None) => {} 546 Err(err) if input.is_some() && should_start_login_after_restore_error(&err) => {} 547 Err(err) => return Err(err), 548 } 549 let Some(input) = input else { 550 return Ok(None); 551 }; 552 self.login_with_local_server(input.as_str(), opts, cfg) 553 .await 554 .map(Some) 555 } 556} 557 558#[cfg(feature = "scope-check")] 559impl<T, S> OAuthClient<T, S> 560where 561 T: OAuthResolver 562 + DpopExt 563 + jacquard_identity::lexicon_resolver::LexiconSchemaResolver 564 + Send 565 + Sync 566 + 'static, 567 S: ClientAuthStore + Send + Sync + 'static, 568{ 569 /// Drive the full OAuth flow using a local loopback server. 570 /// 571 /// This uses localhost OAuth and an ephemeral in-process web server to 572 /// handle the OAuth callback redirect. It has friendly defaults to drive 573 /// the entire callback flow for development and small CLI applications. 574 pub async fn login_with_local_server( 575 &self, 576 input: impl AsRef<str>, 577 opts: AuthorizeOptions<SmolStr>, 578 cfg: LoopbackConfig, 579 ) -> crate::error::Result<OAuthSession<T, S>> { 580 let (flow_client, handle) = self.start_loopback_flow(input.as_ref(), opts, &cfg).await?; 581 handle_localhost_callback(handle, &flow_client, &cfg).await 582 } 583 584 /// Resume a stored session, or drive the full OAuth flow using a local loopback server. 585 /// 586 /// Returns `Ok(None)` when no stored session matches and `hint` does not contain enough 587 /// information to start a new loopback OAuth flow. 
588 pub async fn resume_or_login_with_local_server<HintStr: BosStr + Send + Sync>( 589 &self, 590 hint: &SessionHint<HintStr>, 591 opts: AuthorizeOptions<SmolStr>, 592 cfg: LoopbackConfig, 593 ) -> crate::error::Result<Option<OAuthSession<T, S>>> 594 where 595 S: SessionSelector<OAuthSessionMatch, Error = SessionStoreError>, 596 { 597 let input = loopback_start_auth_input_from_hint(hint); 598 match self.restore_matching_session(hint).await { 599 Ok(Some(session)) => return Ok(Some(session)), 600 Ok(None) => {} 601 Err(err) if input.is_some() && should_start_login_after_restore_error(&err) => {} 602 Err(err) => return Err(err), 603 } 604 let Some(input) = input else { 605 return Ok(None); 606 }; 607 self.login_with_local_server(input.as_str(), opts, cfg) 608 .await 609 .map(Some) 610 } 611}