Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0

Configure Feed

Select the types of activity you want to include in your feed.

1use askama::Template; 2use axum::{ 3 extract::{Query, Request}, 4 http::{self, header}, 5 middleware::{self, Next}, 6 response::{IntoResponse, Json, Response}, 7 routing::get, 8 Router, 9}; 10use axum_metrics::{ExtraMetricLabels, MetricLayer}; 11use bincode::Options; 12use serde::{Deserialize, Serialize}; 13use serde_with::serde_as; 14use std::collections::{HashMap, HashSet}; 15use std::time::{Duration, UNIX_EPOCH}; 16use tokio::net::{TcpListener, ToSocketAddrs}; 17use tokio::task::spawn_blocking; 18use tokio_util::sync::CancellationToken; 19 20use crate::storage::{LinkReader, Order, StorageStats}; 21use crate::{CountsByCount, Did, RecordId}; 22 23mod acceptable; 24mod filters; 25 26use acceptable::{acceptable, ExtractAccept}; 27 28const DEFAULT_CURSOR_LIMIT: u64 = 100; 29const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000; 30 31fn get_default_cursor_limit() -> u64 { 32 DEFAULT_CURSOR_LIMIT 33} 34 35fn to500(e: tokio::task::JoinError) -> http::StatusCode { 36 eprintln!("handler error: {e}"); 37 http::StatusCode::INTERNAL_SERVER_ERROR 38} 39 40pub async fn serve<S: LinkReader, A: ToSocketAddrs>( 41 store: S, 42 addr: A, 43 did_web_domain: Option<String>, 44 stay_alive: CancellationToken, 45) -> anyhow::Result<()> { 46 let mut app = Router::new(); 47 48 if let Some(d) = did_web_domain { 49 app = app.route( 50 "/.well-known/did.json", 51 get({ 52 let domain = d.clone(); 53 move || did_web(domain) 54 }), 55 ) 56 } 57 58 let app = app 59 .route("/robots.txt", get(robots)) 60 .route( 61 "/", 62 get({ 63 let store = store.clone(); 64 move |accept| async { 65 spawn_blocking(|| hello(accept, store)) 66 .await 67 .map_err(to500)? 68 } 69 }), 70 ) 71 .route( 72 "/xrpc/blue.microcosm.links.getManyToManyCounts", 73 get({ 74 let store = store.clone(); 75 move |accept, query| async { 76 spawn_blocking(|| get_many_to_many_counts(accept, query, store)) 77 .await 78 .map_err(to500)? 79 } 80 }), 81 ) 82 // deprecated 83 .route( 84 "/links/count", 85 get({ 86 let store = store.clone(); 87 move |accept, query| async { 88 spawn_blocking(|| count_links(accept, query, store)) 89 .await 90 .map_err(to500)? 91 } 92 }), 93 ) 94 .route( 95 "/xrpc/blue.microcosm.links.getBacklinksCount", 96 get({ 97 let store = store.clone(); 98 move |accept, query| async { 99 spawn_blocking(|| get_backlink_counts(accept, query, store)) 100 .await 101 .map_err(to500)? 102 } 103 }), 104 ) 105 .route( 106 "/links/count/distinct-dids", 107 get({ 108 let store = store.clone(); 109 move |accept, query| async { 110 spawn_blocking(|| count_distinct_dids(accept, query, store)) 111 .await 112 .map_err(to500)? 113 } 114 }), 115 ) 116 .route( 117 "/xrpc/blue.microcosm.links.getBacklinks", 118 get({ 119 let store = store.clone(); 120 move |accept, query| async { 121 spawn_blocking(|| get_backlinks(accept, query, store)) 122 .await 123 .map_err(to500)? 124 } 125 }), 126 ) 127 .route( 128 "/links", 129 get({ 130 let store = store.clone(); 131 move |accept, query| async { 132 spawn_blocking(|| get_links(accept, query, store)) 133 .await 134 .map_err(to500)? 135 } 136 }), 137 ) 138 .route( 139 "/links/distinct-dids", 140 get({ 141 let store = store.clone(); 142 move |accept, query| async { 143 spawn_blocking(|| get_distinct_dids(accept, query, store)) 144 .await 145 .map_err(to500)? 146 } 147 }), 148 ) 149 .route( 150 // deprecated 151 "/links/all/count", 152 get({ 153 let store = store.clone(); 154 move |accept, query| async { 155 spawn_blocking(|| count_all_links(accept, query, store)) 156 .await 157 .map_err(to500)? 158 } 159 }), 160 ) 161 .route( 162 "/links/all", 163 get({ 164 let store = store.clone(); 165 move |accept, query| async { 166 spawn_blocking(|| explore_links(accept, query, store)) 167 .await 168 .map_err(to500)? 169 } 170 }), 171 ) 172 .layer(tower_http::cors::CorsLayer::permissive()) 173 .layer(middleware::from_fn(add_lables)) 174 .layer(MetricLayer::default()); 175 176 let listener = TcpListener::bind(addr).await?; 177 println!("api: listening at http://{:?}", listener.local_addr()?); 178 axum::serve(listener, app) 179 .with_graceful_shutdown(async move { stay_alive.cancelled().await }) 180 .await?; 181 182 Ok(()) 183} 184 185async fn add_lables(request: Request, next: Next) -> Response { 186 let origin = request 187 .headers() 188 .get(header::ORIGIN) 189 .and_then(|o| o.to_str().map(|v| v.to_owned()).ok()); 190 let user_agent = request.headers().get(header::USER_AGENT).and_then(|ua| { 191 ua.to_str() 192 .map(|v| { 193 if v.starts_with("Mozilla/") { 194 "Mozilla/...".into() 195 } else { 196 v.to_owned() 197 } 198 }) 199 .ok() 200 }); 201 202 let mut res = next.run(request).await; 203 204 let mut labels = Vec::new(); 205 if let Some(o) = origin { 206 labels.push(metrics::Label::new("origin", o)); 207 } 208 if let Some(ua) = user_agent { 209 labels.push(metrics::Label::new("user_agent", ua)); 210 } 211 res.extensions_mut().insert(ExtraMetricLabels(labels)); 212 res 213} 214 215async fn robots() -> &'static str { 216 "\ 217User-agent: * 218Disallow: /links 219Disallow: /links/ 220Disallow: /xrpc/ 221 " 222} 223 224async fn did_web(domain: String) -> impl IntoResponse { 225 Json(serde_json::json!({ 226 "id": format!("did:web:{domain}"), 227 "service": [{ 228 "id": "#constellation", 229 "type": "ConstellationGraphService", 230 "serviceEndpoint": format!("https://{domain}") 231 }] 232 })) 233} 234 235#[derive(Template, Serialize, Deserialize)] 236#[template(path = "hello.html.j2")] 237struct HelloReponse { 238 help: &'static str, 239 days_indexed: Option<u64>, 240 stats: StorageStats, 241} 242fn hello( 243 accept: ExtractAccept, 244 store: impl LinkReader, 245) -> Result<impl IntoResponse, http::StatusCode> { 246 let stats = store 247 .get_stats() 248 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 249 let days_indexed = stats 250 .started_at 251 .map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed()) 252 .transpose() 253 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)? 254 .map(|d| d.as_secs() / 86_400); 255 Ok(acceptable(accept, HelloReponse { 256 help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.", 257 days_indexed, 258 stats, 259 })) 260} 261 262#[derive(Clone, Deserialize)] 263#[serde(rename_all = "camelCase")] 264struct GetManyToManyCountsQuery { 265 subject: String, 266 source: String, 267 /// path to the secondary link in the linking record 268 path_to_other: String, 269 /// filter to linking records (join of the m2m) by these DIDs 270 #[serde(default)] 271 did: Vec<String>, 272 /// filter to specific secondary records 273 #[serde(default)] 274 other_subject: Vec<String>, 275 cursor: Option<OpaqueApiCursor>, 276 /// Set the max number of links to return per page of results 277 #[serde(default = "get_default_cursor_limit")] 278 limit: u64, 279} 280#[derive(Serialize)] 281struct OtherSubjectCount { 282 subject: String, 283 total: u64, 284 distinct: u64, 285} 286#[derive(Template, Serialize)] 287#[template(path = "get-many-to-many-counts.html.j2")] 288struct GetManyToManyCountsResponse { 289 counts_by_other_subject: Vec<OtherSubjectCount>, 290 cursor: Option<OpaqueApiCursor>, 291 #[serde(skip_serializing)] 292 query: GetManyToManyCountsQuery, 293} 294fn get_many_to_many_counts( 295 accept: ExtractAccept, 296 query: axum_extra::extract::Query<GetManyToManyCountsQuery>, 297 store: impl LinkReader, 298) -> Result<impl IntoResponse, http::StatusCode> { 299 let cursor_key = query 300 .cursor 301 .clone() 302 .map(|oc| ApiKeyedCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 303 .transpose()? 304 .map(|c| c.next); 305 306 let limit = query.limit; 307 if limit > DEFAULT_CURSOR_LIMIT_MAX { 308 return Err(http::StatusCode::BAD_REQUEST); 309 } 310 311 let filter_dids: HashSet<Did> = HashSet::from_iter( 312 query 313 .did 314 .iter() 315 .map(|d| d.trim()) 316 .filter(|d| !d.is_empty()) 317 .map(|d| Did(d.to_string())), 318 ); 319 320 let filter_other_subjects: HashSet<String> = HashSet::from_iter( 321 query 322 .other_subject 323 .iter() 324 .map(|s| s.trim().to_string()) 325 .filter(|s| !s.is_empty()), 326 ); 327 328 let Some((collection, path)) = query.source.split_once(':') else { 329 return Err(http::StatusCode::BAD_REQUEST); 330 }; 331 let path = format!(".{path}"); 332 333 let path_to_other = format!(".{}", query.path_to_other); 334 335 let paged = store 336 .get_many_to_many_counts( 337 &query.subject, 338 collection, 339 &path, 340 &path_to_other, 341 limit, 342 cursor_key, 343 &filter_dids, 344 &filter_other_subjects, 345 ) 346 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 347 348 let cursor = paged.next.map(|next| ApiKeyedCursor { next }.into()); 349 350 let items = paged 351 .items 352 .into_iter() 353 .map(|(subject, total, distinct)| OtherSubjectCount { 354 subject, 355 total, 356 distinct, 357 }) 358 .collect(); 359 360 Ok(acceptable( 361 accept, 362 GetManyToManyCountsResponse { 363 counts_by_other_subject: items, 364 cursor, 365 query: (*query).clone(), 366 }, 367 )) 368} 369 370#[derive(Clone, Deserialize)] 371struct GetLinksCountQuery { 372 target: String, 373 collection: String, 374 path: String, 375} 376#[derive(Template, Serialize)] 377#[template(path = "links-count.html.j2")] 378struct GetLinksCountResponse { 379 total: u64, 380 #[serde(skip_serializing)] 381 query: GetLinksCountQuery, 382} 383fn count_links( 384 accept: ExtractAccept, 385 query: Query<GetLinksCountQuery>, 386 store: impl LinkReader, 387) -> Result<impl IntoResponse, http::StatusCode> { 388 let total = store 389 .get_count(&query.target, &query.collection, &query.path) 390 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 391 Ok(acceptable( 392 accept, 393 GetLinksCountResponse { 394 total, 395 query: (*query).clone(), 396 }, 397 )) 398} 399 400#[derive(Clone, Deserialize)] 401struct GetItemsCountQuery { 402 subject: String, 403 source: String, 404} 405#[derive(Template, Serialize)] 406#[template(path = "get-backlinks-count.html.j2")] 407struct GetItemsCountResponse { 408 total: u64, 409 #[serde(skip_serializing)] 410 query: GetItemsCountQuery, 411} 412fn get_backlink_counts( 413 accept: ExtractAccept, 414 query: axum_extra::extract::Query<GetItemsCountQuery>, 415 store: impl LinkReader, 416) -> Result<impl IntoResponse, http::StatusCode> { 417 let Some((collection, path)) = query.source.split_once(':') else { 418 return Err(http::StatusCode::BAD_REQUEST); 419 }; 420 let path = format!(".{path}"); 421 let total = store 422 .get_count(&query.subject, collection, &path) 423 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 424 425 Ok(acceptable( 426 accept, 427 GetItemsCountResponse { 428 total, 429 query: (*query).clone(), 430 }, 431 )) 432} 433 434#[derive(Clone, Deserialize)] 435struct GetDidsCountQuery { 436 target: String, 437 collection: String, 438 path: String, 439} 440#[derive(Template, Serialize)] 441#[template(path = "dids-count.html.j2")] 442struct GetDidsCountResponse { 443 total: u64, 444 #[serde(skip_serializing)] 445 query: GetDidsCountQuery, 446} 447fn count_distinct_dids( 448 accept: ExtractAccept, 449 query: Query<GetDidsCountQuery>, 450 store: impl LinkReader, 451) -> Result<impl IntoResponse, http::StatusCode> { 452 let total = store 453 .get_distinct_did_count(&query.target, &query.collection, &query.path) 454 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 455 Ok(acceptable( 456 accept, 457 GetDidsCountResponse { 458 total, 459 query: (*query).clone(), 460 }, 461 )) 462} 463 464#[derive(Clone, Deserialize)] 465struct GetBacklinksQuery { 466 /// The link target 467 /// 468 /// can be an AT-URI, plain DID, or regular URI 469 subject: String, 470 /// Filter links only from this link source 471 /// 472 /// eg.: `app.bsky.feed.like:subject.uri` 473 source: String, 474 cursor: Option<OpaqueApiCursor>, 475 /// Filter links only from these DIDs 476 /// 477 /// include multiple times to filter by multiple source DIDs 478 #[serde(default)] 479 did: Vec<String>, 480 /// Set the max number of links to return per page of results 481 #[serde(default = "get_default_cursor_limit")] 482 limit: u64, 483 /// Allow returning links in reverse order (default: false) 484 #[serde(default)] 485 reverse: bool, 486} 487#[derive(Template, Serialize)] 488#[template(path = "get-backlinks.html.j2")] 489struct GetBacklinksResponse { 490 total: u64, 491 records: Vec<RecordId>, 492 cursor: Option<OpaqueApiCursor>, 493 #[serde(skip_serializing)] 494 query: GetBacklinksQuery, 495 #[serde(skip_serializing)] 496 collection: String, 497 #[serde(skip_serializing)] 498 path: String, 499} 500fn get_backlinks( 501 accept: ExtractAccept, 502 query: axum_extra::extract::Query<GetBacklinksQuery>, // supports multiple param occurrences 503 store: impl LinkReader, 504) -> Result<impl IntoResponse, http::StatusCode> { 505 let until = query 506 .cursor 507 .clone() 508 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 509 .transpose()? 510 .map(|c| c.next); 511 512 let limit = query.limit; 513 if limit > DEFAULT_CURSOR_LIMIT_MAX { 514 return Err(http::StatusCode::BAD_REQUEST); 515 } 516 517 let filter_dids: HashSet<Did> = HashSet::from_iter( 518 query 519 .did 520 .iter() 521 .map(|d| d.trim()) 522 .filter(|d| !d.is_empty()) 523 .map(|d| Did(d.to_string())), 524 ); 525 526 let Some((collection, path)) = query.source.split_once(':') else { 527 return Err(http::StatusCode::BAD_REQUEST); 528 }; 529 let path = format!(".{path}"); 530 531 let order = if query.reverse { 532 Order::OldestToNewest 533 } else { 534 Order::NewestToOldest 535 }; 536 537 let paged = store 538 .get_links( 539 &query.subject, 540 collection, 541 &path, 542 order, 543 limit, 544 until, 545 &filter_dids, 546 ) 547 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 548 549 let cursor = paged.next.map(|next| { 550 ApiCursor { 551 version: paged.version, 552 next, 553 } 554 .into() 555 }); 556 557 Ok(acceptable( 558 accept, 559 GetBacklinksResponse { 560 total: paged.total, 561 records: paged.items, 562 cursor, 563 query: (*query).clone(), 564 collection: collection.to_string(), 565 path, 566 }, 567 )) 568} 569 570#[derive(Clone, Deserialize)] 571struct GetLinkItemsQuery { 572 target: String, 573 collection: String, 574 path: String, 575 cursor: Option<OpaqueApiCursor>, 576 /// Filter links only from these DIDs 577 /// 578 /// include multiple times to filter by multiple source DIDs 579 #[serde(default)] 580 did: Vec<String>, 581 /// [deprecated] Filter links only from these DIDs 582 /// 583 /// format: comma-separated sequence of DIDs 584 /// 585 /// errors: if `did` parameter is also present 586 /// 587 /// deprecated: use `did`, which can be repeated multiple times 588 from_dids: Option<String>, // comma separated: gross 589 #[serde(default = "get_default_cursor_limit")] 590 limit: u64, 591} 592#[derive(Template, Serialize)] 593#[template(path = "links.html.j2")] 594struct GetLinkItemsResponse { 595 // what does staleness mean? 596 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 597 // - links have been deleted. hmm. 598 total: u64, 599 linking_records: Vec<RecordId>, 600 cursor: Option<OpaqueApiCursor>, 601 #[serde(skip_serializing)] 602 query: GetLinkItemsQuery, 603} 604fn get_links( 605 accept: ExtractAccept, 606 query: axum_extra::extract::Query<GetLinkItemsQuery>, // supports multiple param occurrences 607 store: impl LinkReader, 608) -> Result<impl IntoResponse, http::StatusCode> { 609 let until = query 610 .cursor 611 .clone() 612 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 613 .transpose()? 614 .map(|c| c.next); 615 616 let limit = query.limit; 617 if limit > DEFAULT_CURSOR_LIMIT_MAX { 618 return Err(http::StatusCode::BAD_REQUEST); 619 } 620 621 let mut filter_dids: HashSet<Did> = HashSet::from_iter( 622 query 623 .did 624 .iter() 625 .map(|d| d.trim()) 626 .filter(|d| !d.is_empty()) 627 .map(|d| Did(d.to_string())), 628 ); 629 630 if let Some(comma_joined) = &query.from_dids { 631 if !filter_dids.is_empty() { 632 return Err(http::StatusCode::BAD_REQUEST); 633 } 634 for did in comma_joined.split(',') { 635 filter_dids.insert(Did(did.to_string())); 636 } 637 } 638 639 let paged = store 640 .get_links( 641 &query.target, 642 &query.collection, 643 &query.path, 644 Order::NewestToOldest, 645 limit, 646 until, 647 &filter_dids, 648 ) 649 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 650 651 let cursor = paged.next.map(|next| { 652 ApiCursor { 653 version: paged.version, 654 next, 655 } 656 .into() 657 }); 658 659 Ok(acceptable( 660 accept, 661 GetLinkItemsResponse { 662 total: paged.total, 663 linking_records: paged.items, 664 cursor, 665 query: (*query).clone(), 666 }, 667 )) 668} 669 670#[derive(Clone, Deserialize)] 671struct GetDidItemsQuery { 672 target: String, 673 collection: String, 674 path: String, 675 cursor: Option<OpaqueApiCursor>, 676 limit: Option<u64>, 677 // TODO: allow reverse (er, forward) order as well 678} 679#[derive(Template, Serialize)] 680#[template(path = "dids.html.j2")] 681struct GetDidItemsResponse { 682 // what does staleness mean? 683 // - new links have appeared. would be nice to offer a `since` cursor to fetch these. and/or, 684 // - links have been deleted. hmm. 685 total: u64, 686 linking_dids: Vec<Did>, 687 cursor: Option<OpaqueApiCursor>, 688 #[serde(skip_serializing)] 689 query: GetDidItemsQuery, 690} 691fn get_distinct_dids( 692 accept: ExtractAccept, 693 query: Query<GetDidItemsQuery>, 694 store: impl LinkReader, 695) -> Result<impl IntoResponse, http::StatusCode> { 696 let until = query 697 .cursor 698 .clone() 699 .map(|oc| ApiCursor::try_from(oc).map_err(|_| http::StatusCode::BAD_REQUEST)) 700 .transpose()? 701 .map(|c| c.next); 702 703 let limit = query.limit.unwrap_or(DEFAULT_CURSOR_LIMIT); 704 if limit > DEFAULT_CURSOR_LIMIT_MAX { 705 return Err(http::StatusCode::BAD_REQUEST); 706 } 707 708 let paged = store 709 .get_distinct_dids(&query.target, &query.collection, &query.path, limit, until) 710 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 711 712 let cursor = paged.next.map(|next| { 713 ApiCursor { 714 version: paged.version, 715 next, 716 } 717 .into() 718 }); 719 720 Ok(acceptable( 721 accept, 722 GetDidItemsResponse { 723 total: paged.total, 724 linking_dids: paged.items, 725 cursor, 726 query: (*query).clone(), 727 }, 728 )) 729} 730 731#[derive(Clone, Deserialize)] 732struct GetAllLinksQuery { 733 target: String, 734} 735#[derive(Template, Serialize)] 736#[template(path = "links-all-count.html.j2")] 737struct GetAllLinksResponse { 738 links: HashMap<String, HashMap<String, u64>>, 739 #[serde(skip_serializing)] 740 query: GetAllLinksQuery, 741} 742fn count_all_links( 743 accept: ExtractAccept, 744 query: Query<GetAllLinksQuery>, 745 store: impl LinkReader, 746) -> Result<impl IntoResponse, http::StatusCode> { 747 let links = store 748 .get_all_record_counts(&query.target) 749 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 750 Ok(acceptable( 751 accept, 752 GetAllLinksResponse { 753 links, 754 query: (*query).clone(), 755 }, 756 )) 757} 758 759#[derive(Clone, Deserialize)] 760struct ExploreLinksQuery { 761 target: String, 762} 763#[derive(Template, Serialize)] 764#[template(path = "explore-links.html.j2")] 765struct ExploreLinksResponse { 766 links: HashMap<String, HashMap<String, CountsByCount>>, 767 #[serde(skip_serializing)] 768 query: ExploreLinksQuery, 769} 770fn explore_links( 771 accept: ExtractAccept, 772 query: Query<ExploreLinksQuery>, 773 store: impl LinkReader, 774) -> Result<impl IntoResponse, http::StatusCode> { 775 let links = store 776 .get_all_counts(&query.target) 777 .map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?; 778 Ok(acceptable( 779 accept, 780 ExploreLinksResponse { 781 links, 782 query: (*query).clone(), 783 }, 784 )) 785} 786 787#[serde_as] 788#[derive(Clone, Serialize, Deserialize)] // for json 789struct OpaqueApiCursor(#[serde_as(as = "serde_with::hex::Hex")] Vec<u8>); 790 791#[derive(Serialize, Deserialize)] // for bincode 792struct ApiCursor { 793 version: (u64, u64), // (collection length, deleted item count) 794 next: u64, 795} 796 797impl TryFrom<OpaqueApiCursor> for ApiCursor { 798 type Error = bincode::Error; 799 800 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 801 bincode::DefaultOptions::new().deserialize(&item.0) 802 } 803} 804 805impl From<ApiCursor> for OpaqueApiCursor { 806 fn from(item: ApiCursor) -> Self { 807 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 808 } 809} 810 811#[derive(Serialize, Deserialize)] // for bincode 812struct ApiKeyedCursor { 813 next: String, // the key 814} 815 816impl TryFrom<OpaqueApiCursor> for ApiKeyedCursor { 817 type Error = bincode::Error; 818 819 fn try_from(item: OpaqueApiCursor) -> Result<Self, Self::Error> { 820 bincode::DefaultOptions::new().deserialize(&item.0) 821 } 822} 823 824impl From<ApiKeyedCursor> for OpaqueApiCursor { 825 fn from(item: ApiKeyedCursor) -> Self { 826 OpaqueApiCursor(bincode::DefaultOptions::new().serialize(&item).unwrap()) 827 } 828}