Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1use std::future::Future; 2use std::sync::Arc; 3 4use bobbin_resolver::{ 5 NormalizeRepoRefs, decode_canon_or_upgrade, normalize_record_fields, scrub_record_bytes, 6 upgrade_wire_bytes, 7}; 8 9use axum::{ 10 Router, 11 body::Body, 12 extract::{FromRequestParts, Query, RawQuery, State, rejection::QueryRejection}, 13 http::{ 14 HeaderMap, HeaderName, StatusCode, 15 header::{ 16 ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LENGTH, 17 CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE, 18 LAST_MODIFIED, RANGE, 19 }, 20 request::Parts, 21 }, 22 response::{IntoResponse, Json, Response}, 23 routing::get, 24}; 25use bobbin_edge_index::{ 26 Coverage, CoverageWatch, CursorParseError, EdgePage, EdgeStore, IssueStateKind, PageCursor, 27 PageLimit, PageToken, PullStatusKind, SortDir, StateIndex, StateKind, 28}; 29use bobbin_knot_proxy::{KnotHost, KnotProxy, KnotProxyError, ProxyResponse, RepoSlug}; 30use bobbin_record_lru::RecordStore; 31use bobbin_resolver::RepoIdResolver; 32use bobbin_search::{ 33 SearchCursor, SearchError, SearchFilters, SearchHit, SearchOffset, SearchReader, 34}; 35use bobbin_slingshot_client::{SlingshotClient, SlingshotError}; 36use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 37use bobbin_types::record::RecordBody; 38use bobbin_types::search::SearchableRecord; 39use bobbin_types::sh_tangled::actor::profile::{Profile, ProfileGetRecordOutput, ProfileRecord}; 40use bobbin_types::sh_tangled::feed::comment::{ 41 Comment as FeedComment, CommentRecord as FeedCommentRecord, 42}; 43use bobbin_types::sh_tangled::feed::reaction::{Reaction, ReactionRecord}; 44use bobbin_types::sh_tangled::feed::star::{Star, StarRecord}; 45use bobbin_types::sh_tangled::git::ref_update::{RefUpdate, RefUpdateRecord}; 46use bobbin_types::sh_tangled::graph::follow::{Follow, FollowRecord}; 47use bobbin_types::sh_tangled::graph::vouch::{Vouch, VouchRecord}; 48use bobbin_types::sh_tangled::knot::member::{ 49 Member as 
KnotMember, MemberRecord as KnotMemberRecord, 50}; 51use bobbin_types::sh_tangled::knot::{Knot, KnotRecord}; 52use bobbin_types::sh_tangled::label::definition::{ 53 Definition as LabelDefinition, DefinitionRecord as LabelDefinitionRecord, 54}; 55use bobbin_types::sh_tangled::label::op::{Op as LabelOp, OpRecord as LabelOpRecord}; 56use bobbin_types::sh_tangled::pipeline::status::{ 57 Status as PipelineStatus, StatusRecord as PipelineStatusRecord, 58}; 59use bobbin_types::sh_tangled::pipeline::{Pipeline, PipelineRecord}; 60use bobbin_types::sh_tangled::public_key::{PublicKey, PublicKeyRecord}; 61use bobbin_types::sh_tangled::repo::artifact::{Artifact, ArtifactRecord}; 62use bobbin_types::sh_tangled::repo::collaborator::{Collaborator, CollaboratorRecord}; 63use bobbin_types::sh_tangled::repo::issue::state::{ 64 State as IssueState, StateRecord as IssueStateRecord, 65}; 66use bobbin_types::sh_tangled::repo::issue::{Issue, IssueGetRecordOutput, IssueRecord}; 67use bobbin_types::sh_tangled::repo::pull::status::{ 68 Status as PullStatus, StatusRecord as PullStatusRecord, 69}; 70use bobbin_types::sh_tangled::repo::pull::{Pull, PullGetRecordOutput, PullRecord}; 71use bobbin_types::sh_tangled::repo::{Repo, RepoGetRecordOutput, RepoRecord}; 72use bobbin_types::sh_tangled::spindle::member::{ 73 Member as SpindleMember, MemberRecord as SpindleMemberRecord, 74}; 75use bobbin_types::sh_tangled::spindle::{Spindle, SpindleRecord}; 76use bobbin_types::sh_tangled::string::{TangledString, TangledStringRecord}; 77use futures::Stream; 78use futures::stream::{self, StreamExt, TryStreamExt}; 79use jacquard_common::types::did::Did; 80use jacquard_common::types::ident::AtIdentifier; 81use jacquard_common::types::nsid::Nsid; 82use jacquard_common::types::recordkey::Rkey; 83use jacquard_common::types::string::{AtUri, Cid}; 84use jacquard_common::xrpc::XrpcResp; 85use jacquard_common::{DefaultStr, IntoStatic}; 86use serde::{Deserialize, Serialize}; 87use std::convert::Infallible; 88use 
std::time::Duration; 89use thiserror::Error; 90use url::form_urlencoded; 91 92use tower_http::classify::ServerErrorsFailureClass; 93use tower_http::trace::{DefaultMakeSpan, OnFailure, OnResponse, TraceLayer}; 94use tracing::{Level, Span}; 95 96mod backpressure; 97mod filter; 98 99pub use backpressure::{ 100 HeavyLimiter, HeavyPermit, MaxInFlight, PerRequestAnonBytes, PressureVerdict, ReservedFloor, 101}; 102use filter::{IssueFilter, ListFilter, NoFilter, PullFilter}; 103 104const DEFAULT_LIMIT: u32 = 50; 105const FETCH_CONCURRENCY: usize = 8; 106 107#[derive(Clone)] 108pub struct AppState { 109 pub records: Arc<dyn RecordStore>, 110 pub slingshot: SlingshotClient, 111 pub edges: Arc<EdgeStore>, 112 pub issue_states: Arc<StateIndex<IssueStateKind>>, 113 pub pull_statuses: Arc<StateIndex<PullStatusKind>>, 114 pub coverage: Arc<CoverageWatch>, 115 pub knots: Arc<KnotProxy>, 116 pub search: Arc<dyn SearchReader>, 117 pub resolver: Arc<RepoIdResolver>, 118 pub limiter: Option<Arc<HeavyLimiter>>, 119} 120 121impl AppState { 122 #[allow(clippy::too_many_arguments)] 123 pub fn new( 124 records: Arc<dyn RecordStore>, 125 slingshot: SlingshotClient, 126 edges: Arc<EdgeStore>, 127 issue_states: Arc<StateIndex<IssueStateKind>>, 128 pull_statuses: Arc<StateIndex<PullStatusKind>>, 129 coverage: Arc<CoverageWatch>, 130 knots: Arc<KnotProxy>, 131 search: Arc<dyn SearchReader>, 132 resolver: Arc<RepoIdResolver>, 133 ) -> Self { 134 Self { 135 records, 136 slingshot, 137 edges, 138 issue_states, 139 pull_statuses, 140 coverage, 141 knots, 142 search, 143 resolver, 144 limiter: None, 145 } 146 } 147 148 pub fn with_limiter(mut self, limiter: Option<Arc<HeavyLimiter>>) -> Self { 149 self.limiter = limiter; 150 self 151 } 152 153 fn heavy_permit(&self) -> Result<Option<HeavyPermit>, XrpcError> { 154 self.limiter.as_ref().map(|l| l.try_enter()).transpose() 155 } 156} 157 158pub fn router(state: AppState) -> Router { 159 Router::new() 160 .route("/xrpc/sh.tangled.repo.getRepo", 
get(get_repo)) 161 .route("/xrpc/sh.tangled.repo.getRepos", get(get_repos)) 162 .route( 163 "/xrpc/sh.tangled.repo.getRepoByRepoDid", 164 get(get_repo_by_repo_did), 165 ) 166 .route("/xrpc/sh.tangled.actor.getProfile", get(get_profile)) 167 .route("/xrpc/sh.tangled.actor.getProfiles", get(get_profiles)) 168 .route("/xrpc/sh.tangled.repo.getIssue", get(get_issue)) 169 .route("/xrpc/sh.tangled.repo.getIssues", get(get_issues)) 170 .route("/xrpc/sh.tangled.repo.getPull", get(get_pull)) 171 .route("/xrpc/sh.tangled.repo.getPulls", get(get_pulls)) 172 .route("/xrpc/sh.tangled.feed.listStars", get(list_stars)) 173 .route("/xrpc/sh.tangled.feed.countStars", get(count_stars)) 174 .route("/xrpc/sh.tangled.graph.listFollows", get(list_follows)) 175 .route("/xrpc/sh.tangled.graph.countFollows", get(count_follows)) 176 .route("/xrpc/sh.tangled.repo.listIssues", get(list_issues)) 177 .route("/xrpc/sh.tangled.repo.countIssues", get(count_issues)) 178 .route("/xrpc/sh.tangled.repo.listPulls", get(list_pulls)) 179 .route("/xrpc/sh.tangled.repo.countPulls", get(count_pulls)) 180 .route( 181 "/xrpc/sh.tangled.feed.listComments", 182 get(list_feed_comments), 183 ) 184 .route( 185 "/xrpc/sh.tangled.feed.countComments", 186 get(count_feed_comments), 187 ) 188 .route("/xrpc/sh.tangled.feed.listReactions", get(list_reactions)) 189 .route("/xrpc/sh.tangled.feed.countReactions", get(count_reactions)) 190 .route("/xrpc/sh.tangled.git.listRefUpdates", get(list_ref_updates)) 191 .route( 192 "/xrpc/sh.tangled.git.countRefUpdates", 193 get(count_ref_updates), 194 ) 195 .route( 196 "/xrpc/sh.tangled.repo.listCollaborators", 197 get(list_collaborators), 198 ) 199 .route( 200 "/xrpc/sh.tangled.repo.countCollaborators", 201 get(count_collaborators), 202 ) 203 .route( 204 "/xrpc/sh.tangled.repo.issue.listStates", 205 get(list_issue_states), 206 ) 207 .route( 208 "/xrpc/sh.tangled.repo.issue.countStates", 209 get(count_issue_states), 210 ) 211 .route( 212 "/xrpc/sh.tangled.repo.pull.listStatuses", 
213 get(list_pull_statuses), 214 ) 215 .route( 216 "/xrpc/sh.tangled.repo.pull.countStatuses", 217 get(count_pull_statuses), 218 ) 219 .route("/xrpc/sh.tangled.repo.listRepos", get(list_repos)) 220 .route("/xrpc/sh.tangled.repo.countRepos", get(count_repos)) 221 .route("/xrpc/sh.tangled.knot.listKnots", get(list_knots)) 222 .route("/xrpc/sh.tangled.knot.countKnots", get(count_knots)) 223 .route("/xrpc/sh.tangled.spindle.listSpindles", get(list_spindles)) 224 .route( 225 "/xrpc/sh.tangled.spindle.countSpindles", 226 get(count_spindles), 227 ) 228 .route("/xrpc/sh.tangled.publicKey.listKeys", get(list_public_keys)) 229 .route( 230 "/xrpc/sh.tangled.publicKey.countKeys", 231 get(count_public_keys), 232 ) 233 .route("/xrpc/sh.tangled.graph.listVouches", get(list_vouches)) 234 .route("/xrpc/sh.tangled.graph.countVouches", get(count_vouches)) 235 .route("/xrpc/sh.tangled.feed.listStarsBy", get(list_stars_by)) 236 .route("/xrpc/sh.tangled.feed.countStarsBy", get(count_stars_by)) 237 .route( 238 "/xrpc/sh.tangled.feed.listReactionsBy", 239 get(list_reactions_by), 240 ) 241 .route( 242 "/xrpc/sh.tangled.feed.countReactionsBy", 243 get(count_reactions_by), 244 ) 245 .route("/xrpc/sh.tangled.graph.listFollowsBy", get(list_follows_by)) 246 .route( 247 "/xrpc/sh.tangled.graph.countFollowsBy", 248 get(count_follows_by), 249 ) 250 .route("/xrpc/sh.tangled.graph.listVouchesBy", get(list_vouches_by)) 251 .route( 252 "/xrpc/sh.tangled.graph.countVouchesBy", 253 get(count_vouches_by), 254 ) 255 .route( 256 "/xrpc/sh.tangled.git.listRefUpdatesBy", 257 get(list_ref_updates_by), 258 ) 259 .route( 260 "/xrpc/sh.tangled.git.countRefUpdatesBy", 261 get(count_ref_updates_by), 262 ) 263 .route( 264 "/xrpc/sh.tangled.knot.listMembersBy", 265 get(list_knot_members_by), 266 ) 267 .route( 268 "/xrpc/sh.tangled.knot.countMembersBy", 269 get(count_knot_members_by), 270 ) 271 .route("/xrpc/sh.tangled.label.listOpsBy", get(list_label_ops_by)) 272 .route("/xrpc/sh.tangled.label.countOpsBy", 
get(count_label_ops_by)) 273 .route( 274 "/xrpc/sh.tangled.pipeline.listPipelinesBy", 275 get(list_pipelines_by), 276 ) 277 .route( 278 "/xrpc/sh.tangled.pipeline.countPipelinesBy", 279 get(count_pipelines_by), 280 ) 281 .route( 282 "/xrpc/sh.tangled.pipeline.listStatusesBy", 283 get(list_pipeline_statuses_by), 284 ) 285 .route( 286 "/xrpc/sh.tangled.pipeline.countStatusesBy", 287 get(count_pipeline_statuses_by), 288 ) 289 .route( 290 "/xrpc/sh.tangled.repo.listArtifactsBy", 291 get(list_artifacts_by), 292 ) 293 .route( 294 "/xrpc/sh.tangled.repo.countArtifactsBy", 295 get(count_artifacts_by), 296 ) 297 .route( 298 "/xrpc/sh.tangled.repo.listCollaboratorsBy", 299 get(list_collaborators_by), 300 ) 301 .route( 302 "/xrpc/sh.tangled.repo.countCollaboratorsBy", 303 get(count_collaborators_by), 304 ) 305 .route("/xrpc/sh.tangled.repo.listIssuesBy", get(list_issues_by)) 306 .route("/xrpc/sh.tangled.repo.countIssuesBy", get(count_issues_by)) 307 .route( 308 "/xrpc/sh.tangled.feed.listCommentsBy", 309 get(list_feed_comments_by), 310 ) 311 .route( 312 "/xrpc/sh.tangled.feed.countCommentsBy", 313 get(count_feed_comments_by), 314 ) 315 .route( 316 "/xrpc/sh.tangled.repo.issue.listStatesBy", 317 get(list_issue_states_by), 318 ) 319 .route( 320 "/xrpc/sh.tangled.repo.issue.countStatesBy", 321 get(count_issue_states_by), 322 ) 323 .route("/xrpc/sh.tangled.repo.listPullsBy", get(list_pulls_by)) 324 .route("/xrpc/sh.tangled.repo.countPullsBy", get(count_pulls_by)) 325 .route( 326 "/xrpc/sh.tangled.repo.pull.listStatusesBy", 327 get(list_pull_statuses_by), 328 ) 329 .route( 330 "/xrpc/sh.tangled.repo.pull.countStatusesBy", 331 get(count_pull_statuses_by), 332 ) 333 .route( 334 "/xrpc/sh.tangled.spindle.listMembersBy", 335 get(list_spindle_members_by), 336 ) 337 .route( 338 "/xrpc/sh.tangled.spindle.countMembersBy", 339 get(count_spindle_members_by), 340 ) 341 .route( 342 "/xrpc/sh.tangled.label.listDefinitions", 343 get(list_label_definitions), 344 ) 345 .route( 346 
"/xrpc/sh.tangled.label.countDefinitions", 347 get(count_label_definitions), 348 ) 349 .route("/xrpc/sh.tangled.label.listOps", get(list_label_ops)) 350 .route("/xrpc/sh.tangled.label.countOps", get(count_label_ops)) 351 .route( 352 "/xrpc/sh.tangled.pipeline.listPipelines", 353 get(list_pipelines), 354 ) 355 .route( 356 "/xrpc/sh.tangled.pipeline.countPipelines", 357 get(count_pipelines), 358 ) 359 .route( 360 "/xrpc/sh.tangled.pipeline.listStatuses", 361 get(list_pipeline_statuses), 362 ) 363 .route( 364 "/xrpc/sh.tangled.pipeline.countStatuses", 365 get(count_pipeline_statuses), 366 ) 367 .route("/xrpc/sh.tangled.repo.listArtifacts", get(list_artifacts)) 368 .route("/xrpc/sh.tangled.repo.countArtifacts", get(count_artifacts)) 369 .route("/xrpc/sh.tangled.knot.listMembers", get(list_knot_members)) 370 .route( 371 "/xrpc/sh.tangled.knot.countMembers", 372 get(count_knot_members), 373 ) 374 .route( 375 "/xrpc/sh.tangled.spindle.listMembers", 376 get(list_spindle_members), 377 ) 378 .route( 379 "/xrpc/sh.tangled.spindle.countMembers", 380 get(count_spindle_members), 381 ) 382 .route("/xrpc/sh.tangled.string.listStrings", get(list_strings)) 383 .route("/xrpc/sh.tangled.string.countStrings", get(count_strings)) 384 .route("/xrpc/sh.tangled.search.query", get(search_query)) 385 .route("/xrpc/sh.tangled.bobbin.getCoverage", get(get_coverage)) 386 .route( 387 "/xrpc/com.bad-example.identity.resolveMiniDoc", 388 get(resolve_mini_doc), 389 ) 390 .merge(knot_proxied_routes()) 391 .layer( 392 TraceLayer::new_for_http() 393 .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) 394 .on_request(()) 395 .on_response(LatencyFreeTrace) 396 .on_failure(LatencyFreeTrace), 397 ) 398 .with_state(state) 399} 400 401#[derive(Clone, Copy, Debug)] 402struct LatencyFreeTrace; 403 404impl<B> OnResponse<B> for LatencyFreeTrace { 405 fn on_response(self, response: &Response<B>, _latency: Duration, _span: &Span) { 406 tracing::event!( 407 target: "tower_http::trace::on_response", 408 
Level::INFO, 409 status = response.status().as_u16(), 410 "request completed", 411 ); 412 } 413} 414 415impl OnFailure<ServerErrorsFailureClass> for LatencyFreeTrace { 416 fn on_failure(&mut self, error: ServerErrorsFailureClass, _latency: Duration, _span: &Span) { 417 tracing::event!( 418 target: "tower_http::trace::on_failure", 419 Level::WARN, 420 error = %error, 421 "request failed", 422 ); 423 } 424} 425 426const REPO_PROXIED_NSIDS: &[&str] = &[ 427 "sh.tangled.repo.archive", 428 "sh.tangled.repo.blob", 429 "sh.tangled.repo.branch", 430 "sh.tangled.repo.branches", 431 "sh.tangled.repo.compare", 432 "sh.tangled.repo.describeRepo", 433 "sh.tangled.repo.diff", 434 "sh.tangled.repo.getDefaultBranch", 435 "sh.tangled.repo.languages", 436 "sh.tangled.repo.listSecrets", 437 "sh.tangled.repo.log", 438 "sh.tangled.repo.tag", 439 "sh.tangled.repo.tags", 440 "sh.tangled.repo.tree", 441]; 442 443const KNOT_PROXIED_NSIDS: &[&str] = &[ 444 "sh.tangled.owner", 445 "sh.tangled.knot.version", 446 "sh.tangled.knot.listKeys", 447]; 448 449const PASSTHROUGH_HEADERS: &[&HeaderName] = &[ 450 &CONTENT_TYPE, 451 &CONTENT_LENGTH, 452 &CONTENT_ENCODING, 453 &ETAG, 454 &CACHE_CONTROL, 455 &LAST_MODIFIED, 456 &CONTENT_DISPOSITION, 457 &ACCEPT_RANGES, 458 &CONTENT_RANGE, 459]; 460 461const FORWARDED_REQUEST_HEADERS: &[&HeaderName] = 462 &[&RANGE, &IF_RANGE, &IF_NONE_MATCH, &IF_MODIFIED_SINCE]; 463 464const KNOT_HOST_PARAM: &str = "knot"; 465const REPO_PARAM: &str = "repo"; 466 467type ProxyParams = Vec<(String, String)>; 468 469fn knot_proxied_routes() -> Router<AppState> { 470 let with_repo = register_proxied(Router::new(), REPO_PROXIED_NSIDS, proxy_repo_handler); 471 register_proxied(with_repo, KNOT_PROXIED_NSIDS, proxy_knot_handler) 472} 473 474fn register_proxied<H, Fut>( 475 router: Router<AppState>, 476 nsids: &[&'static str], 477 handler: H, 478) -> Router<AppState> 479where 480 H: Fn(AppState, HeaderMap, ProxyParams, Nsid<DefaultStr>) -> Fut 481 + Clone 482 + Send 483 + Sync 484 + 
'static, 485 Fut: Future<Output = Result<Response, XrpcError>> + Send + 'static, 486{ 487 nsids.iter().fold(router, |router, &nsid_lit| { 488 let handler = handler.clone(); 489 let nsid = nsid_static(nsid_lit); 490 router.route( 491 &format!("/xrpc/{nsid_lit}"), 492 get( 493 move |State(state): State<AppState>, 494 headers: HeaderMap, 495 Query(params): Query<ProxyParams>| { 496 handler(state, headers, params, nsid.clone()) 497 }, 498 ), 499 ) 500 }) 501} 502 503#[derive(Clone, Debug)] 504pub enum SubjectQuery { 505 Did(Did<DefaultStr>), 506 Uri(AtUri<DefaultStr>), 507} 508 509impl<'de> Deserialize<'de> for SubjectQuery { 510 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 511 where 512 D: serde::Deserializer<'de>, 513 { 514 let raw = String::deserialize(deserializer)?; 515 if let Ok(did) = Did::<DefaultStr>::new_owned(&raw) { 516 return Ok(Self::Did(did)); 517 } 518 AtUri::<DefaultStr>::new_owned(&raw) 519 .map(Self::Uri) 520 .map_err(serde::de::Error::custom) 521 } 522} 523 524#[derive(Clone, Debug, Eq, PartialEq)] 525pub struct ExpectedNsid { 526 canon: Nsid<DefaultStr>, 527 aliases: &'static [&'static str], 528} 529 530const FEED_COMMENT_LEGACY_ALIASES: &[&str] = &[ 531 "sh.tangled.repo.issue.comment", 532 "sh.tangled.repo.pull.comment", 533]; 534 535fn aliases_for(nsid: &str) -> &'static [&'static str] { 536 match nsid { 537 "sh.tangled.feed.comment" => FEED_COMMENT_LEGACY_ALIASES, 538 _ => &[], 539 } 540} 541 542impl ExpectedNsid { 543 pub fn new(nsid: Nsid<DefaultStr>) -> Self { 544 let aliases = aliases_for(nsid.as_ref()); 545 Self { 546 canon: nsid, 547 aliases, 548 } 549 } 550 551 pub fn from_static(s: &'static str) -> Self { 552 let canon = nsid_static(s); 553 let aliases = aliases_for(s); 554 Self { canon, aliases } 555 } 556 557 pub fn as_nsid(&self) -> &Nsid<DefaultStr> { 558 &self.canon 559 } 560 561 pub fn as_str(&self) -> &str { 562 self.canon.as_ref() 563 } 564 565 fn accepts(&self, other: &str) -> bool { 566 other == 
self.canon.as_ref() || self.aliases.contains(&other) 567 } 568} 569 570#[derive(Debug, Deserialize)] 571struct GetRepoQuery { 572 repo: AtUri<DefaultStr>, 573} 574 575#[derive(Debug, Deserialize)] 576struct GetRepoByRepoDidQuery { 577 #[serde(rename = "repoDid")] 578 repo_did: Did<DefaultStr>, 579} 580 581#[derive(Debug, Deserialize)] 582struct GetProfileQuery { 583 actor: AtUri<DefaultStr>, 584} 585 586#[derive(Debug, Deserialize)] 587struct GetIssueQuery { 588 issue: AtUri<DefaultStr>, 589} 590 591#[derive(Debug, Deserialize)] 592struct GetPullQuery { 593 pull: AtUri<DefaultStr>, 594} 595 596#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)] 597#[serde(rename_all = "lowercase")] 598enum Order { 599 Asc, 600 #[default] 601 Desc, 602} 603 604impl From<Order> for SortDir { 605 fn from(o: Order) -> Self { 606 match o { 607 Order::Asc => SortDir::Asc, 608 Order::Desc => SortDir::Desc, 609 } 610 } 611} 612 613#[derive(Debug, Deserialize)] 614struct TypedListQuery<F> { 615 subject: SubjectQuery, 616 cursor: Option<String>, 617 limit: Option<u32>, 618 #[serde(default)] 619 order: Order, 620 #[serde(flatten)] 621 filter: F, 622} 623 624impl<F> TypedListQuery<F> { 625 fn dir(&self) -> SortDir { 626 self.order.into() 627 } 628} 629 630#[derive(Debug, Deserialize)] 631struct CountQuery { 632 subject: SubjectQuery, 633} 634 635#[derive(Debug, Deserialize)] 636struct SearchQueryParams { 637 q: String, 638 nsid: Option<Nsid<DefaultStr>>, 639 author: Option<Did<DefaultStr>>, 640 repo: Option<Did<DefaultStr>>, 641 since: Option<String>, 642 until: Option<String>, 643 cursor: Option<String>, 644 limit: Option<u32>, 645} 646 647pub struct XrpcQuery<T>(pub T); 648 649impl<S, T> FromRequestParts<S> for XrpcQuery<T> 650where 651 S: Send + Sync, 652 T: serde::de::DeserializeOwned + Send + 'static, 653{ 654 type Rejection = XrpcError; 655 656 async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> { 657 
Query::<T>::from_request_parts(parts, state) 658 .await 659 .map(|Query(t)| Self(t)) 660 .map_err(|rej: QueryRejection| XrpcError::InvalidParams(rej.body_text())) 661 } 662} 663 664#[derive(Debug, Error)] 665pub enum XrpcError { 666 #[error("invalid request: {0}")] 667 InvalidParams(String), 668 #[error("record not found")] 669 NotFound, 670 #[error("upstream unavailable: {0}")] 671 UpstreamUnavailable(String), 672 #[error("upstream gone: {0}")] 673 UpstreamGone(String), 674 #[error("invalid record: {0}")] 675 InvalidRecord(String), 676 #[error("internal: {0}")] 677 Internal(String), 678 #[error("overloaded, shedding under memory pressure")] 679 Overloaded, 680} 681 682impl XrpcError { 683 pub fn overloaded() -> Self { 684 Self::Overloaded 685 } 686} 687 688#[derive(Serialize)] 689struct ErrorBody { 690 error: &'static str, 691 message: String, 692} 693 694impl IntoResponse for XrpcError { 695 fn into_response(self) -> Response { 696 let (status, error) = match &self { 697 Self::InvalidParams(_) => (StatusCode::BAD_REQUEST, "InvalidRequest"), 698 Self::NotFound => (StatusCode::NOT_FOUND, "RecordNotFound"), 699 Self::UpstreamUnavailable(_) => (StatusCode::BAD_GATEWAY, "UpstreamFailed"), 700 Self::UpstreamGone(_) => (StatusCode::BAD_GATEWAY, "UpstreamGone"), 701 Self::InvalidRecord(_) => (StatusCode::BAD_GATEWAY, "InvalidRecord"), 702 Self::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "InternalError"), 703 Self::Overloaded => (StatusCode::SERVICE_UNAVAILABLE, "Overloaded"), 704 }; 705 let body = ErrorBody { 706 error, 707 message: self.to_string(), 708 }; 709 (status, Json(body)).into_response() 710 } 711} 712 713#[derive(Serialize)] 714#[serde(rename_all = "camelCase")] 715struct CoverageEnvelope { 716 ready: bool, 717 events_processed: u64, 718 last_cursor: u64, 719} 720 721impl From<Coverage> for CoverageEnvelope { 722 fn from(c: Coverage) -> Self { 723 Self { 724 ready: c.is_ready(), 725 events_processed: c.events_processed(), 726 last_cursor: 
c.last_cursor().raw(), 727 } 728 } 729} 730 731#[derive(Serialize)] 732#[serde(rename_all = "camelCase")] 733struct RecordView<V> { 734 uri: AtUri<DefaultStr>, 735 cid: Option<Cid<DefaultStr>>, 736 value: V, 737} 738 739#[derive(Serialize)] 740#[serde(rename_all = "camelCase")] 741struct StatefulItem<V> { 742 #[serde(flatten)] 743 view: RecordView<V>, 744 state: &'static str, 745 #[serde(skip_serializing_if = "Option::is_none")] 746 state_updated_at: Option<String>, 747 comment_count: u64, 748} 749 750fn format_micros(micros: u64) -> String { 751 let signed = i64::try_from(micros).ok(); 752 let rfc = signed 753 .and_then(chrono::DateTime::<chrono::Utc>::from_timestamp_micros) 754 .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)); 755 rfc.unwrap_or_else(|| micros.to_string()) 756} 757 758pub(crate) fn source_authority_did(source: &AtUri<DefaultStr>) -> Option<Did<DefaultStr>> { 759 match source.authority() { 760 AtIdentifier::Did(d) => Some(d.clone().into_static()), 761 AtIdentifier::Handle(_) => None, 762 } 763} 764 765fn enrich_issue_view( 766 state: &AppState, 767 view: RecordView<Issue<DefaultStr>>, 768) -> StatefulItem<Issue<DefaultStr>> { 769 let issue_author = source_authority_did(&view.uri); 770 let repo_did = view.value.repo.clone(); 771 enrich_view( 772 &state.edges, 773 nsid_static("sh.tangled.feed.comment"), 774 &state.issue_states, 775 view, 776 move |src| accept_state_source(src, issue_author.as_ref(), &repo_did), 777 ) 778} 779 780fn enrich_pull_view( 781 state: &AppState, 782 view: RecordView<Pull<DefaultStr>>, 783) -> StatefulItem<Pull<DefaultStr>> { 784 let pull_author = source_authority_did(&view.uri); 785 let target_repo = view.value.target.repo.clone(); 786 enrich_view( 787 &state.edges, 788 nsid_static("sh.tangled.feed.comment"), 789 &state.pull_statuses, 790 view, 791 move |src| accept_state_source(src, pull_author.as_ref(), &target_repo), 792 ) 793} 794 795pub(crate) fn accept_state_source( 796 source: &AtUri<DefaultStr>, 797 
entity_author: Option<&Did<DefaultStr>>, 798 repo_owner: &Did<DefaultStr>, 799) -> bool { 800 let Some(src) = source_authority_did(source) else { 801 return false; 802 }; 803 Some(&src) == entity_author || &src == repo_owner 804} 805 806fn enrich_view<V, K, F>( 807 edges: &EdgeStore, 808 comment_nsid: Nsid<DefaultStr>, 809 states: &StateIndex<K>, 810 view: RecordView<V>, 811 accept: F, 812) -> StatefulItem<V> 813where 814 K: StateKind + Default, 815 F: Fn(&AtUri<DefaultStr>) -> bool, 816{ 817 let comment_count = edges.count(&EdgeKey::new( 818 comment_nsid, 819 SubjectRef::Uri(view.uri.clone()), 820 )); 821 let (state, state_updated_at) = states 822 .latest_by(&view.uri, accept) 823 .map_or((K::default().wire(), None), |(kind, micros)| { 824 (kind.wire(), Some(format_micros(micros))) 825 }); 826 StatefulItem { 827 view, 828 state, 829 state_updated_at, 830 comment_count, 831 } 832} 833 834#[derive(Serialize)] 835#[serde(rename_all = "camelCase")] 836struct CountResponse { 837 count: u64, 838 distinct_authors: u64, 839} 840 841#[derive(Serialize)] 842#[serde(rename_all = "camelCase")] 843struct SearchHitView { 844 uri: AtUri<DefaultStr>, 845 cid: Option<Cid<DefaultStr>>, 846 nsid: Nsid<DefaultStr>, 847 score: f32, 848 value: SearchableRecord, 849} 850 851fn map_slingshot(err: SlingshotError) -> XrpcError { 852 use SlingshotError as E; 853 match err { 854 E::NotFound => XrpcError::NotFound, 855 e @ (E::Decode(_) 856 | E::MissingField(_) 857 | E::InvalidAtUri(_) 858 | E::InvalidCid(_) 859 | E::UriMismatch { .. }) => XrpcError::InvalidRecord(e.to_string()), 860 e @ (E::Network(_) 861 | E::Build(_) 862 | E::Upstream(_) 863 | E::BodyTooLarge { .. 
} 864 | E::BadScheme(_)) => XrpcError::UpstreamUnavailable(e.to_string()), 865 } 866} 867 868fn parse_uri(raw: &str) -> Result<AtUri<DefaultStr>, XrpcError> { 869 AtUri::<DefaultStr>::new_owned(raw).map_err(|e| XrpcError::InvalidParams(format!("uri: {e}"))) 870} 871 872#[derive(Clone, Copy, Debug, Eq, PartialEq)] 873pub enum SubjectShape { 874 BareDid, 875 Collection(&'static str), 876 BareDidOrOneOfCollections(&'static [&'static str]), 877 OneOfCollections(&'static [&'static str]), 878 AnyAtUri, 879} 880 881pub trait HasSubject { 882 const SHAPE: SubjectShape; 883} 884 885pub trait MirrorOf { 886 type Record: XrpcResp; 887 const EDGE_KIND: &'static str; 888 const SHAPE: SubjectShape; 889} 890 891pub struct StarBy; 892pub struct ReactionBy; 893pub struct FollowBy; 894pub struct VouchBy; 895pub struct RefUpdateBy; 896pub struct KnotMemberBy; 897pub struct LabelOpBy; 898pub struct PipelineBy; 899pub struct PipelineStatusBy; 900pub struct ArtifactBy; 901pub struct CollaboratorBy; 902pub struct FeedCommentBy; 903pub struct IssueBy; 904pub struct IssueStateBy; 905pub struct PullBy; 906pub struct PullStatusBy; 907pub struct SpindleMemberBy; 908 909impl MirrorOf for StarBy { 910 type Record = StarRecord; 911 const EDGE_KIND: &'static str = "sh.tangled.feed.star.by"; 912 const SHAPE: SubjectShape = SubjectShape::BareDid; 913} 914impl MirrorOf for ReactionBy { 915 type Record = ReactionRecord; 916 const EDGE_KIND: &'static str = "sh.tangled.feed.reaction.by"; 917 const SHAPE: SubjectShape = SubjectShape::BareDid; 918} 919impl MirrorOf for FollowBy { 920 type Record = FollowRecord; 921 const EDGE_KIND: &'static str = "sh.tangled.graph.follow.by"; 922 const SHAPE: SubjectShape = SubjectShape::BareDid; 923} 924impl MirrorOf for VouchBy { 925 type Record = VouchRecord; 926 const EDGE_KIND: &'static str = "sh.tangled.graph.vouch.by"; 927 const SHAPE: SubjectShape = SubjectShape::BareDid; 928} 929impl MirrorOf for RefUpdateBy { 930 type Record = RefUpdateRecord; 931 const 
EDGE_KIND: &'static str = "sh.tangled.git.refUpdate.by"; 932 const SHAPE: SubjectShape = SubjectShape::BareDid; 933} 934impl MirrorOf for KnotMemberBy { 935 type Record = KnotMemberRecord; 936 const EDGE_KIND: &'static str = "sh.tangled.knot.member.by"; 937 const SHAPE: SubjectShape = SubjectShape::BareDid; 938} 939impl MirrorOf for LabelOpBy { 940 type Record = LabelOpRecord; 941 const EDGE_KIND: &'static str = "sh.tangled.label.op.by"; 942 const SHAPE: SubjectShape = SubjectShape::BareDid; 943} 944impl MirrorOf for PipelineBy { 945 type Record = PipelineRecord; 946 const EDGE_KIND: &'static str = "sh.tangled.pipeline.by"; 947 const SHAPE: SubjectShape = SubjectShape::BareDid; 948} 949impl MirrorOf for PipelineStatusBy { 950 type Record = PipelineStatusRecord; 951 const EDGE_KIND: &'static str = "sh.tangled.pipeline.status.by"; 952 const SHAPE: SubjectShape = SubjectShape::BareDid; 953} 954impl MirrorOf for ArtifactBy { 955 type Record = ArtifactRecord; 956 const EDGE_KIND: &'static str = "sh.tangled.repo.artifact.by"; 957 const SHAPE: SubjectShape = SubjectShape::BareDid; 958} 959impl MirrorOf for CollaboratorBy { 960 type Record = CollaboratorRecord; 961 const EDGE_KIND: &'static str = "sh.tangled.repo.collaborator.by"; 962 const SHAPE: SubjectShape = SubjectShape::BareDid; 963} 964impl MirrorOf for FeedCommentBy { 965 type Record = FeedCommentRecord; 966 const EDGE_KIND: &'static str = "sh.tangled.feed.comment.by"; 967 const SHAPE: SubjectShape = SubjectShape::BareDid; 968} 969impl MirrorOf for IssueBy { 970 type Record = IssueRecord; 971 const EDGE_KIND: &'static str = "sh.tangled.repo.issue.by"; 972 const SHAPE: SubjectShape = SubjectShape::BareDid; 973} 974impl MirrorOf for IssueStateBy { 975 type Record = IssueStateRecord; 976 const EDGE_KIND: &'static str = "sh.tangled.repo.issue.state.by"; 977 const SHAPE: SubjectShape = SubjectShape::BareDid; 978} 979impl MirrorOf for PullBy { 980 type Record = PullRecord; 981 const EDGE_KIND: &'static str = 
"sh.tangled.repo.pull.by"; 982 const SHAPE: SubjectShape = SubjectShape::BareDid; 983} 984impl MirrorOf for PullStatusBy { 985 type Record = PullStatusRecord; 986 const EDGE_KIND: &'static str = "sh.tangled.repo.pull.status.by"; 987 const SHAPE: SubjectShape = SubjectShape::BareDid; 988} 989impl MirrorOf for SpindleMemberBy { 990 type Record = SpindleMemberRecord; 991 const EDGE_KIND: &'static str = "sh.tangled.spindle.member.by"; 992 const SHAPE: SubjectShape = SubjectShape::BareDid; 993} 994 995impl HasSubject for StarRecord { 996 const SHAPE: SubjectShape = SubjectShape::BareDidOrOneOfCollections(&["sh.tangled.string"]); 997} 998impl HasSubject for FollowRecord { 999 const SHAPE: SubjectShape = SubjectShape::BareDid; 1000} 1001impl HasSubject for IssueRecord { 1002 const SHAPE: SubjectShape = SubjectShape::BareDid; 1003} 1004impl HasSubject for PullRecord { 1005 const SHAPE: SubjectShape = SubjectShape::BareDid; 1006} 1007impl HasSubject for FeedCommentRecord { 1008 const SHAPE: SubjectShape = SubjectShape::OneOfCollections(&[ 1009 "sh.tangled.repo.issue", 1010 "sh.tangled.repo.pull", 1011 "sh.tangled.string", 1012 ]); 1013} 1014impl HasSubject for LabelDefinitionRecord { 1015 const SHAPE: SubjectShape = SubjectShape::BareDid; 1016} 1017impl HasSubject for LabelOpRecord { 1018 const SHAPE: SubjectShape = 1019 SubjectShape::OneOfCollections(&["sh.tangled.repo.issue", "sh.tangled.repo.pull"]); 1020} 1021impl HasSubject for PipelineRecord { 1022 const SHAPE: SubjectShape = SubjectShape::BareDid; 1023} 1024impl HasSubject for PipelineStatusRecord { 1025 const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.pipeline"); 1026} 1027impl HasSubject for ArtifactRecord { 1028 const SHAPE: SubjectShape = SubjectShape::BareDid; 1029} 1030impl HasSubject for KnotMemberRecord { 1031 const SHAPE: SubjectShape = SubjectShape::BareDid; 1032} 1033impl HasSubject for SpindleMemberRecord { 1034 const SHAPE: SubjectShape = SubjectShape::BareDid; 1035} 1036impl HasSubject 
for TangledStringRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
// Reactions are the only record kind here whose subject may be any at-uri.
impl HasSubject for ReactionRecord {
    const SHAPE: SubjectShape = SubjectShape::AnyAtUri;
}
impl HasSubject for RefUpdateRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for CollaboratorRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
// State/status records must point at a record in one specific collection.
impl HasSubject for IssueStateRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.issue");
}
impl HasSubject for PullStatusRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.pull");
}
impl HasSubject for KnotRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for SpindleRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PublicKeyRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for RepoRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for VouchRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

/// Validates a caller-supplied subject against the `shape` a record kind accepts.
///
/// A bare DID is accepted only by `BareDid` and `BareDidOrOneOfCollections`.
/// An at-uri must have a DID authority and a collection; the collection is then
/// checked against the shape, and every shape except `AnyAtUri` also requires
/// an rkey.
///
/// # Errors
///
/// Returns `XrpcError::InvalidParams` describing the expected form whenever the
/// subject does not fit `shape`.
fn parse_subject(raw: &SubjectQuery, shape: SubjectShape) -> Result<SubjectRef, XrpcError> {
    let uri = match raw {
        SubjectQuery::Uri(uri) => uri,
        SubjectQuery::Did(did) => {
            // Bare DID: either accept it outright or explain which at-uri form was expected.
            let rejection: String = match shape {
                SubjectShape::BareDid | SubjectShape::BareDidOrOneOfCollections(_) => {
                    return Ok(SubjectRef::Did(did.clone()));
                }
                SubjectShape::Collection(expected) => {
                    format!("subject must be at://<did>/{expected}/<rkey>, got bare did")
                }
                SubjectShape::OneOfCollections(allowed) => format!(
                    "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got bare did",
                    allowed.join(", "),
                ),
                SubjectShape::AnyAtUri => "subject must be at-uri form, got bare did".into(),
            };
            return Err(XrpcError::InvalidParams(rejection));
        }
    };

    if matches!(uri.authority(), AtIdentifier::Handle(_)) {
        return Err(XrpcError::InvalidParams(
            "subject authority must be a did, not a handle".into(),
        ));
    }
    let Some(collection) = uri.collection() else {
        return Err(XrpcError::InvalidParams(
            "subject must be a bare did or full at://<did>/<nsid>/<rkey>".into(),
        ));
    };
    let c: &str = collection.as_ref();

    // Each arm either returns the accepted subject or yields the rejection message.
    let rejection = match shape {
        SubjectShape::AnyAtUri => return Ok(SubjectRef::Uri(uri.clone())),
        SubjectShape::BareDid => {
            format!("subject must be a bare did, got at-uri with collection {c}")
        }
        SubjectShape::Collection(expected) => {
            if c == expected {
                require_rkey(uri, expected)?;
                return Ok(SubjectRef::Uri(uri.clone()));
            }
            format!("subject must be at://<did>/{expected}/<rkey>, got collection {c}")
        }
        SubjectShape::OneOfCollections(allowed) => {
            if allowed.contains(&c) {
                require_rkey(uri, c)?;
                return Ok(SubjectRef::Uri(uri.clone()));
            }
            format!(
                "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
                allowed.join(", "),
            )
        }
        SubjectShape::BareDidOrOneOfCollections(allowed) => {
            if allowed.contains(&c) {
                require_rkey(uri, c)?;
                return Ok(SubjectRef::Uri(uri.clone()));
            }
            format!(
                "subject must be a bare did or at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
                allowed.join(", "),
            )
        }
    };
    Err(XrpcError::InvalidParams(rejection))
}

/// Ensures `uri` carries an rkey; `expected` is only used to word the error.
fn require_rkey(uri: &AtUri<DefaultStr>, expected: &str) -> Result<(), XrpcError> {
    if uri.rkey().is_some() {
        return Ok(());
    }
    Err(XrpcError::InvalidParams(format!(
        "subject must be at://<did>/{expected}/<rkey>; missing rkey"
    )))
}

1141fn parse_cursor(raw: Option<&str>) -> Result<PageCursor, XrpcError> { 1142 PageCursor::from_token(raw) 1143 .map_err(|e: CursorParseError| XrpcError::InvalidParams(format!("cursor: {e}"))) 1144} 1145 1146fn parse_limit(raw: Option<u32>) -> Result<PageLimit, XrpcError> { 1147 PageLimit::new(raw.unwrap_or(DEFAULT_LIMIT)) 1148 .map_err(|e| XrpcError::InvalidParams(format!("limit: {e}"))) 1149} 1150 1151pub(crate) fn at_uri_owned_by(uri: &AtUri<DefaultStr>, author: &Did<DefaultStr>) -> bool { 1152 match uri.authority() { 1153 AtIdentifier::Did(d) => d.as_ref() == author.as_ref(), 1154 AtIdentifier::Handle(_) => false, 1155 } 1156} 1157 1158async fn resolve_for_view( 1159 state: &AppState, 1160 expected_nsid: &Nsid<DefaultStr>, 1161 uri: AtUri<DefaultStr>, 1162) -> Result<Arc<RecordBody>, XrpcError> { 1163 let raw = uri.as_ref().to_owned(); 1164 resolve(state, ExpectedNsid::new(expected_nsid.clone()), uri) 1165 .await 1166 .map(|(body, _did)| body) 1167 .map_err(|e| match e { 1168 XrpcError::NotFound => XrpcError::UpstreamGone(raw), 1169 other => other, 1170 }) 1171} 1172 1173async fn resolve( 1174 state: &AppState, 1175 expected: ExpectedNsid, 1176 uri: AtUri<DefaultStr>, 1177) -> Result<(Arc<RecordBody>, Did<DefaultStr>), XrpcError> { 1178 let collection = uri 1179 .collection() 1180 .ok_or_else(|| XrpcError::InvalidParams("uri missing collection".into()))?; 1181 if !expected.accepts(collection.as_ref()) { 1182 return Err(XrpcError::InvalidParams(format!( 1183 "collection mismatch: expected {}, got {}", 1184 expected.as_str(), 1185 collection.as_ref() 1186 ))); 1187 } 1188 let rkey = uri 1189 .rkey() 1190 .ok_or_else(|| XrpcError::InvalidParams("uri missing rkey".into()))?; 1191 let did_ref = match uri.authority() { 1192 AtIdentifier::Did(d) => d, 1193 AtIdentifier::Handle(_) => { 1194 return Err(XrpcError::InvalidParams( 1195 "uri authority must be a did, not a handle".into(), 1196 )); 1197 } 1198 }; 1199 let did: Did<DefaultStr> = did_ref.clone().into_static(); 
1200 1201 if let Some(hit) = state.records.get(&uri) { 1202 return Ok((hit, did)); 1203 } 1204 let body = state 1205 .slingshot 1206 .get_record(&did_ref, &collection, &rkey) 1207 .await 1208 .map_err(map_slingshot)?; 1209 verify_type_tag(&body, &expected)?; 1210 state.records.put(uri, body.clone()); 1211 Ok((body, did)) 1212} 1213 1214#[derive(Deserialize)] 1215struct TypeTag<'a> { 1216 #[serde(rename = "$type", borrow)] 1217 ty: &'a str, 1218} 1219 1220fn verify_type_tag(body: &RecordBody, expected: &ExpectedNsid) -> Result<(), XrpcError> { 1221 let bytes = body.value.as_ref(); 1222 let ty: std::borrow::Cow<'_, str> = match serde_json::from_slice::<TypeTag>(bytes) { 1223 Ok(t) => std::borrow::Cow::Borrowed(t.ty), 1224 Err(_) => { 1225 let value: serde_json::Value = serde_json::from_slice(bytes) 1226 .map_err(|e| XrpcError::InvalidRecord(format!("$type peek: {e}")))?; 1227 value 1228 .as_object() 1229 .and_then(|m| m.get("$type")) 1230 .and_then(|v| v.as_str()) 1231 .map(|s| std::borrow::Cow::Owned(s.to_owned())) 1232 .ok_or_else(|| XrpcError::InvalidRecord("$type peek: missing $type field".into()))? 
1233 } 1234 }; 1235 if !expected.accepts(ty.as_ref()) { 1236 return Err(XrpcError::InvalidRecord(format!( 1237 "$type mismatch: expected {}, got {}", 1238 expected.as_str(), 1239 ty 1240 ))); 1241 } 1242 Ok(()) 1243} 1244 1245fn wire_type_nsid(bytes: &[u8]) -> Option<Nsid<DefaultStr>> { 1246 let ty = serde_json::from_slice::<TypeTag>(bytes).ok()?.ty; 1247 Nsid::<DefaultStr>::new_owned(ty).ok() 1248} 1249 1250async fn deserialize_or_upgrade<V>( 1251 state: &AppState, 1252 nsid: &Nsid<DefaultStr>, 1253 bytes: &[u8], 1254) -> Result<V, XrpcError> 1255where 1256 V: serde::de::DeserializeOwned, 1257{ 1258 match serde_json::from_slice::<V>(bytes) { 1259 Ok(v) => Ok(v), 1260 Err(canon_err) => { 1261 let normalized = normalize_record_fields(bytes); 1262 let working: &[u8] = normalized.as_deref().unwrap_or(bytes); 1263 if normalized.is_some() 1264 && let Ok(v) = serde_json::from_slice::<V>(working) 1265 { 1266 return Ok(v); 1267 } 1268 let scrubbed = scrub_record_bytes(nsid, working); 1269 let retry_bytes: &[u8] = scrubbed.as_deref().unwrap_or(working); 1270 if scrubbed.is_some() 1271 && let Ok(v) = serde_json::from_slice::<V>(retry_bytes) 1272 { 1273 return Ok(v); 1274 } 1275 let wire_nsid = wire_type_nsid(retry_bytes).unwrap_or_else(|| nsid.clone()); 1276 match upgrade_wire_bytes(&wire_nsid, retry_bytes, &state.resolver).await { 1277 Ok(canon_bytes) => serde_json::from_slice(&canon_bytes) 1278 .map_err(|e| XrpcError::InvalidRecord(e.to_string())), 1279 Err(_) => Err(XrpcError::InvalidRecord(canon_err.to_string())), 1280 } 1281 } 1282 } 1283} 1284 1285async fn fetch_from_uri<R, V>( 1286 state: &AppState, 1287 uri: AtUri<DefaultStr>, 1288) -> Result<(Arc<RecordBody>, V), XrpcError> 1289where 1290 R: XrpcResp, 1291 V: serde::de::DeserializeOwned + NormalizeRepoRefs, 1292{ 1293 let raw = uri.as_str().to_owned(); 1294 let nsid = nsid_static(R::NSID); 1295 let (body, _did) = resolve(state, ExpectedNsid::new(nsid.clone()), uri).await?; 1296 let value: V = 
deserialize_or_upgrade(state, &nsid, &body.value).await?; 1297 let value = value 1298 .normalize(&state.resolver) 1299 .await 1300 .ok_or(XrpcError::UpstreamGone(raw))?; 1301 Ok((body, value)) 1302} 1303 1304async fn fetch<R, V>( 1305 state: &AppState, 1306 uri: &AtUri<DefaultStr>, 1307) -> Result<(Arc<RecordBody>, V), XrpcError> 1308where 1309 R: XrpcResp, 1310 V: serde::de::DeserializeOwned + NormalizeRepoRefs, 1311{ 1312 fetch_from_uri::<R, V>(state, uri.clone()).await 1313} 1314 1315async fn get_repo( 1316 State(state): State<AppState>, 1317 XrpcQuery(q): XrpcQuery<GetRepoQuery>, 1318) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> { 1319 let (body, value) = fetch::<RepoRecord, Repo<DefaultStr>>(&state, &q.repo).await?; 1320 Ok(Json(RepoGetRecordOutput { 1321 cid: Some(body.cid.clone()), 1322 uri: body.uri.clone(), 1323 value, 1324 })) 1325} 1326 1327async fn get_repo_by_repo_did( 1328 State(state): State<AppState>, 1329 XrpcQuery(q): XrpcQuery<GetRepoByRepoDidQuery>, 1330) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> { 1331 let ident = state 1332 .resolver 1333 .lookup_by_repo_did(&q.repo_did) 1334 .await 1335 .ok_or(XrpcError::NotFound)?; 1336 let uri = AtUri::<DefaultStr>::from_parts_owned( 1337 ident.owner.as_str(), 1338 RepoRecord::NSID, 1339 ident.rkey.as_str(), 1340 ) 1341 .expect("Did and Rkey newtypes already validated, at-uri assembly cannot fail"); 1342 let (body, value) = fetch_from_uri::<RepoRecord, Repo<DefaultStr>>(&state, uri).await?; 1343 Ok(Json(RepoGetRecordOutput { 1344 cid: Some(body.cid.clone()), 1345 uri: body.uri.clone(), 1346 value, 1347 })) 1348} 1349 1350async fn get_profile( 1351 State(state): State<AppState>, 1352 XrpcQuery(q): XrpcQuery<GetProfileQuery>, 1353) -> Result<Json<ProfileGetRecordOutput<DefaultStr>>, XrpcError> { 1354 let (body, value) = fetch::<ProfileRecord, Profile<DefaultStr>>(&state, &q.actor).await?; 1355 Ok(Json(ProfileGetRecordOutput { 1356 cid: Some(body.cid.clone()), 1357 uri: 
body.uri.clone(), 1358 value, 1359 })) 1360} 1361 1362async fn get_issue( 1363 State(state): State<AppState>, 1364 XrpcQuery(q): XrpcQuery<GetIssueQuery>, 1365) -> Result<Json<IssueGetRecordOutput<DefaultStr>>, XrpcError> { 1366 let (body, value) = fetch::<IssueRecord, Issue<DefaultStr>>(&state, &q.issue).await?; 1367 Ok(Json(IssueGetRecordOutput { 1368 cid: Some(body.cid.clone()), 1369 uri: body.uri.clone(), 1370 value, 1371 })) 1372} 1373 1374async fn get_pull( 1375 State(state): State<AppState>, 1376 XrpcQuery(q): XrpcQuery<GetPullQuery>, 1377) -> Result<Json<PullGetRecordOutput<DefaultStr>>, XrpcError> { 1378 let (body, value) = fetch::<PullRecord, Pull<DefaultStr>>(&state, &q.pull).await?; 1379 Ok(Json(PullGetRecordOutput { 1380 cid: Some(body.cid.clone()), 1381 uri: body.uri.clone(), 1382 value, 1383 })) 1384} 1385 1386async fn get_repos( 1387 State(state): State<AppState>, 1388 RawQuery(query): RawQuery, 1389) -> Result<Response, XrpcError> { 1390 let uris = collect_repeated(query.as_deref(), BULK_REPOS_KEY); 1391 bulk_fetch::<RepoRecord, Repo<DefaultStr>>(&state, uris).await 1392} 1393 1394async fn get_profiles( 1395 State(state): State<AppState>, 1396 RawQuery(query): RawQuery, 1397) -> Result<Response, XrpcError> { 1398 let uris = collect_repeated(query.as_deref(), BULK_PROFILES_KEY); 1399 bulk_fetch::<ProfileRecord, Profile<DefaultStr>>(&state, uris).await 1400} 1401 1402async fn get_issues( 1403 State(state): State<AppState>, 1404 RawQuery(query): RawQuery, 1405) -> Result<Response, XrpcError> { 1406 let uris = collect_repeated(query.as_deref(), BULK_ISSUES_KEY); 1407 bulk_fetch::<IssueRecord, Issue<DefaultStr>>(&state, uris).await 1408} 1409 1410async fn get_pulls( 1411 State(state): State<AppState>, 1412 RawQuery(query): RawQuery, 1413) -> Result<Response, XrpcError> { 1414 let uris = collect_repeated(query.as_deref(), BULK_PULLS_KEY); 1415 bulk_fetch::<PullRecord, Pull<DefaultStr>>(&state, uris).await 1416} 1417 1418const BULK_REPOS_KEY: &str = 
"repos"; 1419const BULK_PROFILES_KEY: &str = "actors"; 1420const BULK_ISSUES_KEY: &str = "issues"; 1421const BULK_PULLS_KEY: &str = "pulls"; 1422const BULK_LIMIT: usize = 50; 1423 1424fn collect_repeated(query: Option<&str>, key: &str) -> Vec<String> { 1425 let Some(q) = query else { 1426 return Vec::new(); 1427 }; 1428 form_urlencoded::parse(q.as_bytes()) 1429 .filter_map(|(k, v)| (k == key).then(|| v.into_owned())) 1430 .collect() 1431} 1432 1433async fn hydrate_record_view<V>( 1434 state: &AppState, 1435 nsid: &Nsid<DefaultStr>, 1436 uri: AtUri<DefaultStr>, 1437) -> Result<Option<RecordView<V>>, XrpcError> 1438where 1439 V: serde::de::DeserializeOwned + NormalizeRepoRefs, 1440{ 1441 let body = resolve_for_view(state, nsid, uri).await?; 1442 let value: V = deserialize_or_upgrade::<V>(state, nsid, &body.value).await?; 1443 let Some(value) = value.normalize(&state.resolver).await else { 1444 return Ok(None); 1445 }; 1446 Ok(Some(RecordView { 1447 uri: body.uri.clone(), 1448 cid: Some(body.cid.clone()), 1449 value, 1450 })) 1451} 1452 1453#[derive(Clone, Copy)] 1454enum HitProvenance { 1455 ClientSupplied, 1456 Indexed, 1457} 1458 1459fn is_index_evictable(err: &XrpcError) -> bool { 1460 matches!( 1461 err, 1462 XrpcError::NotFound 1463 | XrpcError::UpstreamGone(_) 1464 | XrpcError::InvalidRecord(_) 1465 | XrpcError::InvalidParams(_) 1466 ) 1467} 1468 1469fn drop_unhydratable<V>( 1470 provenance: HitProvenance, 1471 nsid: &Nsid<DefaultStr>, 1472 uri: &AtUri<DefaultStr>, 1473 result: Result<Option<V>, XrpcError>, 1474) -> Result<Option<V>, XrpcError> { 1475 match result { 1476 Ok(view) => Ok(view), 1477 Err(err @ (XrpcError::NotFound | XrpcError::UpstreamGone(_))) => { 1478 tracing::debug!( 1479 uri = %uri, 1480 nsid = %nsid.as_ref(), 1481 error = %err, 1482 "dropping gone hit during hydration", 1483 ); 1484 Ok(None) 1485 } 1486 Err(err @ XrpcError::UpstreamUnavailable(_)) => { 1487 tracing::warn!( 1488 uri = %uri, 1489 nsid = %nsid.as_ref(), 1490 error = %err, 1491 
"dropping hit, upstream unavailable during hydration", 1492 ); 1493 Ok(None) 1494 } 1495 Err(err @ XrpcError::InvalidRecord(_)) => { 1496 tracing::warn!( 1497 uri = %uri, 1498 nsid = %nsid.as_ref(), 1499 error = %err, 1500 "dropping invalid hit during hydration", 1501 ); 1502 Ok(None) 1503 } 1504 Err(err @ XrpcError::InvalidParams(_)) => match provenance { 1505 HitProvenance::ClientSupplied => Err(err), 1506 HitProvenance::Indexed => { 1507 tracing::warn!( 1508 uri = %uri, 1509 nsid = %nsid.as_ref(), 1510 error = %err, 1511 "dropping malformed indexed hit during hydration", 1512 ); 1513 Ok(None) 1514 } 1515 }, 1516 Err(err @ (XrpcError::Internal(_) | XrpcError::Overloaded)) => Err(err), 1517 } 1518} 1519 1520fn hydrate_stream<T, Fut, V>( 1521 items: impl IntoIterator<Item = T>, 1522 produce: impl FnMut(T) -> Fut, 1523) -> impl Stream<Item = Result<V, XrpcError>> 1524where 1525 Fut: Future<Output = Result<Option<V>, XrpcError>>, 1526{ 1527 stream::iter(items) 1528 .map(produce) 1529 .buffered(FETCH_CONCURRENCY) 1530 .try_filter_map(|view| async move { Ok(view) }) 1531} 1532 1533fn hydrate_record_stream<V>( 1534 state: &AppState, 1535 nsid: Nsid<DefaultStr>, 1536 uris: Vec<AtUri<DefaultStr>>, 1537 provenance: HitProvenance, 1538) -> impl Stream<Item = Result<RecordView<V>, XrpcError>> + Send + 'static 1539where 1540 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static, 1541{ 1542 let owned = state.clone(); 1543 hydrate_stream(uris, move |uri| { 1544 let owned = owned.clone(); 1545 let nsid = nsid.clone(); 1546 async move { 1547 let result = hydrate_record_view::<V>(&owned, &nsid, uri.clone()).await; 1548 if matches!(provenance, HitProvenance::Indexed) 1549 && let Err(err) = &result 1550 && is_index_evictable(err) 1551 { 1552 owned.edges.remove_source(&uri); 1553 } 1554 drop_unhydratable(provenance, &nsid, &uri, result) 1555 } 1556 }) 1557} 1558 1559enum PagePhase { 1560 Head, 1561 Body { first: bool }, 1562 Done, 1563} 1564 1565struct 
PageState<S> { 1566 items: std::pin::Pin<Box<S>>, 1567 phase: PagePhase, 1568 array_key: &'static str, 1569 tail: Vec<u8>, 1570 permit: Option<HeavyPermit>, 1571} 1572 1573fn paged_tail(cursor: Option<String>) -> Vec<u8> { 1574 let encoded = serde_json::to_string(&cursor).unwrap_or_else(|_| "null".to_owned()); 1575 format!("],\"cursor\":{encoded}}}").into_bytes() 1576} 1577 1578fn unpaged_tail() -> Vec<u8> { 1579 b"]}".to_vec() 1580} 1581 1582fn json_stream<V, S>( 1583 array_key: &'static str, 1584 items: S, 1585 tail: Vec<u8>, 1586 permit: Option<HeavyPermit>, 1587) -> Response 1588where 1589 V: Serialize + Send + 'static, 1590 S: Stream<Item = Result<V, XrpcError>> + Send + 'static, 1591{ 1592 let init = PageState { 1593 items: Box::pin(items), 1594 phase: PagePhase::Head, 1595 array_key, 1596 tail, 1597 permit, 1598 }; 1599 let chunks = stream::unfold(init, |mut st| async move { 1600 match st.phase { 1601 PagePhase::Head => { 1602 let head = format!("{{\"{}\":[", st.array_key).into_bytes(); 1603 st.phase = PagePhase::Body { first: true }; 1604 Some((Ok::<Vec<u8>, Infallible>(head), st)) 1605 } 1606 PagePhase::Body { first } => match st.items.next().await { 1607 Some(Ok(view)) => match serde_json::to_vec(&view) { 1608 Ok(encoded) => { 1609 let mut chunk = Vec::with_capacity(encoded.len() + 1); 1610 if !first { 1611 chunk.push(b','); 1612 } 1613 chunk.extend_from_slice(&encoded); 1614 st.phase = PagePhase::Body { first: false }; 1615 Some((Ok(chunk), st)) 1616 } 1617 Err(e) => { 1618 tracing::warn!(error = %e, "skipping hit, serialize failed mid-stream"); 1619 Some((Ok(Vec::new()), st)) 1620 } 1621 }, 1622 Some(Err(e)) => { 1623 tracing::warn!(error = %e, "ending page early, hydration failed mid-stream"); 1624 let tail = std::mem::take(&mut st.tail); 1625 st.phase = PagePhase::Done; 1626 Some((Ok(tail), st)) 1627 } 1628 None => { 1629 let tail = std::mem::take(&mut st.tail); 1630 st.phase = PagePhase::Done; 1631 Some((Ok(tail), st)) 1632 } 1633 }, 1634 
PagePhase::Done => { 1635 drop(st.permit.take()); 1636 None 1637 } 1638 } 1639 }); 1640 ( 1641 [(CONTENT_TYPE, "application/json")], 1642 Body::from_stream(chunks), 1643 ) 1644 .into_response() 1645} 1646 1647async fn bulk_fetch<R, V>(state: &AppState, uris: Vec<String>) -> Result<Response, XrpcError> 1648where 1649 R: XrpcResp, 1650 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static, 1651{ 1652 if uris.is_empty() { 1653 return Err(XrpcError::InvalidParams("at least one uri required".into())); 1654 } 1655 if uris.len() > BULK_LIMIT { 1656 return Err(XrpcError::InvalidParams(format!( 1657 "at most {BULK_LIMIT} uris per request" 1658 ))); 1659 } 1660 let parsed: Vec<AtUri<DefaultStr>> = uris 1661 .iter() 1662 .map(|s| parse_uri(s)) 1663 .collect::<Result<_, _>>()?; 1664 let nsid = nsid_static(R::NSID); 1665 if let Some(bad) = parsed 1666 .iter() 1667 .find(|uri| uri.collection().is_none_or(|c| c.as_ref() != nsid.as_ref())) 1668 { 1669 return Err(XrpcError::InvalidParams(format!( 1670 "uri collection must be {}, got {}", 1671 nsid.as_ref(), 1672 bad.as_ref() 1673 ))); 1674 } 1675 let permit = state.heavy_permit()?; 1676 let views = hydrate_record_stream::<V>(state, nsid, parsed, HitProvenance::ClientSupplied); 1677 Ok(json_stream::<RecordView<V>, _>( 1678 "items", 1679 views, 1680 unpaged_tail(), 1681 permit, 1682 )) 1683} 1684 1685fn record_edge_page<R, F>( 1686 state: &AppState, 1687 q: &TypedListQuery<F>, 1688) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError> 1689where 1690 R: XrpcResp + HasSubject, 1691 F: ListFilter, 1692{ 1693 let subject = parse_subject(&q.subject, R::SHAPE)?; 1694 let cursor = parse_cursor(q.cursor.as_deref())?; 1695 let limit = parse_limit(q.limit)?; 1696 let dir = q.dir(); 1697 let nsid = nsid_static(R::NSID); 1698 let page = if q.filter.is_identity() { 1699 let key = EdgeKey::new(nsid.clone(), subject); 1700 state.edges.list(&key, cursor, limit, dir) 1701 } else { 1702 let pred = q.filter.predicate(state, 
&subject); 1703 let key = EdgeKey::new(nsid.clone(), subject); 1704 state.edges.list_filtered(&key, cursor, limit, dir, pred) 1705 }; 1706 Ok((page, nsid)) 1707} 1708 1709async fn list_records<R, V, F>( 1710 state: &AppState, 1711 q: TypedListQuery<F>, 1712) -> Result<Response, XrpcError> 1713where 1714 R: XrpcResp + HasSubject, 1715 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static, 1716 F: ListFilter, 1717{ 1718 let (page, nsid) = record_edge_page::<R, F>(state, &q)?; 1719 let permit = state.heavy_permit()?; 1720 let views = hydrate_record_stream::<V>(state, nsid, page.items, HitProvenance::Indexed); 1721 Ok(json_stream::<RecordView<V>, _>( 1722 "items", 1723 views, 1724 paged_tail(page.next.map(PageToken::encode_token)), 1725 permit, 1726 )) 1727} 1728 1729fn count_for<R: XrpcResp + HasSubject>( 1730 state: &AppState, 1731 q: CountQuery, 1732) -> Result<CountResponse, XrpcError> { 1733 let subject = parse_subject(&q.subject, R::SHAPE)?; 1734 let key = EdgeKey::new(nsid_static(R::NSID), subject); 1735 Ok(CountResponse { 1736 count: state.edges.count(&key), 1737 distinct_authors: state.edges.count_distinct_authors(&key), 1738 }) 1739} 1740 1741fn mirror_edge_page<M, F>( 1742 state: &AppState, 1743 q: &TypedListQuery<F>, 1744) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError> 1745where 1746 M: MirrorOf, 1747 F: ListFilter, 1748{ 1749 let subject = parse_subject(&q.subject, M::SHAPE)?; 1750 let cursor = parse_cursor(q.cursor.as_deref())?; 1751 let limit = parse_limit(q.limit)?; 1752 let dir = q.dir(); 1753 let edge_nsid = nsid_static(M::EDGE_KIND); 1754 let page = if q.filter.is_identity() { 1755 let key = EdgeKey::new(edge_nsid, subject); 1756 state.edges.list(&key, cursor, limit, dir) 1757 } else { 1758 let pred = q.filter.predicate(state, &subject); 1759 let key = EdgeKey::new(edge_nsid, subject); 1760 state.edges.list_filtered(&key, cursor, limit, dir, pred) 1761 }; 1762 let record_nsid = nsid_static(<M::Record as XrpcResp>::NSID); 
1763 Ok((page, record_nsid)) 1764} 1765 1766async fn list_mirror<M, V, F>(state: &AppState, q: TypedListQuery<F>) -> Result<Response, XrpcError> 1767where 1768 M: MirrorOf, 1769 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static, 1770 F: ListFilter, 1771{ 1772 let (page, record_nsid) = mirror_edge_page::<M, F>(state, &q)?; 1773 let permit = state.heavy_permit()?; 1774 let views = hydrate_record_stream::<V>(state, record_nsid, page.items, HitProvenance::Indexed); 1775 Ok(json_stream::<RecordView<V>, _>( 1776 "items", 1777 views, 1778 paged_tail(page.next.map(PageToken::encode_token)), 1779 permit, 1780 )) 1781} 1782 1783fn count_mirror<M: MirrorOf>(state: &AppState, q: CountQuery) -> Result<CountResponse, XrpcError> { 1784 let subject = parse_subject(&q.subject, M::SHAPE)?; 1785 let key = EdgeKey::new(nsid_static(M::EDGE_KIND), subject); 1786 Ok(CountResponse { 1787 count: state.edges.count(&key), 1788 distinct_authors: state.edges.count_distinct_authors(&key), 1789 }) 1790} 1791 1792async fn list_stars( 1793 State(state): State<AppState>, 1794 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1795) -> Result<Response, XrpcError> { 1796 list_records::<StarRecord, Star<DefaultStr>, _>(&state, q).await 1797} 1798 1799async fn count_stars( 1800 State(state): State<AppState>, 1801 XrpcQuery(q): XrpcQuery<CountQuery>, 1802) -> Result<Json<CountResponse>, XrpcError> { 1803 count_for::<StarRecord>(&state, q).map(Json) 1804} 1805 1806async fn list_follows( 1807 State(state): State<AppState>, 1808 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1809) -> Result<Response, XrpcError> { 1810 list_records::<FollowRecord, Follow<DefaultStr>, _>(&state, q).await 1811} 1812 1813async fn count_follows( 1814 State(state): State<AppState>, 1815 XrpcQuery(q): XrpcQuery<CountQuery>, 1816) -> Result<Json<CountResponse>, XrpcError> { 1817 count_for::<FollowRecord>(&state, q).map(Json) 1818} 1819 1820async fn list_issues( 1821 State(state): State<AppState>, 
1822 XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>, 1823) -> Result<Response, XrpcError> { 1824 let (page, nsid) = record_edge_page::<IssueRecord, _>(&state, &q)?; 1825 let permit = state.heavy_permit()?; 1826 let owned = state.clone(); 1827 let items = hydrate_record_stream::<Issue<DefaultStr>>( 1828 &state, 1829 nsid, 1830 page.items, 1831 HitProvenance::Indexed, 1832 ) 1833 .map(move |view| view.map(|v| enrich_issue_view(&owned, v))); 1834 Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>( 1835 "items", 1836 items, 1837 paged_tail(page.next.map(PageToken::encode_token)), 1838 permit, 1839 )) 1840} 1841 1842async fn count_issues( 1843 State(state): State<AppState>, 1844 XrpcQuery(q): XrpcQuery<CountQuery>, 1845) -> Result<Json<CountResponse>, XrpcError> { 1846 count_for::<IssueRecord>(&state, q).map(Json) 1847} 1848 1849async fn list_pulls( 1850 State(state): State<AppState>, 1851 XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>, 1852) -> Result<Response, XrpcError> { 1853 let (page, nsid) = record_edge_page::<PullRecord, _>(&state, &q)?; 1854 let permit = state.heavy_permit()?; 1855 let owned = state.clone(); 1856 let items = 1857 hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed) 1858 .map(move |view| view.map(|v| enrich_pull_view(&owned, v))); 1859 Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>( 1860 "items", 1861 items, 1862 paged_tail(page.next.map(PageToken::encode_token)), 1863 permit, 1864 )) 1865} 1866 1867async fn count_pulls( 1868 State(state): State<AppState>, 1869 XrpcQuery(q): XrpcQuery<CountQuery>, 1870) -> Result<Json<CountResponse>, XrpcError> { 1871 count_for::<PullRecord>(&state, q).map(Json) 1872} 1873 1874async fn list_feed_comments( 1875 State(state): State<AppState>, 1876 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1877) -> Result<Response, XrpcError> { 1878 list_records::<FeedCommentRecord, FeedComment<DefaultStr>, _>(&state, q).await 1879} 1880 1881async fn 
count_feed_comments( 1882 State(state): State<AppState>, 1883 XrpcQuery(q): XrpcQuery<CountQuery>, 1884) -> Result<Json<CountResponse>, XrpcError> { 1885 count_for::<FeedCommentRecord>(&state, q).map(Json) 1886} 1887 1888async fn list_reactions( 1889 State(state): State<AppState>, 1890 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1891) -> Result<Response, XrpcError> { 1892 list_records::<ReactionRecord, Reaction<DefaultStr>, _>(&state, q).await 1893} 1894 1895async fn count_reactions( 1896 State(state): State<AppState>, 1897 XrpcQuery(q): XrpcQuery<CountQuery>, 1898) -> Result<Json<CountResponse>, XrpcError> { 1899 count_for::<ReactionRecord>(&state, q).map(Json) 1900} 1901 1902async fn list_ref_updates( 1903 State(state): State<AppState>, 1904 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1905) -> Result<Response, XrpcError> { 1906 list_records::<RefUpdateRecord, RefUpdate<DefaultStr>, _>(&state, q).await 1907} 1908 1909async fn count_ref_updates( 1910 State(state): State<AppState>, 1911 XrpcQuery(q): XrpcQuery<CountQuery>, 1912) -> Result<Json<CountResponse>, XrpcError> { 1913 count_for::<RefUpdateRecord>(&state, q).map(Json) 1914} 1915 1916async fn list_collaborators( 1917 State(state): State<AppState>, 1918 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1919) -> Result<Response, XrpcError> { 1920 list_records::<CollaboratorRecord, Collaborator<DefaultStr>, _>(&state, q).await 1921} 1922 1923async fn count_collaborators( 1924 State(state): State<AppState>, 1925 XrpcQuery(q): XrpcQuery<CountQuery>, 1926) -> Result<Json<CountResponse>, XrpcError> { 1927 count_for::<CollaboratorRecord>(&state, q).map(Json) 1928} 1929 1930async fn list_issue_states( 1931 State(state): State<AppState>, 1932 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1933) -> Result<Response, XrpcError> { 1934 list_records::<IssueStateRecord, IssueState<DefaultStr>, _>(&state, q).await 1935} 1936 1937async fn count_issue_states( 1938 State(state): State<AppState>, 1939 
XrpcQuery(q): XrpcQuery<CountQuery>, 1940) -> Result<Json<CountResponse>, XrpcError> { 1941 count_for::<IssueStateRecord>(&state, q).map(Json) 1942} 1943 1944async fn list_pull_statuses( 1945 State(state): State<AppState>, 1946 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1947) -> Result<Response, XrpcError> { 1948 list_records::<PullStatusRecord, PullStatus<DefaultStr>, _>(&state, q).await 1949} 1950 1951async fn count_pull_statuses( 1952 State(state): State<AppState>, 1953 XrpcQuery(q): XrpcQuery<CountQuery>, 1954) -> Result<Json<CountResponse>, XrpcError> { 1955 count_for::<PullStatusRecord>(&state, q).map(Json) 1956} 1957 1958async fn list_repos( 1959 State(state): State<AppState>, 1960 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1961) -> Result<Response, XrpcError> { 1962 list_records::<RepoRecord, Repo<DefaultStr>, _>(&state, q).await 1963} 1964 1965async fn count_repos( 1966 State(state): State<AppState>, 1967 XrpcQuery(q): XrpcQuery<CountQuery>, 1968) -> Result<Json<CountResponse>, XrpcError> { 1969 count_for::<RepoRecord>(&state, q).map(Json) 1970} 1971 1972async fn list_knots( 1973 State(state): State<AppState>, 1974 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1975) -> Result<Response, XrpcError> { 1976 list_records::<KnotRecord, Knot<DefaultStr>, _>(&state, q).await 1977} 1978 1979async fn count_knots( 1980 State(state): State<AppState>, 1981 XrpcQuery(q): XrpcQuery<CountQuery>, 1982) -> Result<Json<CountResponse>, XrpcError> { 1983 count_for::<KnotRecord>(&state, q).map(Json) 1984} 1985 1986async fn list_spindles( 1987 State(state): State<AppState>, 1988 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 1989) -> Result<Response, XrpcError> { 1990 list_records::<SpindleRecord, Spindle<DefaultStr>, _>(&state, q).await 1991} 1992 1993async fn count_spindles( 1994 State(state): State<AppState>, 1995 XrpcQuery(q): XrpcQuery<CountQuery>, 1996) -> Result<Json<CountResponse>, XrpcError> { 1997 count_for::<SpindleRecord>(&state, q).map(Json) 
}

/// Lists public keys by delegating to `list_records` for `PublicKeyRecord`.
async fn list_public_keys(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<PublicKeyRecord, PublicKey<DefaultStr>, _>(&app, query).await
}

/// Counts public keys via `count_for::<PublicKeyRecord>`.
async fn count_public_keys(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_for::<PublicKeyRecord>(&app, query)?))
}

/// Lists vouches by delegating to `list_records` for `VouchRecord`.
async fn list_vouches(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<VouchRecord, Vouch<DefaultStr>, _>(&app, query).await
}

/// Counts vouches via `count_for::<VouchRecord>`.
async fn count_vouches(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_for::<VouchRecord>(&app, query)?))
}

/// Lists stars for the `StarBy` mirror by delegating to `list_mirror`.
async fn list_stars_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<StarBy, Star<DefaultStr>, _>(&app, query).await
}

/// Counts `StarBy` mirror entries via `count_mirror`.
async fn count_stars_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<StarBy>(&app, query)?))
}

/// Lists reactions for the `ReactionBy` mirror by delegating to `list_mirror`.
async fn list_reactions_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<ReactionBy, Reaction<DefaultStr>, _>(&app, query).await
}

/// Counts `ReactionBy` mirror entries via `count_mirror`.
async fn count_reactions_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<ReactionBy>(&app, query)?))
}

/// Lists follows for the `FollowBy` mirror by delegating to `list_mirror`.
async fn list_follows_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<FollowBy, Follow<DefaultStr>, _>(&app, query).await
}

/// Counts `FollowBy` mirror entries via `count_mirror`.
async fn count_follows_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<FollowBy>(&app, query)?))
}

/// Lists vouches for the `VouchBy` mirror by delegating to `list_mirror`.
async fn list_vouches_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<VouchBy, Vouch<DefaultStr>, _>(&app, query).await
}

/// Counts `VouchBy` mirror entries via `count_mirror`.
async fn count_vouches_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<VouchBy>(&app, query)?))
}

/// Lists ref updates for the `RefUpdateBy` mirror by delegating to `list_mirror`.
async fn list_ref_updates_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<RefUpdateBy, RefUpdate<DefaultStr>, _>(&app, query).await
}

/// Counts `RefUpdateBy` mirror entries via `count_mirror`.
async fn count_ref_updates_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<RefUpdateBy>(&app, query)?))
}

/// Lists knot members for the `KnotMemberBy` mirror by delegating to `list_mirror`.
async fn list_knot_members_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<KnotMemberBy, KnotMember<DefaultStr>, _>(&app, query).await
}

/// Counts `KnotMemberBy` mirror entries via `count_mirror`.
async fn count_knot_members_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<KnotMemberBy>(&app, query)?))
}

/// Lists label ops for the `LabelOpBy` mirror by delegating to `list_mirror`.
async fn list_label_ops_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<LabelOpBy, LabelOp<DefaultStr>, _>(&app, query).await
}

/// Counts `LabelOpBy` mirror entries via `count_mirror`.
async fn count_label_ops_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<LabelOpBy>(&app, query)?))
}

/// Lists pipelines for the `PipelineBy` mirror by delegating to `list_mirror`.
async fn list_pipelines_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<PipelineBy, Pipeline<DefaultStr>, _>(&app, query).await
}

/// Counts `PipelineBy` mirror entries via `count_mirror`.
async fn count_pipelines_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<PipelineBy>(&app, query)?))
}

/// Lists pipeline statuses for the `PipelineStatusBy` mirror by delegating to `list_mirror`.
async fn list_pipeline_statuses_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<PipelineStatusBy, PipelineStatus<DefaultStr>, _>(&app, query).await
}

/// Counts `PipelineStatusBy` mirror entries via `count_mirror`.
async fn count_pipeline_statuses_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<PipelineStatusBy>(&app, query)?))
}

/// Lists artifacts for the `ArtifactBy` mirror by delegating to `list_mirror`.
async fn list_artifacts_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<ArtifactBy, Artifact<DefaultStr>, _>(&app, query).await
}

/// Counts `ArtifactBy` mirror entries via `count_mirror`.
async fn count_artifacts_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<ArtifactBy>(&app, query)?))
}

/// Lists collaborators for the `CollaboratorBy` mirror by delegating to `list_mirror`.
async fn list_collaborators_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<CollaboratorBy, Collaborator<DefaultStr>, _>(&app, query).await
}

/// Counts `CollaboratorBy` mirror entries via `count_mirror`.
async fn count_collaborators_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<CollaboratorBy>(&app, query)?))
}

/// Lists issues for the `IssueBy` mirror, streamed under the `"items"` key.
///
/// Unlike the plain `list_mirror` handlers, this one accepts an `IssueFilter`
/// and passes every hydrated view through `enrich_issue_view` before it is
/// written to the response.
async fn list_issues_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<IssueFilter>>,
) -> Result<Response, XrpcError> {
    // The page lookup runs (and may fail) before a heavy permit is requested.
    let (page, nsid) = mirror_edge_page::<IssueBy, _>(&app, &query)?;
    let permit = app.heavy_permit()?;
    // The enrichment closure is `move`, so it owns its own clone of the state.
    let enrich_state = app.clone();
    let hydrated = hydrate_record_stream::<Issue<DefaultStr>>(
        &app,
        nsid,
        page.items,
        HitProvenance::Indexed,
    );
    let items = hydrated.map(move |view| view.map(|item| enrich_issue_view(&enrich_state, item)));
    let tail = paged_tail(page.next.map(PageToken::encode_token));
    Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
        "items", items, tail, permit,
    ))
}

/// Counts `IssueBy` mirror entries via `count_mirror`.
async fn count_issues_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<IssueBy>(&app, query)?))
}

/// Lists feed comments for the `FeedCommentBy` mirror by delegating to `list_mirror`.
async fn list_feed_comments_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<FeedCommentBy, FeedComment<DefaultStr>, _>(&app, query).await
}

/// Counts `FeedCommentBy` mirror entries via `count_mirror`.
async fn count_feed_comments_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<FeedCommentBy>(&app, query)?))
}

/// Lists issue states for the `IssueStateBy` mirror by delegating to `list_mirror`.
async fn list_issue_states_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<IssueStateBy, IssueState<DefaultStr>, _>(&app, query).await
}

/// Counts `IssueStateBy` mirror entries via `count_mirror`.
async fn count_issue_states_by(
    State(app): State<AppState>,
    XrpcQuery(query): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    Ok(Json(count_mirror::<IssueStateBy>(&app, query)?))
}
2224 2225async fn list_pulls_by( 2226 State(state): State<AppState>, 2227 XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>, 2228) -> Result<Response, XrpcError> { 2229 let (page, nsid) = mirror_edge_page::<PullBy, _>(&state, &q)?; 2230 let permit = state.heavy_permit()?; 2231 let owned = state.clone(); 2232 let items = 2233 hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed) 2234 .map(move |view| view.map(|v| enrich_pull_view(&owned, v))); 2235 Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>( 2236 "items", 2237 items, 2238 paged_tail(page.next.map(PageToken::encode_token)), 2239 permit, 2240 )) 2241} 2242async fn count_pulls_by( 2243 State(state): State<AppState>, 2244 XrpcQuery(q): XrpcQuery<CountQuery>, 2245) -> Result<Json<CountResponse>, XrpcError> { 2246 count_mirror::<PullBy>(&state, q).map(Json) 2247} 2248 2249async fn list_pull_statuses_by( 2250 State(state): State<AppState>, 2251 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2252) -> Result<Response, XrpcError> { 2253 list_mirror::<PullStatusBy, PullStatus<DefaultStr>, _>(&state, q).await 2254} 2255async fn count_pull_statuses_by( 2256 State(state): State<AppState>, 2257 XrpcQuery(q): XrpcQuery<CountQuery>, 2258) -> Result<Json<CountResponse>, XrpcError> { 2259 count_mirror::<PullStatusBy>(&state, q).map(Json) 2260} 2261 2262async fn list_spindle_members_by( 2263 State(state): State<AppState>, 2264 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2265) -> Result<Response, XrpcError> { 2266 list_mirror::<SpindleMemberBy, SpindleMember<DefaultStr>, _>(&state, q).await 2267} 2268async fn count_spindle_members_by( 2269 State(state): State<AppState>, 2270 XrpcQuery(q): XrpcQuery<CountQuery>, 2271) -> Result<Json<CountResponse>, XrpcError> { 2272 count_mirror::<SpindleMemberBy>(&state, q).map(Json) 2273} 2274 2275async fn list_label_definitions( 2276 State(state): State<AppState>, 2277 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2278) -> 
Result<Response, XrpcError> { 2279 list_records::<LabelDefinitionRecord, LabelDefinition<DefaultStr>, _>(&state, q).await 2280} 2281 2282async fn count_label_definitions( 2283 State(state): State<AppState>, 2284 XrpcQuery(q): XrpcQuery<CountQuery>, 2285) -> Result<Json<CountResponse>, XrpcError> { 2286 count_for::<LabelDefinitionRecord>(&state, q).map(Json) 2287} 2288 2289async fn list_label_ops( 2290 State(state): State<AppState>, 2291 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2292) -> Result<Response, XrpcError> { 2293 list_records::<LabelOpRecord, LabelOp<DefaultStr>, _>(&state, q).await 2294} 2295 2296async fn count_label_ops( 2297 State(state): State<AppState>, 2298 XrpcQuery(q): XrpcQuery<CountQuery>, 2299) -> Result<Json<CountResponse>, XrpcError> { 2300 count_for::<LabelOpRecord>(&state, q).map(Json) 2301} 2302 2303async fn list_pipelines( 2304 State(state): State<AppState>, 2305 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2306) -> Result<Response, XrpcError> { 2307 list_records::<PipelineRecord, Pipeline<DefaultStr>, _>(&state, q).await 2308} 2309 2310async fn count_pipelines( 2311 State(state): State<AppState>, 2312 XrpcQuery(q): XrpcQuery<CountQuery>, 2313) -> Result<Json<CountResponse>, XrpcError> { 2314 count_for::<PipelineRecord>(&state, q).map(Json) 2315} 2316 2317async fn list_pipeline_statuses( 2318 State(state): State<AppState>, 2319 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2320) -> Result<Response, XrpcError> { 2321 list_records::<PipelineStatusRecord, PipelineStatus<DefaultStr>, _>(&state, q).await 2322} 2323 2324async fn count_pipeline_statuses( 2325 State(state): State<AppState>, 2326 XrpcQuery(q): XrpcQuery<CountQuery>, 2327) -> Result<Json<CountResponse>, XrpcError> { 2328 count_for::<PipelineStatusRecord>(&state, q).map(Json) 2329} 2330 2331async fn list_artifacts( 2332 State(state): State<AppState>, 2333 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2334) -> Result<Response, XrpcError> { 2335 
list_records::<ArtifactRecord, Artifact<DefaultStr>, _>(&state, q).await 2336} 2337 2338async fn count_artifacts( 2339 State(state): State<AppState>, 2340 XrpcQuery(q): XrpcQuery<CountQuery>, 2341) -> Result<Json<CountResponse>, XrpcError> { 2342 count_for::<ArtifactRecord>(&state, q).map(Json) 2343} 2344 2345async fn list_knot_members( 2346 State(state): State<AppState>, 2347 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2348) -> Result<Response, XrpcError> { 2349 list_records::<KnotMemberRecord, KnotMember<DefaultStr>, _>(&state, q).await 2350} 2351 2352async fn count_knot_members( 2353 State(state): State<AppState>, 2354 XrpcQuery(q): XrpcQuery<CountQuery>, 2355) -> Result<Json<CountResponse>, XrpcError> { 2356 count_for::<KnotMemberRecord>(&state, q).map(Json) 2357} 2358 2359async fn list_spindle_members( 2360 State(state): State<AppState>, 2361 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2362) -> Result<Response, XrpcError> { 2363 list_records::<SpindleMemberRecord, SpindleMember<DefaultStr>, _>(&state, q).await 2364} 2365 2366async fn count_spindle_members( 2367 State(state): State<AppState>, 2368 XrpcQuery(q): XrpcQuery<CountQuery>, 2369) -> Result<Json<CountResponse>, XrpcError> { 2370 count_for::<SpindleMemberRecord>(&state, q).map(Json) 2371} 2372 2373async fn list_strings( 2374 State(state): State<AppState>, 2375 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2376) -> Result<Response, XrpcError> { 2377 list_records::<TangledStringRecord, TangledString<DefaultStr>, _>(&state, q).await 2378} 2379 2380async fn count_strings( 2381 State(state): State<AppState>, 2382 XrpcQuery(q): XrpcQuery<CountQuery>, 2383) -> Result<Json<CountResponse>, XrpcError> { 2384 count_for::<TangledStringRecord>(&state, q).map(Json) 2385} 2386 2387#[derive(Deserialize)] 2388struct ResolveMiniDocParams { 2389 identifier: AtIdentifier<DefaultStr>, 2390} 2391 2392async fn resolve_mini_doc( 2393 State(state): State<AppState>, 2394 XrpcQuery(q): 
XrpcQuery<ResolveMiniDocParams>, 2395) -> Result<Response, XrpcError> { 2396 let body = state 2397 .slingshot 2398 .resolve_mini_doc(&q.identifier) 2399 .await 2400 .map_err(map_slingshot)?; 2401 Ok((StatusCode::OK, [(CONTENT_TYPE, "application/json")], body).into_response()) 2402} 2403 2404async fn get_coverage(State(state): State<AppState>) -> Json<CoverageEnvelope> { 2405 Json(state.coverage.snapshot().into()) 2406} 2407 2408async fn search_query( 2409 State(state): State<AppState>, 2410 XrpcQuery(q): XrpcQuery<SearchQueryParams>, 2411) -> Result<Response, XrpcError> { 2412 if q.q.trim().is_empty() { 2413 return Err(XrpcError::InvalidParams("q must not be empty".into())); 2414 } 2415 let cursor = SearchCursor::from_token(q.cursor.as_deref()) 2416 .map_err(|e| XrpcError::InvalidParams(format!("cursor: {e}")))?; 2417 let limit = parse_limit(q.limit)?; 2418 let filters = build_search_filters(&q)?; 2419 let permit = state.heavy_permit()?; 2420 let page = state 2421 .search 2422 .search(&q.q, filters, cursor, limit.get()) 2423 .await 2424 .map_err(map_search_err)?; 2425 let next = page.next.map(SearchOffset::encode_token); 2426 let owned = state.clone(); 2427 let hits = hydrate_stream(page.hits, move |hit| { 2428 let owned = owned.clone(); 2429 let uri = hit.uri.clone(); 2430 let nsid = hit.nsid.clone(); 2431 async move { 2432 let result = hydrate_search_hit(&owned, hit).await; 2433 drop_unhydratable(HitProvenance::Indexed, &nsid, &uri, result) 2434 } 2435 }); 2436 Ok(json_stream::<SearchHitView, _>( 2437 "hits", 2438 hits, 2439 paged_tail(next), 2440 permit, 2441 )) 2442} 2443 2444fn build_search_filters(q: &SearchQueryParams) -> Result<SearchFilters, XrpcError> { 2445 let since = q 2446 .since 2447 .as_deref() 2448 .map(parse_rfc3339_seconds) 2449 .transpose() 2450 .map_err(|e| XrpcError::InvalidParams(format!("since: {e}")))?; 2451 let until = q 2452 .until 2453 .as_deref() 2454 .map(parse_rfc3339_seconds) 2455 .transpose() 2456 .map_err(|e| 
XrpcError::InvalidParams(format!("until: {e}")))?; 2457 if let (Some(s), Some(u)) = (since, until) 2458 && s > u 2459 { 2460 return Err(XrpcError::InvalidParams("since must be <= until".into())); 2461 } 2462 Ok(SearchFilters { 2463 nsid: q.nsid.clone(), 2464 author: q.author.clone(), 2465 repo: q.repo.clone(), 2466 since, 2467 until, 2468 }) 2469} 2470 2471fn parse_rfc3339_seconds(raw: &str) -> Result<i64, String> { 2472 chrono::DateTime::parse_from_rfc3339(raw) 2473 .map(|dt| dt.timestamp()) 2474 .map_err(|e| format!("expected RFC3339, got {raw}: {e}")) 2475} 2476 2477async fn hydrate_search_hit( 2478 state: &AppState, 2479 hit: SearchHit, 2480) -> Result<Option<SearchHitView>, XrpcError> { 2481 let SearchHit { uri, nsid, score } = hit; 2482 let body = resolve_for_view(state, &nsid, uri).await?; 2483 let record = decode_canon_or_upgrade(&nsid, &body.value, &state.resolver) 2484 .await 2485 .map_err(|err| XrpcError::InvalidRecord(err.to_string()))?; 2486 let Some(value) = SearchableRecord::try_from_record(record) else { 2487 return Ok(None); 2488 }; 2489 let Some(value) = value.normalize(&state.resolver).await else { 2490 return Ok(None); 2491 }; 2492 Ok(Some(SearchHitView { 2493 uri: body.uri.clone(), 2494 cid: Some(body.cid.clone()), 2495 nsid, 2496 score, 2497 value, 2498 })) 2499} 2500 2501fn map_search_err(err: SearchError) -> XrpcError { 2502 use SearchError as E; 2503 match err { 2504 E::Query(e) => XrpcError::InvalidParams(format!("query: {e}")), 2505 e @ (E::Tantivy(_) 2506 | E::InvalidUri(_) 2507 | E::InvalidNsid(_) 2508 | E::MissingField(_) 2509 | E::Cancelled(_)) => XrpcError::Internal(format!("search: {e}")), 2510 } 2511} 2512 2513fn map_proxy_error(err: KnotProxyError) -> XrpcError { 2514 match err { 2515 KnotProxyError::CircuitOpen => { 2516 XrpcError::UpstreamUnavailable("knot circuit breaker open".into()) 2517 } 2518 KnotProxyError::BlockedHost { host, reason } => { 2519 XrpcError::InvalidRecord(format!("knot host {host} is {reason} address 
space")) 2520 } 2521 KnotProxyError::PlaintextHttp { host } => { 2522 XrpcError::InvalidRecord(format!("knot host {host} requires https")) 2523 } 2524 KnotProxyError::Connect(e) => XrpcError::UpstreamUnavailable(format!("connect: {e}")), 2525 KnotProxyError::Timeout(e) => { 2526 XrpcError::UpstreamUnavailable(format!("upstream timeout: {e}")) 2527 } 2528 KnotProxyError::Redirect(e) => XrpcError::UpstreamUnavailable(format!("redirect: {e}")), 2529 KnotProxyError::Transport(e) => XrpcError::UpstreamUnavailable(format!("transport: {e}")), 2530 KnotProxyError::Upstream(s) => XrpcError::UpstreamUnavailable(format!("status {s}")), 2531 } 2532} 2533 2534fn validate_client_supplied_knot(state: &AppState, host: &KnotHost) -> Result<(), XrpcError> { 2535 let host_str = || host.url().host_str().unwrap_or_default().to_owned(); 2536 if state.knots.requires_https() && host.url().scheme() != "https" { 2537 return Err(XrpcError::InvalidParams(format!( 2538 "knot host {} must be https", 2539 host_str(), 2540 ))); 2541 } 2542 if state.knots.allows_private_hosts() { 2543 return Ok(()); 2544 } 2545 match host.private_literal_reason() { 2546 None => Ok(()), 2547 Some(reason) => Err(XrpcError::InvalidParams(format!( 2548 "knot host {} blocked: {} address space", 2549 host_str(), 2550 reason, 2551 ))), 2552 } 2553} 2554 2555async fn resolve_knot_target( 2556 state: &AppState, 2557 repo_uri: AtUri<DefaultStr>, 2558) -> Result<(KnotHost, RepoSlug), XrpcError> { 2559 let rkey: Option<Rkey<DefaultStr>> = repo_uri.rkey().map(|r| r.clone().into_static()); 2560 let (body, did) = resolve(state, ExpectedNsid::from_static(RepoRecord::NSID), repo_uri).await?; 2561 let value: Repo<DefaultStr> = serde_json::from_slice(&body.value) 2562 .map_err(|e| XrpcError::InvalidRecord(format!("decode repo record: {e}")))?; 2563 let host = KnotHost::parse(value.knot.as_ref()) 2564 .map_err(|e| XrpcError::InvalidRecord(format!("knot field: {e}")))?; 2565 let name = pick_human_slug(rkey.as_ref(), 
value.name.as_deref()).ok_or_else(|| { 2566 XrpcError::InvalidRecord("at-uri missing rkey and record missing name".to_string()) 2567 })?; 2568 let slug = RepoSlug::new(&did, &name) 2569 .map_err(|e| XrpcError::InvalidRecord(format!("repo slug: {e}")))?; 2570 Ok((host, slug)) 2571} 2572 2573fn pick_human_slug(rkey: Option<&Rkey<DefaultStr>>, name: Option<&str>) -> Option<String> { 2574 match rkey { 2575 Some(r) if jacquard_common::types::tid::Tid::new(r.as_ref()).is_ok() => { 2576 Some(name.unwrap_or(r.as_ref()).to_owned()) 2577 } 2578 Some(r) => Some(r.as_ref().to_owned()), 2579 None => name.map(str::to_owned), 2580 } 2581} 2582 2583fn filter_request_headers(client: &HeaderMap) -> HeaderMap { 2584 FORWARDED_REQUEST_HEADERS 2585 .iter() 2586 .fold(HeaderMap::new(), |mut acc, name| { 2587 if let Some(value) = client.get(*name) { 2588 acc.insert((*name).clone(), value.clone()); 2589 } 2590 acc 2591 }) 2592} 2593 2594fn upstream_to_axum(resp: ProxyResponse) -> Response { 2595 let status = resp.status(); 2596 let upstream_headers = resp.headers().clone(); 2597 let body = Body::from_stream(resp.into_body_stream()); 2598 let mut response = Response::builder() 2599 .status(status) 2600 .body(body) 2601 .expect("response body construction must succeed"); 2602 let response_headers = response.headers_mut(); 2603 PASSTHROUGH_HEADERS.iter().for_each(|name| { 2604 if let Some(value) = upstream_headers.get(*name) { 2605 response_headers.insert((*name).clone(), value.clone()); 2606 } 2607 }); 2608 response 2609} 2610 2611async fn dispatch_proxy( 2612 state: AppState, 2613 headers: HeaderMap, 2614 nsid: Nsid<DefaultStr>, 2615 host: KnotHost, 2616 params: ProxyParams, 2617) -> Result<Response, XrpcError> { 2618 let forward: Vec<(&str, &str)> = params 2619 .iter() 2620 .map(|(k, v)| (k.as_str(), v.as_str())) 2621 .collect(); 2622 let allowed = filter_request_headers(&headers); 2623 let upstream = state 2624 .knots 2625 .forward(&host, &nsid, &forward, allowed) 2626 .await 2627 
.map_err(map_proxy_error)?; 2628 Ok(upstream_to_axum(upstream)) 2629} 2630 2631fn extract_param( 2632 params: ProxyParams, 2633 key: &str, 2634) -> Result<Option<(String, ProxyParams)>, XrpcError> { 2635 let (matching, rest): (ProxyParams, ProxyParams) = 2636 params.into_iter().partition(|(k, _)| k == key); 2637 match matching.as_slice() { 2638 [] => Ok(None), 2639 [_] => Ok(matching.into_iter().next().map(|(_, v)| (v, rest))), 2640 _ => Err(XrpcError::InvalidParams(format!( 2641 "{key} parameter must appear at most once, got {}", 2642 matching.len(), 2643 ))), 2644 } 2645} 2646 2647async fn proxy_repo_handler( 2648 state: AppState, 2649 headers: HeaderMap, 2650 params: ProxyParams, 2651 nsid: Nsid<DefaultStr>, 2652) -> Result<Response, XrpcError> { 2653 let (repo_raw, rest) = extract_param(params, REPO_PARAM)? 2654 .ok_or_else(|| XrpcError::InvalidParams("missing repo".into()))?; 2655 let repo_uri = parse_uri(&repo_raw)?; 2656 let (host, slug) = resolve_knot_target(&state, repo_uri).await?; 2657 let forward = rest 2658 .into_iter() 2659 .chain(std::iter::once(( 2660 REPO_PARAM.to_owned(), 2661 slug.as_str().to_owned(), 2662 ))) 2663 .collect(); 2664 dispatch_proxy(state, headers, nsid, host, forward).await 2665} 2666 2667async fn proxy_knot_handler( 2668 state: AppState, 2669 headers: HeaderMap, 2670 params: ProxyParams, 2671 nsid: Nsid<DefaultStr>, 2672) -> Result<Response, XrpcError> { 2673 let (knot_raw, forward) = extract_param(params, KNOT_HOST_PARAM)? 2674 .ok_or_else(|| XrpcError::InvalidParams("missing knot".into()))?; 2675 let host = 2676 KnotHost::parse(&knot_raw).map_err(|e| XrpcError::InvalidParams(format!("knot: {e}")))?; 2677 validate_client_supplied_knot(&state, &host)?; 2678 dispatch_proxy(state, headers, nsid, host, forward).await 2679}