Monorepo for Tangled tangled.org
10

Configure Feed

Select the types of activity you want to include in your feed.

1use std::future::Future; 2use std::sync::Arc; 3 4use bobbin_resolver::{ 5 NormalizeRepoRefs, decode_canon_or_upgrade, normalize_record_fields, scrub_record_bytes, 6 upgrade_wire_bytes, 7}; 8 9use axum::{ 10 Router, 11 body::Body, 12 extract::{FromRequestParts, Query, RawQuery, State, rejection::QueryRejection}, 13 http::{ 14 HeaderMap, HeaderName, StatusCode, 15 header::{ 16 ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LENGTH, 17 CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE, 18 LAST_MODIFIED, RANGE, 19 }, 20 request::Parts, 21 }, 22 response::{IntoResponse, Json, Response}, 23 routing::get, 24}; 25use bobbin_edge_index::{ 26 Coverage, CoverageWatch, CursorParseError, EdgeItem, EdgePage, EdgeStore, IssueStateKind, 27 PageCursor, PageLimit, PageToken, PullStatusKind, SortDir, StateIndex, StateKind, 28}; 29use bobbin_knot_proxy::{KnotHost, KnotProxy, KnotProxyError, ProxyResponse, RepoSlug}; 30use bobbin_record_lru::RecordStore; 31use bobbin_resolver::RepoIdResolver; 32use bobbin_search::{ 33 SearchCursor, SearchError, SearchFilters, SearchHit, SearchOffset, SearchReader, 34}; 35use bobbin_slingshot_client::{SlingshotClient, SlingshotError}; 36use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static}; 37use bobbin_types::knot_acl::{KnotOwnedSource, decode_knot_owned_source, knot_did_host}; 38use bobbin_types::record::RecordBody; 39use bobbin_types::search::SearchableRecord; 40use bobbin_types::sh_tangled::actor::profile::{Profile, ProfileGetRecordOutput, ProfileRecord}; 41use bobbin_types::sh_tangled::feed::comment::{ 42 Comment as FeedComment, CommentRecord as FeedCommentRecord, 43}; 44use bobbin_types::sh_tangled::feed::reaction::{Reaction, ReactionRecord}; 45use bobbin_types::sh_tangled::feed::star::{Star, StarRecord}; 46use bobbin_types::sh_tangled::git::ref_update::{RefUpdate, RefUpdateRecord}; 47use bobbin_types::sh_tangled::graph::follow::{Follow, FollowRecord}; 48use 
bobbin_types::sh_tangled::graph::vouch::{Vouch, VouchRecord}; 49use bobbin_types::sh_tangled::knot::member::{ 50 Member as KnotMember, MemberRecord as KnotMemberRecord, 51}; 52use bobbin_types::sh_tangled::knot::{Knot, KnotRecord}; 53use bobbin_types::sh_tangled::label::definition::{ 54 Definition as LabelDefinition, DefinitionRecord as LabelDefinitionRecord, 55}; 56use bobbin_types::sh_tangled::label::op::{Op as LabelOp, OpRecord as LabelOpRecord}; 57use bobbin_types::sh_tangled::pipeline::status::{ 58 Status as PipelineStatus, StatusRecord as PipelineStatusRecord, 59}; 60use bobbin_types::sh_tangled::pipeline::{Pipeline, PipelineRecord}; 61use bobbin_types::sh_tangled::public_key::{PublicKey, PublicKeyRecord}; 62use bobbin_types::sh_tangled::repo::artifact::{Artifact, ArtifactRecord}; 63use bobbin_types::sh_tangled::repo::collaborator::{Collaborator, CollaboratorRecord}; 64use bobbin_types::sh_tangled::repo::issue::state::{ 65 State as IssueState, StateRecord as IssueStateRecord, 66}; 67use bobbin_types::sh_tangled::repo::issue::{Issue, IssueGetRecordOutput, IssueRecord}; 68use bobbin_types::sh_tangled::repo::pull::status::{ 69 Status as PullStatus, StatusRecord as PullStatusRecord, 70}; 71use bobbin_types::sh_tangled::repo::pull::{Pull, PullGetRecordOutput, PullRecord}; 72use bobbin_types::sh_tangled::repo::{Repo, RepoGetRecordOutput, RepoRecord}; 73use bobbin_types::sh_tangled::spindle::member::{ 74 Member as SpindleMember, MemberRecord as SpindleMemberRecord, 75}; 76use bobbin_types::sh_tangled::spindle::{Spindle, SpindleRecord}; 77use bobbin_types::sh_tangled::string::{TangledString, TangledStringRecord}; 78use futures::Stream; 79use futures::stream::{self, StreamExt, TryStreamExt}; 80use jacquard_common::types::did::Did; 81use jacquard_common::types::ident::AtIdentifier; 82use jacquard_common::types::nsid::Nsid; 83use jacquard_common::types::recordkey::Rkey; 84use jacquard_common::types::string::{AtUri, Cid}; 85use jacquard_common::xrpc::XrpcResp; 86use 
jacquard_common::{DefaultStr, IntoStatic}; 87use serde::{Deserialize, Serialize}; 88use std::convert::Infallible; 89use std::time::Duration; 90use thiserror::Error; 91use url::form_urlencoded; 92 93use tower_http::classify::ServerErrorsFailureClass; 94use tower_http::trace::{DefaultMakeSpan, OnFailure, OnResponse, TraceLayer}; 95use tracing::{Level, Span}; 96 97mod backpressure; 98mod filter; 99 100pub use backpressure::{ 101 HeavyLimiter, HeavyPermit, MaxInFlight, PerRequestAnonBytes, PressureVerdict, ReservedFloor, 102}; 103use filter::{IssueFilter, ListFilter, NoFilter, PullFilter}; 104 105const DEFAULT_LIMIT: u32 = 50; 106const FETCH_CONCURRENCY: usize = 8; 107 108#[derive(Clone)] 109pub struct AppState { 110 pub records: Arc<dyn RecordStore>, 111 pub slingshot: SlingshotClient, 112 pub edges: Arc<EdgeStore>, 113 pub issue_states: Arc<StateIndex<IssueStateKind>>, 114 pub pull_statuses: Arc<StateIndex<PullStatusKind>>, 115 pub coverage: Arc<CoverageWatch>, 116 pub knots: Arc<KnotProxy>, 117 pub search: Arc<dyn SearchReader>, 118 pub resolver: Arc<RepoIdResolver>, 119 pub limiter: Option<Arc<HeavyLimiter>>, 120} 121 122impl AppState { 123 #[allow(clippy::too_many_arguments)] 124 pub fn new( 125 records: Arc<dyn RecordStore>, 126 slingshot: SlingshotClient, 127 edges: Arc<EdgeStore>, 128 issue_states: Arc<StateIndex<IssueStateKind>>, 129 pull_statuses: Arc<StateIndex<PullStatusKind>>, 130 coverage: Arc<CoverageWatch>, 131 knots: Arc<KnotProxy>, 132 search: Arc<dyn SearchReader>, 133 resolver: Arc<RepoIdResolver>, 134 ) -> Self { 135 Self { 136 records, 137 slingshot, 138 edges, 139 issue_states, 140 pull_statuses, 141 coverage, 142 knots, 143 search, 144 resolver, 145 limiter: None, 146 } 147 } 148 149 pub fn with_limiter(mut self, limiter: Option<Arc<HeavyLimiter>>) -> Self { 150 self.limiter = limiter; 151 self 152 } 153 154 fn heavy_permit(&self) -> Result<Option<HeavyPermit>, XrpcError> { 155 self.limiter.as_ref().map(|l| l.try_enter()).transpose() 156 } 157} 
158 159pub fn router(state: AppState) -> Router { 160 Router::new() 161 .route("/xrpc/sh.tangled.repo.getRepo", get(get_repo)) 162 .route("/xrpc/sh.tangled.repo.getRepos", get(get_repos)) 163 .route( 164 "/xrpc/sh.tangled.repo.getRepoByRepoDid", 165 get(get_repo_by_repo_did), 166 ) 167 .route("/xrpc/sh.tangled.actor.getProfile", get(get_profile)) 168 .route("/xrpc/sh.tangled.actor.getProfiles", get(get_profiles)) 169 .route("/xrpc/sh.tangled.repo.getIssue", get(get_issue)) 170 .route("/xrpc/sh.tangled.repo.getIssues", get(get_issues)) 171 .route("/xrpc/sh.tangled.repo.getPull", get(get_pull)) 172 .route("/xrpc/sh.tangled.repo.getPulls", get(get_pulls)) 173 .route("/xrpc/sh.tangled.feed.listStars", get(list_stars)) 174 .route("/xrpc/sh.tangled.feed.countStars", get(count_stars)) 175 .route("/xrpc/sh.tangled.graph.listFollows", get(list_follows)) 176 .route("/xrpc/sh.tangled.graph.countFollows", get(count_follows)) 177 .route("/xrpc/sh.tangled.repo.listIssues", get(list_issues)) 178 .route("/xrpc/sh.tangled.repo.countIssues", get(count_issues)) 179 .route("/xrpc/sh.tangled.repo.listPulls", get(list_pulls)) 180 .route("/xrpc/sh.tangled.repo.countPulls", get(count_pulls)) 181 .route( 182 "/xrpc/sh.tangled.feed.listComments", 183 get(list_feed_comments), 184 ) 185 .route( 186 "/xrpc/sh.tangled.feed.countComments", 187 get(count_feed_comments), 188 ) 189 .route("/xrpc/sh.tangled.feed.listReactions", get(list_reactions)) 190 .route("/xrpc/sh.tangled.feed.countReactions", get(count_reactions)) 191 .route("/xrpc/sh.tangled.git.listRefUpdates", get(list_ref_updates)) 192 .route( 193 "/xrpc/sh.tangled.git.countRefUpdates", 194 get(count_ref_updates), 195 ) 196 .route( 197 "/xrpc/sh.tangled.repo.listCollaborators", 198 get(list_collaborators), 199 ) 200 .route( 201 "/xrpc/sh.tangled.repo.countCollaborators", 202 get(count_collaborators), 203 ) 204 .route( 205 "/xrpc/sh.tangled.repo.issue.listStates", 206 get(list_issue_states), 207 ) 208 .route( 209 
"/xrpc/sh.tangled.repo.issue.countStates", 210 get(count_issue_states), 211 ) 212 .route( 213 "/xrpc/sh.tangled.repo.pull.listStatuses", 214 get(list_pull_statuses), 215 ) 216 .route( 217 "/xrpc/sh.tangled.repo.pull.countStatuses", 218 get(count_pull_statuses), 219 ) 220 .route("/xrpc/sh.tangled.repo.listRepos", get(list_repos)) 221 .route("/xrpc/sh.tangled.repo.countRepos", get(count_repos)) 222 .route("/xrpc/sh.tangled.knot.listKnots", get(list_knots)) 223 .route("/xrpc/sh.tangled.knot.countKnots", get(count_knots)) 224 .route("/xrpc/sh.tangled.spindle.listSpindles", get(list_spindles)) 225 .route( 226 "/xrpc/sh.tangled.spindle.countSpindles", 227 get(count_spindles), 228 ) 229 .route("/xrpc/sh.tangled.publicKey.listKeys", get(list_public_keys)) 230 .route( 231 "/xrpc/sh.tangled.publicKey.countKeys", 232 get(count_public_keys), 233 ) 234 .route("/xrpc/sh.tangled.graph.listVouches", get(list_vouches)) 235 .route("/xrpc/sh.tangled.graph.countVouches", get(count_vouches)) 236 .route("/xrpc/sh.tangled.feed.listStarsBy", get(list_stars_by)) 237 .route("/xrpc/sh.tangled.feed.countStarsBy", get(count_stars_by)) 238 .route( 239 "/xrpc/sh.tangled.feed.listReactionsBy", 240 get(list_reactions_by), 241 ) 242 .route( 243 "/xrpc/sh.tangled.feed.countReactionsBy", 244 get(count_reactions_by), 245 ) 246 .route("/xrpc/sh.tangled.graph.listFollowsBy", get(list_follows_by)) 247 .route( 248 "/xrpc/sh.tangled.graph.countFollowsBy", 249 get(count_follows_by), 250 ) 251 .route("/xrpc/sh.tangled.graph.listVouchesBy", get(list_vouches_by)) 252 .route( 253 "/xrpc/sh.tangled.graph.countVouchesBy", 254 get(count_vouches_by), 255 ) 256 .route( 257 "/xrpc/sh.tangled.git.listRefUpdatesBy", 258 get(list_ref_updates_by), 259 ) 260 .route( 261 "/xrpc/sh.tangled.git.countRefUpdatesBy", 262 get(count_ref_updates_by), 263 ) 264 .route( 265 "/xrpc/sh.tangled.knot.listMembersBy", 266 get(list_knot_members_by), 267 ) 268 .route( 269 "/xrpc/sh.tangled.knot.countMembersBy", 270 
get(count_knot_members_by), 271 ) 272 .route("/xrpc/sh.tangled.label.listOpsBy", get(list_label_ops_by)) 273 .route("/xrpc/sh.tangled.label.countOpsBy", get(count_label_ops_by)) 274 .route( 275 "/xrpc/sh.tangled.pipeline.listPipelinesBy", 276 get(list_pipelines_by), 277 ) 278 .route( 279 "/xrpc/sh.tangled.pipeline.countPipelinesBy", 280 get(count_pipelines_by), 281 ) 282 .route( 283 "/xrpc/sh.tangled.pipeline.listStatusesBy", 284 get(list_pipeline_statuses_by), 285 ) 286 .route( 287 "/xrpc/sh.tangled.pipeline.countStatusesBy", 288 get(count_pipeline_statuses_by), 289 ) 290 .route( 291 "/xrpc/sh.tangled.repo.listArtifactsBy", 292 get(list_artifacts_by), 293 ) 294 .route( 295 "/xrpc/sh.tangled.repo.countArtifactsBy", 296 get(count_artifacts_by), 297 ) 298 .route( 299 "/xrpc/sh.tangled.repo.listCollaboratorsBy", 300 get(list_collaborators_by), 301 ) 302 .route( 303 "/xrpc/sh.tangled.repo.countCollaboratorsBy", 304 get(count_collaborators_by), 305 ) 306 .route("/xrpc/sh.tangled.repo.listIssuesBy", get(list_issues_by)) 307 .route("/xrpc/sh.tangled.repo.countIssuesBy", get(count_issues_by)) 308 .route( 309 "/xrpc/sh.tangled.feed.listCommentsBy", 310 get(list_feed_comments_by), 311 ) 312 .route( 313 "/xrpc/sh.tangled.feed.countCommentsBy", 314 get(count_feed_comments_by), 315 ) 316 .route( 317 "/xrpc/sh.tangled.repo.issue.listStatesBy", 318 get(list_issue_states_by), 319 ) 320 .route( 321 "/xrpc/sh.tangled.repo.issue.countStatesBy", 322 get(count_issue_states_by), 323 ) 324 .route("/xrpc/sh.tangled.repo.listPullsBy", get(list_pulls_by)) 325 .route("/xrpc/sh.tangled.repo.countPullsBy", get(count_pulls_by)) 326 .route( 327 "/xrpc/sh.tangled.repo.pull.listStatusesBy", 328 get(list_pull_statuses_by), 329 ) 330 .route( 331 "/xrpc/sh.tangled.repo.pull.countStatusesBy", 332 get(count_pull_statuses_by), 333 ) 334 .route( 335 "/xrpc/sh.tangled.spindle.listMembersBy", 336 get(list_spindle_members_by), 337 ) 338 .route( 339 "/xrpc/sh.tangled.spindle.countMembersBy", 340 
get(count_spindle_members_by), 341 ) 342 .route( 343 "/xrpc/sh.tangled.label.listDefinitions", 344 get(list_label_definitions), 345 ) 346 .route( 347 "/xrpc/sh.tangled.label.countDefinitions", 348 get(count_label_definitions), 349 ) 350 .route("/xrpc/sh.tangled.label.listOps", get(list_label_ops)) 351 .route("/xrpc/sh.tangled.label.countOps", get(count_label_ops)) 352 .route( 353 "/xrpc/sh.tangled.pipeline.listPipelines", 354 get(list_pipelines), 355 ) 356 .route( 357 "/xrpc/sh.tangled.pipeline.countPipelines", 358 get(count_pipelines), 359 ) 360 .route( 361 "/xrpc/sh.tangled.pipeline.listStatuses", 362 get(list_pipeline_statuses), 363 ) 364 .route( 365 "/xrpc/sh.tangled.pipeline.countStatuses", 366 get(count_pipeline_statuses), 367 ) 368 .route("/xrpc/sh.tangled.repo.listArtifacts", get(list_artifacts)) 369 .route("/xrpc/sh.tangled.repo.countArtifacts", get(count_artifacts)) 370 .route("/xrpc/sh.tangled.knot.listMembers", get(list_knot_members)) 371 .route( 372 "/xrpc/sh.tangled.knot.countMembers", 373 get(count_knot_members), 374 ) 375 .route( 376 "/xrpc/sh.tangled.spindle.listMembers", 377 get(list_spindle_members), 378 ) 379 .route( 380 "/xrpc/sh.tangled.spindle.countMembers", 381 get(count_spindle_members), 382 ) 383 .route("/xrpc/sh.tangled.string.listStrings", get(list_strings)) 384 .route("/xrpc/sh.tangled.string.countStrings", get(count_strings)) 385 .route("/xrpc/sh.tangled.search.query", get(search_query)) 386 .route("/xrpc/sh.tangled.bobbin.getCoverage", get(get_coverage)) 387 .route( 388 "/xrpc/com.bad-example.identity.resolveMiniDoc", 389 get(resolve_mini_doc), 390 ) 391 .merge(knot_proxied_routes()) 392 .layer( 393 TraceLayer::new_for_http() 394 .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) 395 .on_request(()) 396 .on_response(LatencyFreeTrace) 397 .on_failure(LatencyFreeTrace), 398 ) 399 .with_state(state) 400} 401 402#[derive(Clone, Copy, Debug)] 403struct LatencyFreeTrace; 404 405impl<B> OnResponse<B> for LatencyFreeTrace { 406 fn 
on_response(self, response: &Response<B>, _latency: Duration, _span: &Span) { 407 tracing::event!( 408 target: "tower_http::trace::on_response", 409 Level::INFO, 410 status = response.status().as_u16(), 411 "request completed", 412 ); 413 } 414} 415 416impl OnFailure<ServerErrorsFailureClass> for LatencyFreeTrace { 417 fn on_failure(&mut self, error: ServerErrorsFailureClass, _latency: Duration, _span: &Span) { 418 tracing::event!( 419 target: "tower_http::trace::on_failure", 420 Level::WARN, 421 error = %error, 422 "request failed", 423 ); 424 } 425} 426 427const REPO_PROXIED_NSIDS: &[&str] = &[ 428 "sh.tangled.repo.archive", 429 "sh.tangled.repo.blob", 430 "sh.tangled.repo.branch", 431 "sh.tangled.repo.branches", 432 "sh.tangled.repo.compare", 433 "sh.tangled.repo.describeRepo", 434 "sh.tangled.repo.diff", 435 "sh.tangled.repo.getDefaultBranch", 436 "sh.tangled.repo.languages", 437 "sh.tangled.repo.listSecrets", 438 "sh.tangled.repo.log", 439 "sh.tangled.repo.tag", 440 "sh.tangled.repo.tags", 441 "sh.tangled.repo.tree", 442]; 443 444const KNOT_PROXIED_NSIDS: &[&str] = &[ 445 "sh.tangled.owner", 446 "sh.tangled.knot.version", 447 "sh.tangled.knot.listKeys", 448]; 449 450const PASSTHROUGH_HEADERS: &[&HeaderName] = &[ 451 &CONTENT_TYPE, 452 &CONTENT_LENGTH, 453 &CONTENT_ENCODING, 454 &ETAG, 455 &CACHE_CONTROL, 456 &LAST_MODIFIED, 457 &CONTENT_DISPOSITION, 458 &ACCEPT_RANGES, 459 &CONTENT_RANGE, 460]; 461 462const FORWARDED_REQUEST_HEADERS: &[&HeaderName] = 463 &[&RANGE, &IF_RANGE, &IF_NONE_MATCH, &IF_MODIFIED_SINCE]; 464 465const KNOT_HOST_PARAM: &str = "knot"; 466const REPO_PARAM: &str = "repo"; 467 468type ProxyParams = Vec<(String, String)>; 469 470fn knot_proxied_routes() -> Router<AppState> { 471 let with_repo = register_proxied(Router::new(), REPO_PROXIED_NSIDS, proxy_repo_handler); 472 register_proxied(with_repo, KNOT_PROXIED_NSIDS, proxy_knot_handler) 473} 474 475fn register_proxied<H, Fut>( 476 router: Router<AppState>, 477 nsids: &[&'static str], 478 
handler: H, 479) -> Router<AppState> 480where 481 H: Fn(AppState, HeaderMap, ProxyParams, Nsid<DefaultStr>) -> Fut 482 + Clone 483 + Send 484 + Sync 485 + 'static, 486 Fut: Future<Output = Result<Response, XrpcError>> + Send + 'static, 487{ 488 nsids.iter().fold(router, |router, &nsid_lit| { 489 let handler = handler.clone(); 490 let nsid = nsid_static(nsid_lit); 491 router.route( 492 &format!("/xrpc/{nsid_lit}"), 493 get( 494 move |State(state): State<AppState>, 495 headers: HeaderMap, 496 Query(params): Query<ProxyParams>| { 497 handler(state, headers, params, nsid.clone()) 498 }, 499 ), 500 ) 501 }) 502} 503 504#[derive(Clone, Debug)] 505pub enum SubjectQuery { 506 Did(Did<DefaultStr>), 507 Uri(AtUri<DefaultStr>), 508} 509 510impl<'de> Deserialize<'de> for SubjectQuery { 511 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 512 where 513 D: serde::Deserializer<'de>, 514 { 515 let raw = String::deserialize(deserializer)?; 516 if let Ok(did) = Did::<DefaultStr>::new_owned(&raw) { 517 return Ok(Self::Did(did)); 518 } 519 AtUri::<DefaultStr>::new_owned(&raw) 520 .map(Self::Uri) 521 .map_err(serde::de::Error::custom) 522 } 523} 524 525#[derive(Clone, Debug, Eq, PartialEq)] 526pub struct ExpectedNsid { 527 canon: Nsid<DefaultStr>, 528 aliases: &'static [&'static str], 529} 530 531const FEED_COMMENT_LEGACY_ALIASES: &[&str] = &[ 532 "sh.tangled.repo.issue.comment", 533 "sh.tangled.repo.pull.comment", 534]; 535 536fn aliases_for(nsid: &str) -> &'static [&'static str] { 537 match nsid { 538 "sh.tangled.feed.comment" => FEED_COMMENT_LEGACY_ALIASES, 539 _ => &[], 540 } 541} 542 543impl ExpectedNsid { 544 pub fn new(nsid: Nsid<DefaultStr>) -> Self { 545 let aliases = aliases_for(nsid.as_ref()); 546 Self { 547 canon: nsid, 548 aliases, 549 } 550 } 551 552 pub fn from_static(s: &'static str) -> Self { 553 let canon = nsid_static(s); 554 let aliases = aliases_for(s); 555 Self { canon, aliases } 556 } 557 558 pub fn as_nsid(&self) -> &Nsid<DefaultStr> { 559 
&self.canon 560 } 561 562 pub fn as_str(&self) -> &str { 563 self.canon.as_ref() 564 } 565 566 fn accepts(&self, other: &str) -> bool { 567 other == self.canon.as_ref() || self.aliases.contains(&other) 568 } 569} 570 571#[derive(Debug, Deserialize)] 572struct GetRepoQuery { 573 repo: AtUri<DefaultStr>, 574} 575 576#[derive(Debug, Deserialize)] 577struct GetRepoByRepoDidQuery { 578 #[serde(rename = "repoDid")] 579 repo_did: Did<DefaultStr>, 580} 581 582#[derive(Debug, Deserialize)] 583struct GetProfileQuery { 584 actor: AtUri<DefaultStr>, 585} 586 587#[derive(Debug, Deserialize)] 588struct GetIssueQuery { 589 issue: AtUri<DefaultStr>, 590} 591 592#[derive(Debug, Deserialize)] 593struct GetPullQuery { 594 pull: AtUri<DefaultStr>, 595} 596 597#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)] 598#[serde(rename_all = "lowercase")] 599enum Order { 600 Asc, 601 #[default] 602 Desc, 603} 604 605impl From<Order> for SortDir { 606 fn from(o: Order) -> Self { 607 match o { 608 Order::Asc => SortDir::Asc, 609 Order::Desc => SortDir::Desc, 610 } 611 } 612} 613 614#[derive(Debug, Deserialize)] 615struct TypedListQuery<F> { 616 subject: SubjectQuery, 617 cursor: Option<String>, 618 limit: Option<u32>, 619 #[serde(default)] 620 order: Order, 621 #[serde(flatten)] 622 filter: F, 623} 624 625impl<F> TypedListQuery<F> { 626 fn dir(&self) -> SortDir { 627 self.order.into() 628 } 629} 630 631#[derive(Debug, Deserialize)] 632struct CountQuery { 633 subject: SubjectQuery, 634} 635 636#[derive(Debug, Deserialize)] 637struct SearchQueryParams { 638 q: String, 639 nsid: Option<Nsid<DefaultStr>>, 640 author: Option<Did<DefaultStr>>, 641 repo: Option<Did<DefaultStr>>, 642 since: Option<String>, 643 until: Option<String>, 644 cursor: Option<String>, 645 limit: Option<u32>, 646} 647 648pub struct XrpcQuery<T>(pub T); 649 650impl<S, T> FromRequestParts<S> for XrpcQuery<T> 651where 652 S: Send + Sync, 653 T: serde::de::DeserializeOwned + Send + 'static, 654{ 655 type Rejection = 
XrpcError; 656 657 async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> { 658 Query::<T>::from_request_parts(parts, state) 659 .await 660 .map(|Query(t)| Self(t)) 661 .map_err(|rej: QueryRejection| XrpcError::InvalidParams(rej.body_text())) 662 } 663} 664 665#[derive(Debug, Error)] 666pub enum XrpcError { 667 #[error("invalid request: {0}")] 668 InvalidParams(String), 669 #[error("record not found")] 670 NotFound, 671 #[error("upstream unavailable: {0}")] 672 UpstreamUnavailable(String), 673 #[error("upstream gone: {0}")] 674 UpstreamGone(String), 675 #[error("invalid record: {0}")] 676 InvalidRecord(String), 677 #[error("internal: {0}")] 678 Internal(String), 679 #[error("overloaded, shedding under memory pressure")] 680 Overloaded, 681} 682 683impl XrpcError { 684 pub fn overloaded() -> Self { 685 Self::Overloaded 686 } 687} 688 689#[derive(Serialize)] 690struct ErrorBody { 691 error: &'static str, 692 message: String, 693} 694 695impl IntoResponse for XrpcError { 696 fn into_response(self) -> Response { 697 let (status, error) = match &self { 698 Self::InvalidParams(_) => (StatusCode::BAD_REQUEST, "InvalidRequest"), 699 Self::NotFound => (StatusCode::NOT_FOUND, "RecordNotFound"), 700 Self::UpstreamUnavailable(_) => (StatusCode::BAD_GATEWAY, "UpstreamFailed"), 701 Self::UpstreamGone(_) => (StatusCode::BAD_GATEWAY, "UpstreamGone"), 702 Self::InvalidRecord(_) => (StatusCode::BAD_GATEWAY, "InvalidRecord"), 703 Self::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "InternalError"), 704 Self::Overloaded => (StatusCode::SERVICE_UNAVAILABLE, "Overloaded"), 705 }; 706 let body = ErrorBody { 707 error, 708 message: self.to_string(), 709 }; 710 (status, Json(body)).into_response() 711 } 712} 713 714#[derive(Serialize)] 715#[serde(rename_all = "camelCase")] 716struct CoverageEnvelope { 717 ready: bool, 718 events_processed: u64, 719 last_cursor: u64, 720} 721 722impl From<Coverage> for CoverageEnvelope { 723 fn from(c: Coverage) -> 
Self { 724 Self { 725 ready: c.is_ready(), 726 events_processed: c.events_processed(), 727 last_cursor: c.last_cursor().raw(), 728 } 729 } 730} 731 732#[derive(Serialize)] 733#[serde(rename_all = "camelCase")] 734struct RecordView<V> { 735 uri: AtUri<DefaultStr>, 736 cid: Option<Cid<DefaultStr>>, 737 value: V, 738} 739 740#[derive(Serialize)] 741#[serde(rename_all = "camelCase")] 742struct StatefulItem<V> { 743 #[serde(flatten)] 744 view: RecordView<V>, 745 state: &'static str, 746 #[serde(skip_serializing_if = "Option::is_none")] 747 state_updated_at: Option<String>, 748 comment_count: u64, 749} 750 751fn format_micros(micros: u64) -> String { 752 let signed = i64::try_from(micros).ok(); 753 let rfc = signed 754 .and_then(chrono::DateTime::<chrono::Utc>::from_timestamp_micros) 755 .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true)); 756 rfc.unwrap_or_else(|| micros.to_string()) 757} 758 759pub(crate) fn source_authority_did(source: &AtUri<DefaultStr>) -> Option<Did<DefaultStr>> { 760 match source.authority() { 761 AtIdentifier::Did(d) => Some(d.clone().into_static()), 762 AtIdentifier::Handle(_) => None, 763 } 764} 765 766fn enrich_issue_view( 767 state: &AppState, 768 view: RecordView<Issue<DefaultStr>>, 769) -> StatefulItem<Issue<DefaultStr>> { 770 let issue_author = source_authority_did(&view.uri); 771 let repo_did = view.value.repo.clone(); 772 enrich_view( 773 &state.edges, 774 nsid_static("sh.tangled.feed.comment"), 775 &state.issue_states, 776 view, 777 move |src| accept_state_source(src, issue_author.as_ref(), &repo_did), 778 ) 779} 780 781fn enrich_pull_view( 782 state: &AppState, 783 view: RecordView<Pull<DefaultStr>>, 784) -> StatefulItem<Pull<DefaultStr>> { 785 let pull_author = source_authority_did(&view.uri); 786 let target_repo = view.value.target.repo.clone(); 787 enrich_view( 788 &state.edges, 789 nsid_static("sh.tangled.feed.comment"), 790 &state.pull_statuses, 791 view, 792 move |src| accept_state_source(src, pull_author.as_ref(), 
&target_repo), 793 ) 794} 795 796pub(crate) fn accept_state_source( 797 source: &AtUri<DefaultStr>, 798 entity_author: Option<&Did<DefaultStr>>, 799 repo_owner: &Did<DefaultStr>, 800) -> bool { 801 let Some(src) = source_authority_did(source) else { 802 return false; 803 }; 804 Some(&src) == entity_author || &src == repo_owner 805} 806 807fn enrich_view<V, K, F>( 808 edges: &EdgeStore, 809 comment_nsid: Nsid<DefaultStr>, 810 states: &StateIndex<K>, 811 view: RecordView<V>, 812 accept: F, 813) -> StatefulItem<V> 814where 815 K: StateKind + Default, 816 F: Fn(&AtUri<DefaultStr>) -> bool, 817{ 818 let comment_count = edges.count(&EdgeKey::new( 819 comment_nsid, 820 SubjectRef::Uri(view.uri.clone()), 821 )); 822 let (state, state_updated_at) = states 823 .latest_by(&view.uri, accept) 824 .map_or((K::default().wire(), None), |(kind, micros)| { 825 (kind.wire(), Some(format_micros(micros))) 826 }); 827 StatefulItem { 828 view, 829 state, 830 state_updated_at, 831 comment_count, 832 } 833} 834 835#[derive(Serialize)] 836#[serde(rename_all = "camelCase")] 837struct CountResponse { 838 count: u64, 839 distinct_authors: u64, 840} 841 842#[derive(Serialize)] 843#[serde(rename_all = "camelCase")] 844struct SearchHitView { 845 uri: AtUri<DefaultStr>, 846 cid: Option<Cid<DefaultStr>>, 847 nsid: Nsid<DefaultStr>, 848 score: f32, 849 value: SearchableRecord, 850} 851 852fn map_slingshot(err: SlingshotError) -> XrpcError { 853 use SlingshotError as E; 854 match err { 855 E::NotFound => XrpcError::NotFound, 856 e @ (E::Decode(_) 857 | E::MissingField(_) 858 | E::InvalidAtUri(_) 859 | E::InvalidCid(_) 860 | E::UriMismatch { .. }) => XrpcError::InvalidRecord(e.to_string()), 861 e @ (E::Network(_) 862 | E::Build(_) 863 | E::Upstream(_) 864 | E::BodyTooLarge { .. 
} 865 | E::BadScheme(_)) => XrpcError::UpstreamUnavailable(e.to_string()), 866 } 867} 868 869fn parse_uri(raw: &str) -> Result<AtUri<DefaultStr>, XrpcError> { 870 AtUri::<DefaultStr>::new_owned(raw).map_err(|e| XrpcError::InvalidParams(format!("uri: {e}"))) 871} 872 873#[derive(Clone, Copy, Debug, Eq, PartialEq)] 874pub enum SubjectShape { 875 BareDid, 876 Collection(&'static str), 877 BareDidOrOneOfCollections(&'static [&'static str]), 878 OneOfCollections(&'static [&'static str]), 879 AnyAtUri, 880} 881 882pub trait HasSubject { 883 const SHAPE: SubjectShape; 884} 885 886pub trait MirrorOf { 887 type Record: XrpcResp; 888 const EDGE_KIND: &'static str; 889 const SHAPE: SubjectShape; 890} 891 892pub struct StarBy; 893pub struct ReactionBy; 894pub struct FollowBy; 895pub struct VouchBy; 896pub struct RefUpdateBy; 897pub struct KnotMemberBy; 898pub struct LabelOpBy; 899pub struct PipelineBy; 900pub struct PipelineStatusBy; 901pub struct ArtifactBy; 902pub struct CollaboratorBy; 903pub struct FeedCommentBy; 904pub struct IssueBy; 905pub struct IssueStateBy; 906pub struct PullBy; 907pub struct PullStatusBy; 908pub struct SpindleMemberBy; 909 910impl MirrorOf for StarBy { 911 type Record = StarRecord; 912 const EDGE_KIND: &'static str = "sh.tangled.feed.star.by"; 913 const SHAPE: SubjectShape = SubjectShape::BareDid; 914} 915impl MirrorOf for ReactionBy { 916 type Record = ReactionRecord; 917 const EDGE_KIND: &'static str = "sh.tangled.feed.reaction.by"; 918 const SHAPE: SubjectShape = SubjectShape::BareDid; 919} 920impl MirrorOf for FollowBy { 921 type Record = FollowRecord; 922 const EDGE_KIND: &'static str = "sh.tangled.graph.follow.by"; 923 const SHAPE: SubjectShape = SubjectShape::BareDid; 924} 925impl MirrorOf for VouchBy { 926 type Record = VouchRecord; 927 const EDGE_KIND: &'static str = "sh.tangled.graph.vouch.by"; 928 const SHAPE: SubjectShape = SubjectShape::BareDid; 929} 930impl MirrorOf for RefUpdateBy { 931 type Record = RefUpdateRecord; 932 const 
EDGE_KIND: &'static str = "sh.tangled.git.refUpdate.by"; 933 const SHAPE: SubjectShape = SubjectShape::BareDid; 934} 935impl MirrorOf for KnotMemberBy { 936 type Record = KnotMemberRecord; 937 const EDGE_KIND: &'static str = "sh.tangled.knot.member.by"; 938 const SHAPE: SubjectShape = SubjectShape::BareDid; 939} 940impl MirrorOf for LabelOpBy { 941 type Record = LabelOpRecord; 942 const EDGE_KIND: &'static str = "sh.tangled.label.op.by"; 943 const SHAPE: SubjectShape = SubjectShape::BareDid; 944} 945impl MirrorOf for PipelineBy { 946 type Record = PipelineRecord; 947 const EDGE_KIND: &'static str = "sh.tangled.pipeline.by"; 948 const SHAPE: SubjectShape = SubjectShape::BareDid; 949} 950impl MirrorOf for PipelineStatusBy { 951 type Record = PipelineStatusRecord; 952 const EDGE_KIND: &'static str = "sh.tangled.pipeline.status.by"; 953 const SHAPE: SubjectShape = SubjectShape::BareDid; 954} 955impl MirrorOf for ArtifactBy { 956 type Record = ArtifactRecord; 957 const EDGE_KIND: &'static str = "sh.tangled.repo.artifact.by"; 958 const SHAPE: SubjectShape = SubjectShape::BareDid; 959} 960impl MirrorOf for CollaboratorBy { 961 type Record = CollaboratorRecord; 962 const EDGE_KIND: &'static str = "sh.tangled.repo.collaborator.by"; 963 const SHAPE: SubjectShape = SubjectShape::BareDid; 964} 965impl MirrorOf for FeedCommentBy { 966 type Record = FeedCommentRecord; 967 const EDGE_KIND: &'static str = "sh.tangled.feed.comment.by"; 968 const SHAPE: SubjectShape = SubjectShape::BareDid; 969} 970impl MirrorOf for IssueBy { 971 type Record = IssueRecord; 972 const EDGE_KIND: &'static str = "sh.tangled.repo.issue.by"; 973 const SHAPE: SubjectShape = SubjectShape::BareDid; 974} 975impl MirrorOf for IssueStateBy { 976 type Record = IssueStateRecord; 977 const EDGE_KIND: &'static str = "sh.tangled.repo.issue.state.by"; 978 const SHAPE: SubjectShape = SubjectShape::BareDid; 979} 980impl MirrorOf for PullBy { 981 type Record = PullRecord; 982 const EDGE_KIND: &'static str = 
"sh.tangled.repo.pull.by"; 983 const SHAPE: SubjectShape = SubjectShape::BareDid; 984} 985impl MirrorOf for PullStatusBy { 986 type Record = PullStatusRecord; 987 const EDGE_KIND: &'static str = "sh.tangled.repo.pull.status.by"; 988 const SHAPE: SubjectShape = SubjectShape::BareDid; 989} 990impl MirrorOf for SpindleMemberBy { 991 type Record = SpindleMemberRecord; 992 const EDGE_KIND: &'static str = "sh.tangled.spindle.member.by"; 993 const SHAPE: SubjectShape = SubjectShape::BareDid; 994} 995 996impl HasSubject for StarRecord { 997 const SHAPE: SubjectShape = SubjectShape::BareDidOrOneOfCollections(&["sh.tangled.string"]); 998} 999impl HasSubject for FollowRecord { 1000 const SHAPE: SubjectShape = SubjectShape::BareDid; 1001} 1002impl HasSubject for IssueRecord { 1003 const SHAPE: SubjectShape = SubjectShape::BareDid; 1004} 1005impl HasSubject for PullRecord { 1006 const SHAPE: SubjectShape = SubjectShape::BareDid; 1007} 1008impl HasSubject for FeedCommentRecord { 1009 const SHAPE: SubjectShape = SubjectShape::OneOfCollections(&[ 1010 "sh.tangled.repo.issue", 1011 "sh.tangled.repo.pull", 1012 "sh.tangled.string", 1013 ]); 1014} 1015impl HasSubject for LabelDefinitionRecord { 1016 const SHAPE: SubjectShape = SubjectShape::BareDid; 1017} 1018impl HasSubject for LabelOpRecord { 1019 const SHAPE: SubjectShape = 1020 SubjectShape::OneOfCollections(&["sh.tangled.repo.issue", "sh.tangled.repo.pull"]); 1021} 1022impl HasSubject for PipelineRecord { 1023 const SHAPE: SubjectShape = SubjectShape::BareDid; 1024} 1025impl HasSubject for PipelineStatusRecord { 1026 const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.pipeline"); 1027} 1028impl HasSubject for ArtifactRecord { 1029 const SHAPE: SubjectShape = SubjectShape::BareDid; 1030} 1031impl HasSubject for KnotMemberRecord { 1032 const SHAPE: SubjectShape = SubjectShape::BareDid; 1033} 1034impl HasSubject for SpindleMemberRecord { 1035 const SHAPE: SubjectShape = SubjectShape::BareDid; 1036} 1037impl HasSubject 
for TangledStringRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Reactions: any at-URI (no collection restriction).
impl HasSubject for ReactionRecord {
    const SHAPE: SubjectShape = SubjectShape::AnyAtUri;
}

// Ref updates: bare DID only.
impl HasSubject for RefUpdateRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Collaborators: bare DID only.
impl HasSubject for CollaboratorRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Issue states: an at-URI in the `sh.tangled.repo.issue` collection.
impl HasSubject for IssueStateRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.issue");
}

// Pull statuses: an at-URI in the `sh.tangled.repo.pull` collection.
impl HasSubject for PullStatusRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.pull");
}

// Knots: bare DID only.
impl HasSubject for KnotRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Spindles: bare DID only.
impl HasSubject for SpindleRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Public keys: bare DID only.
impl HasSubject for PublicKeyRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Repos: bare DID only.
impl HasSubject for RepoRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

// Vouches: bare DID only.
impl HasSubject for VouchRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}

/// Validates a raw `subject` query value against the shape a record kind accepts.
///
/// A bare DID is accepted only by `BareDid` and `BareDidOrOneOfCollections`. An at-URI must
/// carry a DID authority (handles are rejected) and a collection. The collection-constrained
/// shapes additionally require a matching collection and an rkey; `AnyAtUri` accepts any
/// collection and does not check for an rkey.
///
/// # Errors
/// Returns `XrpcError::InvalidParams` describing the expected form whenever the subject does
/// not fit `shape`.
fn parse_subject(raw: &SubjectQuery, shape: SubjectShape) -> Result<SubjectRef, XrpcError> {
    // Single place that turns a message into the rejection value.
    let reject =
        |msg: String| -> Result<SubjectRef, XrpcError> { Err(XrpcError::InvalidParams(msg)) };

    let uri = match raw {
        SubjectQuery::Uri(uri) => uri,
        SubjectQuery::Did(did) => {
            // Bare-DID input is fully decided by the shape alone.
            return match shape {
                SubjectShape::BareDid | SubjectShape::BareDidOrOneOfCollections(_) => {
                    Ok(SubjectRef::Did(did.clone()))
                }
                SubjectShape::Collection(expected) => reject(format!(
                    "subject must be at://<did>/{expected}/<rkey>, got bare did"
                )),
                SubjectShape::OneOfCollections(allowed) => reject(format!(
                    "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got bare did",
                    allowed.join(", "),
                )),
                SubjectShape::AnyAtUri => {
                    reject("subject must be at-uri form, got bare did".to_owned())
                }
            };
        }
    };

    if matches!(uri.authority(), AtIdentifier::Handle(_)) {
        return reject("subject authority must be a did, not a handle".to_owned());
    }
    let Some(collection) = uri.collection() else {
        return reject("subject must be a bare did or full at://<did>/<nsid>/<rkey>".to_owned());
    };
    let c: &str = collection.as_ref();

    // Accepts the at-URI once its rkey is confirmed present.
    let accept = |label: &str| -> Result<SubjectRef, XrpcError> {
        require_rkey(uri, label)?;
        Ok(SubjectRef::Uri(uri.clone()))
    };

    match shape {
        SubjectShape::BareDid => reject(format!(
            "subject must be a bare did, got at-uri with collection {c}"
        )),
        SubjectShape::Collection(expected) => {
            if c == expected {
                accept(expected)
            } else {
                reject(format!(
                    "subject must be at://<did>/{expected}/<rkey>, got collection {c}"
                ))
            }
        }
        SubjectShape::OneOfCollections(allowed) => {
            if allowed.contains(&c) {
                accept(c)
            } else {
                reject(format!(
                    "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
                    allowed.join(", "),
                ))
            }
        }
        SubjectShape::BareDidOrOneOfCollections(allowed) => {
            if allowed.contains(&c) {
                accept(c)
            } else {
                reject(format!(
                    "subject must be a bare did or at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
                    allowed.join(", "),
                ))
            }
        }
        SubjectShape::AnyAtUri => Ok(SubjectRef::Uri(uri.clone())),
    }
}

/// Ensures `uri` has an rkey segment.
///
/// `expected` is only used to render the collection name in the error message.
///
/// # Errors
/// Returns `XrpcError::InvalidParams` when the rkey is missing.
fn require_rkey(uri: &AtUri<DefaultStr>, expected: &str) -> Result<(), XrpcError> {
    if uri.rkey().is_some() {
        return Ok(());
    }
    Err(XrpcError::InvalidParams(format!(
        "subject must be at://<did>/{expected}/<rkey>; missing rkey"
    )))
}

/// Decodes the optional `cursor` query token.
///
/// # Errors
/// Returns `XrpcError::InvalidParams` prefixed with `cursor:` when the token does not parse.
fn parse_cursor(raw: Option<&str>) -> Result<PageCursor, XrpcError> {
    let describe = |err: CursorParseError| XrpcError::InvalidParams(format!("cursor: {err}"));
    PageCursor::from_token(raw).map_err(describe)
}

/// Validates the optional `limit` query value, falling back to `DEFAULT_LIMIT` when absent.
///
/// # Errors
/// Returns `XrpcError::InvalidParams` prefixed with `limit:` when `PageLimit::new` rejects it.
fn parse_limit(raw: Option<u32>) -> Result<PageLimit, XrpcError> {
    let requested = raw.unwrap_or(DEFAULT_LIMIT);
    PageLimit::new(requested).map_err(|err| XrpcError::InvalidParams(format!("limit: {err}")))
}

/// Returns `true` when the authority of `uri` is a DID equal to `author`.
///
/// Handle authorities never match.
pub(crate) fn at_uri_owned_by(uri: &AtUri<DefaultStr>, author: &Did<DefaultStr>) -> bool {
    matches!(uri.authority(), AtIdentifier::Did(d) if d.as_ref() == author.as_ref())
}

/// Resolves a record body for list hydration.
///
/// Behaves like [`resolve`] but discards the DID and reports a missing record as
/// `XrpcError::UpstreamGone` carrying the requested URI instead of `NotFound`.
async fn resolve_for_view(
    state: &AppState,
    expected_nsid: &Nsid<DefaultStr>,
    uri: AtUri<DefaultStr>,
) -> Result<Arc<RecordBody>, XrpcError> {
    // Captured up front because `uri` is moved into `resolve`.
    let raw = uri.as_ref().to_owned();
    match resolve(state, ExpectedNsid::new(expected_nsid.clone()), uri).await {
        Ok((body, _did)) => Ok(body),
        Err(XrpcError::NotFound) => Err(XrpcError::UpstreamGone(raw)),
        Err(other) => Err(other),
    }
}

/// Fetches the record at `uri`, consulting the record cache before going upstream.
///
/// The URI must name a collection accepted by `expected`, include an rkey, and use a DID
/// authority. A cache hit is returned as-is; otherwise the record is fetched through the
/// slingshot client, its `$type` tag is verified, and the body is cached.
///
/// # Errors
/// `XrpcError::InvalidParams` for a malformed or mismatching URI, the mapped slingshot error
/// when the fetch fails, or `XrpcError::InvalidRecord` when the `$type` check fails.
async fn resolve(
    state: &AppState,
    expected: ExpectedNsid,
    uri: AtUri<DefaultStr>,
) -> Result<(Arc<RecordBody>, Did<DefaultStr>), XrpcError> {
    let Some(collection) = uri.collection() else {
        return Err(XrpcError::InvalidParams("uri missing collection".into()));
    };
    if !expected.accepts(collection.as_ref()) {
        let msg = format!(
            "collection mismatch: expected {}, got {}",
            expected.as_str(),
            collection.as_ref()
        );
        return Err(XrpcError::InvalidParams(msg));
    }
    let Some(rkey) = uri.rkey() else {
        return Err(XrpcError::InvalidParams("uri missing rkey".into()));
    };
    let AtIdentifier::Did(did_ref) = uri.authority() else {
        return Err(XrpcError::InvalidParams(
            "uri authority must be a did, not a handle".into(),
        ));
    };
    let did: Did<DefaultStr> = did_ref.clone().into_static();

    if let Some(cached) = state.records.get(&uri) {
        return Ok((cached, did));
    }

    let fetched = state
        .slingshot
        .get_record(&did_ref, &collection, &rkey)
        .await
        .map_err(map_slingshot)?;
    // Only freshly fetched bodies are type-checked; cached ones are returned directly above.
    verify_type_tag(&fetched, &expected)?;
    state.records.put(uri, fetched.clone());
    Ok((fetched, did))
}

/// Minimal borrowed view of a record used to peek at its `$type` field.
#[derive(Deserialize)]
struct TypeTag<'a> {
    #[serde(rename = "$type", borrow)]
    ty: &'a str,
}

/// Checks that the `$type` field of `body` is accepted by `expected`.
///
/// The borrowed [`TypeTag`] parse is tried first; if it fails, the bytes are parsed as a
/// generic JSON value and the `$type` string is read from the top-level object.
///
/// # Errors
/// `XrpcError::InvalidRecord` when the bytes are not JSON, `$type` is missing or not a
/// string, or the tag is not accepted.
fn verify_type_tag(body: &RecordBody, expected: &ExpectedNsid) -> Result<(), XrpcError> {
    use std::borrow::Cow;

    let bytes = body.value.as_ref();
    let ty: Cow<'_, str> = match serde_json::from_slice::<TypeTag>(bytes) {
        Ok(tag) => Cow::Borrowed(tag.ty),
        Err(_) => {
            let value: serde_json::Value = serde_json::from_slice(bytes)
                .map_err(|e| XrpcError::InvalidRecord(format!("$type peek: {e}")))?;
            let found = value
                .as_object()
                .and_then(|fields| fields.get("$type"))
                .and_then(serde_json::Value::as_str);
            match found {
                Some(tag) => Cow::Owned(tag.to_owned()),
                None => {
                    return Err(XrpcError::InvalidRecord(
                        "$type peek: missing $type field".into(),
                    ));
                }
            }
        }
    };

    if expected.accepts(ty.as_ref()) {
        Ok(())
    } else {
        Err(XrpcError::InvalidRecord(format!(
            "$type mismatch: expected {}, got {}",
            expected.as_str(),
            ty
        )))
    }
}

/// Reads the `$type` tag from raw record bytes and parses it as an NSID.
///
/// Returns `None` when the tag cannot be read or is not a valid NSID.
fn wire_type_nsid(bytes: &[u8]) -> Option<Nsid<DefaultStr>> {
    let tag: TypeTag<'_> = serde_json::from_slice(bytes).ok()?;
    Nsid::<DefaultStr>::new_owned(tag.ty).ok()
}

/// Deserializes record bytes into `V`, retrying through progressively heavier repairs.
///
/// Order of attempts: the bytes as given; after `normalize_record_fields` (if it changed
/// anything); after `scrub_record_bytes` (if it changed anything); finally after
/// `upgrade_wire_bytes`, keyed by the `$type` found on the wire or `nsid` as a fallback.
///
/// # Errors
/// `XrpcError::InvalidRecord` with the upgraded-bytes parse error when the upgrade succeeds
/// but its output does not parse, or with the very first parse error when the upgrade fails.
async fn deserialize_or_upgrade<V>(
    state: &AppState,
    nsid: &Nsid<DefaultStr>,
    bytes: &[u8],
) -> Result<V, XrpcError>
where
    V: serde::de::DeserializeOwned,
{
    let canon_err = match serde_json::from_slice::<V>(bytes) {
        Ok(value) => return Ok(value),
        Err(err) => err,
    };

    let normalized = normalize_record_fields(bytes);
    let working: &[u8] = normalized.as_deref().unwrap_or(bytes);
    // Re-parsing only makes sense when normalization actually produced new bytes.
    if normalized.is_some()
        && let Ok(value) = serde_json::from_slice::<V>(working)
    {
        return Ok(value);
    }

    let scrubbed = scrub_record_bytes(nsid, working);
    let retry_bytes: &[u8] = scrubbed.as_deref().unwrap_or(working);
    if scrubbed.is_some()
        && let Ok(value) = serde_json::from_slice::<V>(retry_bytes)
    {
        return Ok(value);
    }

    let wire_nsid = wire_type_nsid(retry_bytes).unwrap_or_else(|| nsid.clone());
    match upgrade_wire_bytes(&wire_nsid, retry_bytes, &state.resolver).await {
        Ok(canon_bytes) => serde_json::from_slice(&canon_bytes)
            .map_err(|e| XrpcError::InvalidRecord(e.to_string())),
        Err(_) => Err(XrpcError::InvalidRecord(canon_err.to_string())),
    }
}

async fn fetch_from_uri<R, V>(
    state: &AppState,
    uri: AtUri<DefaultStr>,
) -> Result<(Arc<RecordBody>, V), XrpcError>
where
    R: XrpcResp,
    V: serde::de::DeserializeOwned + NormalizeRepoRefs,
{
    let raw = uri.as_str().to_owned();
    let nsid = nsid_static(R::NSID);
    let (body, _did) = resolve(state, ExpectedNsid::new(nsid.clone()), uri).await?;
    let value: V =
deserialize_or_upgrade(state, &nsid, &body.value).await?;
    let value = value
        .normalize(&state.resolver)
        .await
        .ok_or(XrpcError::UpstreamGone(raw))?;
    Ok((body, value))
}

/// Borrowing convenience wrapper around [`fetch_from_uri`]; clones `uri` and delegates.
async fn fetch<R, V>(
    state: &AppState,
    uri: &AtUri<DefaultStr>,
) -> Result<(Arc<RecordBody>, V), XrpcError>
where
    R: XrpcResp,
    V: serde::de::DeserializeOwned + NormalizeRepoRefs,
{
    let owned_uri = uri.clone();
    fetch_from_uri::<R, V>(state, owned_uri).await
}

/// Handler: returns the repo record named by the `repo` query parameter.
async fn get_repo(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<GetRepoQuery>,
) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> {
    let (record, value) = fetch::<RepoRecord, Repo<DefaultStr>>(&state, &q.repo).await?;
    let output = RepoGetRecordOutput {
        cid: Some(record.cid.clone()),
        uri: record.uri.clone(),
        value,
    };
    Ok(Json(output))
}

/// Handler: returns the repo record identified by a repo DID.
///
/// The resolver maps the repo DID to an owner and rkey, from which the record's at-URI is
/// assembled. Responds with `XrpcError::NotFound` when the resolver has no mapping.
async fn get_repo_by_repo_did(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<GetRepoByRepoDidQuery>,
) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> {
    let Some(ident) = state.resolver.lookup_by_repo_did(&q.repo_did).await else {
        return Err(XrpcError::NotFound);
    };
    let uri = AtUri::<DefaultStr>::from_parts_owned(
        ident.owner.as_str(),
        RepoRecord::NSID,
        ident.rkey.as_str(),
    )
    .expect("Did and Rkey newtypes already validated, at-uri assembly cannot fail");
    let (record, value) = fetch_from_uri::<RepoRecord, Repo<DefaultStr>>(&state, uri).await?;
    let output = RepoGetRecordOutput {
        cid: Some(record.cid.clone()),
        uri: record.uri.clone(),
        value,
    };
    Ok(Json(output))
}

/// Handler: returns the profile record named by the `actor` query parameter.
async fn get_profile(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<GetProfileQuery>,
) -> Result<Json<ProfileGetRecordOutput<DefaultStr>>, XrpcError> {
    let (record, value) = fetch::<ProfileRecord, Profile<DefaultStr>>(&state, &q.actor).await?;
    let output = ProfileGetRecordOutput {
        cid: Some(record.cid.clone()),
        uri: record.uri.clone(),
        value,
    };
    Ok(Json(output))
}

/// Handler: returns the issue record named by the `issue` query parameter.
async fn get_issue(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<GetIssueQuery>,
) -> Result<Json<IssueGetRecordOutput<DefaultStr>>, XrpcError> {
    let (record, value) = fetch::<IssueRecord, Issue<DefaultStr>>(&state, &q.issue).await?;
    let output = IssueGetRecordOutput {
        cid: Some(record.cid.clone()),
        uri: record.uri.clone(),
        value,
    };
    Ok(Json(output))
}

/// Handler: returns the pull record named by the `pull` query parameter.
async fn get_pull(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<GetPullQuery>,
) -> Result<Json<PullGetRecordOutput<DefaultStr>>, XrpcError> {
    let (record, value) = fetch::<PullRecord, Pull<DefaultStr>>(&state, &q.pull).await?;
    let output = PullGetRecordOutput {
        cid: Some(record.cid.clone()),
        uri: record.uri.clone(),
        value,
    };
    Ok(Json(output))
}

/// Handler: bulk-fetches repo records named by repeated `repos` query parameters.
async fn get_repos(
    State(state): State<AppState>,
    RawQuery(query): RawQuery,
) -> Result<Response, XrpcError> {
    let requested = collect_repeated(query.as_deref(), BULK_REPOS_KEY);
    bulk_fetch::<RepoRecord, Repo<DefaultStr>>(&state, requested).await
}

/// Handler: bulk-fetches profile records named by repeated `actors` query parameters.
async fn get_profiles(
    State(state): State<AppState>,
    RawQuery(query): RawQuery,
) -> Result<Response, XrpcError> {
    let requested = collect_repeated(query.as_deref(), BULK_PROFILES_KEY);
    bulk_fetch::<ProfileRecord, Profile<DefaultStr>>(&state, requested).await
}

/// Handler: bulk-fetches issue records named by repeated `issues` query parameters.
async fn get_issues(
    State(state): State<AppState>,
    RawQuery(query): RawQuery,
) -> Result<Response, XrpcError> {
    let requested = collect_repeated(query.as_deref(), BULK_ISSUES_KEY);
    bulk_fetch::<IssueRecord, Issue<DefaultStr>>(&state, requested).await
}

/// Handler: bulk-fetches pull records named by repeated `pulls` query parameters.
async fn get_pulls(
    State(state): State<AppState>,
    RawQuery(query): RawQuery,
) -> Result<Response, XrpcError> {
    let requested = collect_repeated(query.as_deref(), BULK_PULLS_KEY);
    bulk_fetch::<PullRecord, Pull<DefaultStr>>(&state, requested).await
}

/// Query key carrying repo URIs for `get_repos`.
const BULK_REPOS_KEY: &str = "repos";
/// Query key carrying profile URIs for `get_profiles`.
const BULK_PROFILES_KEY: &str = "actors";
/// Query key carrying issue URIs for `get_issues`.
const BULK_ISSUES_KEY: &str = "issues";
/// Query key carrying pull URIs for `get_pulls`.
const BULK_PULLS_KEY: &str = "pulls";
/// Maximum number of URIs accepted by one bulk request.
const BULK_LIMIT: usize = 50;

/// Collects every value of the repeated query parameter `key`, in query order.
///
/// Values are form-url-decoded. A missing query string yields an empty vector.
fn collect_repeated(query: Option<&str>, key: &str) -> Vec<String> {
    let mut values = Vec::new();
    if let Some(q) = query {
        for (name, value) in form_urlencoded::parse(q.as_bytes()) {
            if name == key {
                values.push(value.into_owned());
            }
        }
    }
    values
}

/// Hydrates one edge item into a `RecordView`.
///
/// URIs that decode as knot-owned sources are synthesized locally; everything else is
/// resolved, deserialized (with upgrade fallbacks) and normalized. `Ok(None)` means the item
/// should be silently omitted because normalization produced nothing.
async fn hydrate_record_view<V>(
    state: &AppState,
    nsid: &Nsid<DefaultStr>,
    uri: AtUri<DefaultStr>,
    sort_micros: u64,
) -> Result<Option<RecordView<V>>, XrpcError>
where
    V: serde::de::DeserializeOwned + NormalizeRepoRefs,
{
    if let Some(source) = decode_knot_owned_source(&uri) {
        return synthesize_knot_owned_view::<V>(state, uri, source, sort_micros).await;
    }
    let body = resolve_for_view(state, nsid, uri).await?;
    let decoded: V = deserialize_or_upgrade::<V>(state, nsid, &body.value).await?;
    let view = decoded
        .normalize(&state.resolver)
        .await
        .map(|value| RecordView {
            uri: body.uri.clone(),
            cid: Some(body.cid.clone()),
            value,
        });
    Ok(view)
}

/// Builds a `RecordView` for a knot-owned source without fetching a stored record.
///
/// The view has no CID. Any failure along the way (value synthesis, decoding into `V`,
/// normalization) yields `Ok(None)` rather than an error.
async fn synthesize_knot_owned_view<V>(
    state: &AppState,
    uri: AtUri<DefaultStr>,
    source: KnotOwnedSource,
    sort_micros: u64,
) -> Result<Option<RecordView<V>>, XrpcError>
where
    V: serde::de::DeserializeOwned + NormalizeRepoRefs,
{
    let Some(synthetic) = synth_knot_owned_value(source, sort_micros) else {
        return Ok(None);
    };
    let Ok(decoded) = serde_json::from_value::<V>(synthetic) else {
        return Ok(None);
    };
    let view = decoded
        .normalize(&state.resolver)
        .await
        .map(|value| RecordView {
            uri,
            cid: None,
            value,
        });
    Ok(view)
}

fn synth_knot_owned_value(source: KnotOwnedSource, sort_micros: u64) -> Option<serde_json::Value> {
    let created_at =
micros_to_rfc3339(sort_micros)?;
    match source {
        KnotOwnedSource::Member { knot, subject } => Some(serde_json::json!({
            "domain": knot_did_host(&knot)?,
            "subject": subject.as_ref(),
            "createdAt": created_at,
        })),
        KnotOwnedSource::Collaborator { repo, subject } => Some(serde_json::json!({
            "repo": repo.as_ref(),
            "subject": subject.as_ref(),
            "createdAt": created_at,
        })),
    }
}

/// Formats a microsecond Unix timestamp as RFC 3339 with microsecond precision and a `Z`
/// suffix.
///
/// Returns `None` when the value does not fit in `i64` or is outside chrono's range.
fn micros_to_rfc3339(micros: u64) -> Option<String> {
    let signed = i64::try_from(micros).ok()?;
    let instant = chrono::DateTime::from_timestamp_micros(signed)?;
    Some(instant.to_rfc3339_opts(chrono::SecondsFormat::Micros, true))
}

/// Where a URI being hydrated came from; decides how hydration failures are treated.
#[derive(Clone, Copy)]
enum HitProvenance {
    /// The caller passed the URI explicitly.
    ClientSupplied,
    /// The URI came out of the edge index.
    Indexed,
}

/// Reports whether a hydration error justifies removing the source URI from the edge index.
fn is_index_evictable(err: &XrpcError) -> bool {
    match err {
        XrpcError::NotFound
        | XrpcError::UpstreamGone(_)
        | XrpcError::InvalidRecord(_)
        | XrpcError::InvalidParams(_) => true,
        XrpcError::UpstreamUnavailable(_) | XrpcError::Internal(_) | XrpcError::Overloaded => {
            false
        }
    }
}

/// Converts per-item hydration failures into "skip this item" where that is appropriate.
///
/// Gone, unavailable and invalid records are logged and dropped (`Ok(None)`).
/// `InvalidParams` is surfaced for client-supplied URIs but dropped for indexed ones.
/// `Internal` and `Overloaded` always propagate.
fn drop_unhydratable<V>(
    provenance: HitProvenance,
    nsid: &Nsid<DefaultStr>,
    uri: &AtUri<DefaultStr>,
    result: Result<Option<V>, XrpcError>,
) -> Result<Option<V>, XrpcError> {
    let err = match result {
        Ok(view) => return Ok(view),
        Err(err) => err,
    };
    match err {
        XrpcError::NotFound | XrpcError::UpstreamGone(_) => {
            tracing::debug!(
                uri = %uri,
                nsid = %nsid.as_ref(),
                error = %err,
                "dropping gone hit during hydration",
            );
            Ok(None)
        }
        XrpcError::UpstreamUnavailable(_) => {
            tracing::warn!(
                uri = %uri,
                nsid = %nsid.as_ref(),
                error = %err,
                "dropping hit, upstream unavailable during hydration",
            );
            Ok(None)
        }
        XrpcError::InvalidRecord(_) => {
            tracing::warn!(
                uri = %uri,
                nsid = %nsid.as_ref(),
                error = %err,
                "dropping invalid hit during hydration",
            );
            Ok(None)
        }
        XrpcError::InvalidParams(_) => match provenance {
            // The caller asked for this exact URI, so tell them it is malformed.
            HitProvenance::ClientSupplied => Err(err),
            HitProvenance::Indexed => {
                tracing::warn!(
                    uri = %uri,
                    nsid = %nsid.as_ref(),
                    error = %err,
                    "dropping malformed indexed hit during hydration",
                );
                Ok(None)
            }
        },
        XrpcError::Internal(_) | XrpcError::Overloaded => Err(err),
    }
}

/// Runs `produce` over `items` with up to `FETCH_CONCURRENCY` futures in flight, preserving
/// input order and filtering out items that produced `Ok(None)`.
fn hydrate_stream<T, Fut, V>(
    items: impl IntoIterator<Item = T>,
    produce: impl FnMut(T) -> Fut,
) -> impl Stream<Item = Result<V, XrpcError>>
where
    Fut: Future<Output = Result<Option<V>, XrpcError>>,
{
    let pending = stream::iter(items).map(produce);
    pending
        .buffered(FETCH_CONCURRENCY)
        .try_filter_map(|view| async move { Ok(view) })
}

/// Hydrates edge items into a stream of `RecordView`s.
///
/// For indexed hits, an evictable failure (see [`is_index_evictable`]) also removes the
/// source URI from the edge index before the failure is filtered by [`drop_unhydratable`].
fn hydrate_record_stream<V>(
    state: &AppState,
    nsid: Nsid<DefaultStr>,
    items: Vec<EdgeItem>,
    provenance: HitProvenance,
) -> impl Stream<Item = Result<RecordView<V>, XrpcError>> + Send + 'static
where
    V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
{
    let owned = state.clone();
    hydrate_stream(items, move |item| {
        // Each future gets its own handles so it can be `'static`.
        let owned = owned.clone();
        let nsid = nsid.clone();
        async move {
            let EdgeItem { uri, sort_micros } = item;
            let result = hydrate_record_view::<V>(&owned, &nsid, uri.clone(), sort_micros).await;
            let evict = matches!(provenance, HitProvenance::Indexed)
                && result.as_ref().err().is_some_and(is_index_evictable);
            if evict {
                owned.edges.remove_source(&uri);
            }
            drop_unhydratable(provenance, &nsid, &uri, result)
        }
    })
}

/// Progress of a streamed JSON page.
enum PagePhase {
    /// Nothing emitted yet; the opening `{"<key>":[` comes next.
    Head,
    /// Emitting array elements; `first` is true until one element has been written.
    Body { first: bool },
    /// The closing tail has been emitted.
    Done,
}

/// State threaded through the chunk generator of a streamed JSON page.
struct PageState<S> {
    /// Source of items to serialize.
    items: std::pin::Pin<Box<S>>,
    /// Current position in the page.
    phase: PagePhase,
    /// Name of the JSON array field.
    array_key: &'static str,
    /// Bytes that close the array and the enclosing object.
    tail: Vec<u8>,
    /// Permit held for the lifetime of the stream, if any.
    permit: Option<HeavyPermit>,
}

fn paged_tail(cursor: Option<String>) -> Vec<u8> {
    let encoded = serde_json::to_string(&cursor).unwrap_or_else(|_| "null".to_owned());
1628 format!("],\"cursor\":{encoded}}}").into_bytes() 1629} 1630 1631fn unpaged_tail() -> Vec<u8> { 1632 b"]}".to_vec() 1633} 1634 1635fn json_stream<V, S>( 1636 array_key: &'static str, 1637 items: S, 1638 tail: Vec<u8>, 1639 permit: Option<HeavyPermit>, 1640) -> Response 1641where 1642 V: Serialize + Send + 'static, 1643 S: Stream<Item = Result<V, XrpcError>> + Send + 'static, 1644{ 1645 let init = PageState { 1646 items: Box::pin(items), 1647 phase: PagePhase::Head, 1648 array_key, 1649 tail, 1650 permit, 1651 }; 1652 let chunks = stream::unfold(init, |mut st| async move { 1653 match st.phase { 1654 PagePhase::Head => { 1655 let head = format!("{{\"{}\":[", st.array_key).into_bytes(); 1656 st.phase = PagePhase::Body { first: true }; 1657 Some((Ok::<Vec<u8>, Infallible>(head), st)) 1658 } 1659 PagePhase::Body { first } => match st.items.next().await { 1660 Some(Ok(view)) => match serde_json::to_vec(&view) { 1661 Ok(encoded) => { 1662 let mut chunk = Vec::with_capacity(encoded.len() + 1); 1663 if !first { 1664 chunk.push(b','); 1665 } 1666 chunk.extend_from_slice(&encoded); 1667 st.phase = PagePhase::Body { first: false }; 1668 Some((Ok(chunk), st)) 1669 } 1670 Err(e) => { 1671 tracing::warn!(error = %e, "skipping hit, serialize failed mid-stream"); 1672 Some((Ok(Vec::new()), st)) 1673 } 1674 }, 1675 Some(Err(e)) => { 1676 tracing::warn!(error = %e, "ending page early, hydration failed mid-stream"); 1677 let tail = std::mem::take(&mut st.tail); 1678 st.phase = PagePhase::Done; 1679 Some((Ok(tail), st)) 1680 } 1681 None => { 1682 let tail = std::mem::take(&mut st.tail); 1683 st.phase = PagePhase::Done; 1684 Some((Ok(tail), st)) 1685 } 1686 }, 1687 PagePhase::Done => { 1688 drop(st.permit.take()); 1689 None 1690 } 1691 } 1692 }); 1693 ( 1694 [(CONTENT_TYPE, "application/json")], 1695 Body::from_stream(chunks), 1696 ) 1697 .into_response() 1698} 1699 1700async fn bulk_fetch<R, V>(state: &AppState, uris: Vec<String>) -> Result<Response, XrpcError> 1701where 1702 R: 
XrpcResp, 1703 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static, 1704{ 1705 if uris.is_empty() { 1706 return Err(XrpcError::InvalidParams("at least one uri required".into())); 1707 } 1708 if uris.len() > BULK_LIMIT { 1709 return Err(XrpcError::InvalidParams(format!( 1710 "at most {BULK_LIMIT} uris per request" 1711 ))); 1712 } 1713 let parsed: Vec<AtUri<DefaultStr>> = uris 1714 .iter() 1715 .map(|s| parse_uri(s)) 1716 .collect::<Result<_, _>>()?; 1717 let nsid = nsid_static(R::NSID); 1718 if let Some(bad) = parsed 1719 .iter() 1720 .find(|uri| uri.collection().is_none_or(|c| c.as_ref() != nsid.as_ref())) 1721 { 1722 return Err(XrpcError::InvalidParams(format!( 1723 "uri collection must be {}, got {}", 1724 nsid.as_ref(), 1725 bad.as_ref() 1726 ))); 1727 } 1728 let permit = state.heavy_permit()?; 1729 let items = parsed 1730 .into_iter() 1731 .map(|uri| EdgeItem { 1732 uri, 1733 sort_micros: 0, 1734 }) 1735 .collect(); 1736 let views = hydrate_record_stream::<V>(state, nsid, items, HitProvenance::ClientSupplied); 1737 Ok(json_stream::<RecordView<V>, _>( 1738 "items", 1739 views, 1740 unpaged_tail(), 1741 permit, 1742 )) 1743} 1744 1745fn record_edge_page<R, F>( 1746 state: &AppState, 1747 q: &TypedListQuery<F>, 1748) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError> 1749where 1750 R: XrpcResp + HasSubject, 1751 F: ListFilter, 1752{ 1753 let subject = parse_subject(&q.subject, R::SHAPE)?; 1754 let cursor = parse_cursor(q.cursor.as_deref())?; 1755 let limit = parse_limit(q.limit)?; 1756 let dir = q.dir(); 1757 let nsid = nsid_static(R::NSID); 1758 let page = if q.filter.is_identity() { 1759 let key = EdgeKey::new(nsid.clone(), subject); 1760 state.edges.list(&key, cursor, limit, dir) 1761 } else { 1762 let pred = q.filter.predicate(state, &subject); 1763 let key = EdgeKey::new(nsid.clone(), subject); 1764 state.edges.list_filtered(&key, cursor, limit, dir, pred) 1765 }; 1766 Ok((page, nsid)) 1767} 1768 1769async fn list_records<R, V, 
F>(
    state: &AppState,
    q: TypedListQuery<F>,
) -> Result<Response, XrpcError>
where
    R: XrpcResp + HasSubject,
    V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
    F: ListFilter,
{
    let (page, nsid) = record_edge_page::<R, F>(state, &q)?;
    let permit = state.heavy_permit()?;
    let views = hydrate_record_stream::<V>(state, nsid, page.items, HitProvenance::Indexed);
    Ok(json_stream::<RecordView<V>, _>(
        "items",
        views,
        paged_tail(page.next.map(PageToken::encode_token)),
        permit,
    ))
}

/// Counts edges and distinct authors for record kind `R` under the queried subject.
///
/// # Errors
/// `XrpcError::InvalidParams` when the subject does not fit `R::SHAPE`.
fn count_for<R: XrpcResp + HasSubject>(
    state: &AppState,
    q: CountQuery,
) -> Result<CountResponse, XrpcError> {
    let subject = parse_subject(&q.subject, R::SHAPE)?;
    let key = EdgeKey::new(nsid_static(R::NSID), subject);
    let count = state.edges.count(&key);
    let distinct_authors = state.edges.count_distinct_authors(&key);
    Ok(CountResponse {
        count,
        distinct_authors,
    })
}

/// Parses a typed list query and reads one page of mirror edges for `M`.
///
/// The edge key uses `M::EDGE_KIND`; the returned NSID is that of `M::Record`, which is what
/// the items must be hydrated as.
fn mirror_edge_page<M, F>(
    state: &AppState,
    q: &TypedListQuery<F>,
) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError>
where
    M: MirrorOf,
    F: ListFilter,
{
    let subject = parse_subject(&q.subject, M::SHAPE)?;
    let cursor = parse_cursor(q.cursor.as_deref())?;
    let limit = parse_limit(q.limit)?;
    let dir = q.dir();
    let edge_nsid = nsid_static(M::EDGE_KIND);

    // The predicate needs the subject by reference, so build it before the key takes ownership.
    let pred = if q.filter.is_identity() {
        None
    } else {
        Some(q.filter.predicate(state, &subject))
    };
    let key = EdgeKey::new(edge_nsid, subject);
    let page = match pred {
        None => state.edges.list(&key, cursor, limit, dir),
        Some(pred) => state.edges.list_filtered(&key, cursor, limit, dir, pred),
    };
    let record_nsid = nsid_static(<M::Record as XrpcResp>::NSID);
    Ok((page, record_nsid))
}

/// Lists one page of records reached through mirror `M` and streams them as JSON.
async fn list_mirror<M, V, F>(state: &AppState, q: TypedListQuery<F>) -> Result<Response, XrpcError>
where
    M: MirrorOf,
    V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
    F: ListFilter,
{
    let (page, record_nsid) = mirror_edge_page::<M, F>(state, &q)?;
    let permit = state.heavy_permit()?;
    let views = hydrate_record_stream::<V>(state, record_nsid, page.items, HitProvenance::Indexed);
    let tail = paged_tail(page.next.map(PageToken::encode_token));
    Ok(json_stream::<RecordView<V>, _>("items", views, tail, permit))
}

/// Counts edges and distinct authors for mirror `M` under the queried subject.
///
/// # Errors
/// `XrpcError::InvalidParams` when the subject does not fit `M::SHAPE`.
fn count_mirror<M: MirrorOf>(state: &AppState, q: CountQuery) -> Result<CountResponse, XrpcError> {
    let subject = parse_subject(&q.subject, M::SHAPE)?;
    let key = EdgeKey::new(nsid_static(M::EDGE_KIND), subject);
    let count = state.edges.count(&key);
    let distinct_authors = state.edges.count_distinct_authors(&key);
    Ok(CountResponse {
        count,
        distinct_authors,
    })
}

/// Handler: lists star records for the queried subject.
async fn list_stars(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<StarRecord, Star<DefaultStr>, _>(&state, q).await
}

/// Handler: counts star records for the queried subject.
async fn count_stars(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<StarRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists follow records for the queried subject.
async fn list_follows(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<FollowRecord, Follow<DefaultStr>, _>(&state, q).await
}

/// Handler: counts follow records for the queried subject.
async fn count_follows(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<FollowRecord>(&state, q)?;
    Ok(Json(counts))
}

async fn list_issues(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>,
) -> Result<Response, XrpcError> {
    let (page, nsid) = record_edge_page::<IssueRecord, _>(&state, &q)?;
    let
permit = state.heavy_permit()?;
    let owned = state.clone();
    let items = hydrate_record_stream::<Issue<DefaultStr>>(
        &state,
        nsid,
        page.items,
        HitProvenance::Indexed,
    )
    .map(move |view| view.map(|v| enrich_issue_view(&owned, v)));
    Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
        "items",
        items,
        paged_tail(page.next.map(PageToken::encode_token)),
        permit,
    ))
}

/// Handler: counts issue records for the queried subject.
async fn count_issues(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<IssueRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists pull records for the queried subject, each passed through
/// `enrich_pull_view` before being streamed.
async fn list_pulls(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>,
) -> Result<Response, XrpcError> {
    let (page, nsid) = record_edge_page::<PullRecord, _>(&state, &q)?;
    let permit = state.heavy_permit()?;
    let owned = state.clone();
    let hydrated =
        hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed);
    let items = hydrated.map(move |view| view.map(|v| enrich_pull_view(&owned, v)));
    let tail = paged_tail(page.next.map(PageToken::encode_token));
    Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>(
        "items", items, tail, permit,
    ))
}

/// Handler: counts pull records for the queried subject.
async fn count_pulls(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<PullRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists feed comment records for the queried subject.
async fn list_feed_comments(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<FeedCommentRecord, FeedComment<DefaultStr>, _>(&state, q).await
}

/// Handler: counts feed comment records for the queried subject.
async fn count_feed_comments(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<FeedCommentRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists reaction records for the queried subject.
async fn list_reactions(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<ReactionRecord, Reaction<DefaultStr>, _>(&state, q).await
}

/// Handler: counts reaction records for the queried subject.
async fn count_reactions(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<ReactionRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists ref-update records for the queried subject.
async fn list_ref_updates(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<RefUpdateRecord, RefUpdate<DefaultStr>, _>(&state, q).await
}

/// Handler: counts ref-update records for the queried subject.
async fn count_ref_updates(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<RefUpdateRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists collaborator records for the queried subject.
async fn list_collaborators(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<CollaboratorRecord, Collaborator<DefaultStr>, _>(&state, q).await
}

/// Handler: counts collaborator records for the queried subject.
async fn count_collaborators(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<CollaboratorRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists issue-state records for the queried subject.
async fn list_issue_states(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<IssueStateRecord, IssueState<DefaultStr>, _>(&state, q).await
}

/// Handler: counts issue-state records for the queried subject.
async fn count_issue_states(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<IssueStateRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists pull-status records for the queried subject.
async fn list_pull_statuses(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<PullStatusRecord, PullStatus<DefaultStr>, _>(&state, q).await
}

/// Handler: counts pull-status records for the queried subject.
async fn count_pull_statuses(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<PullStatusRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists repo records for the queried subject.
async fn list_repos(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<RepoRecord, Repo<DefaultStr>, _>(&state, q).await
}

/// Handler: counts repo records for the queried subject.
async fn count_repos(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<RepoRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists knot records for the queried subject.
async fn list_knots(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<KnotRecord, Knot<DefaultStr>, _>(&state, q).await
}

/// Handler: counts knot records for the queried subject.
async fn count_knots(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<KnotRecord>(&state, q)?;
    Ok(Json(counts))
}

/// Handler: lists spindle records for the queried subject.
async fn list_spindles(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<SpindleRecord, Spindle<DefaultStr>, _>(&state, q).await
}

/// Handler: counts spindle records for the queried subject.
async fn count_spindles(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    let counts = count_for::<SpindleRecord>(&state, q)?;
    Ok(Json(counts))
}

async fn list_public_keys(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
list_records::<PublicKeyRecord, PublicKey<DefaultStr>, _>(&state, q).await 2065} 2066 2067async fn count_public_keys( 2068 State(state): State<AppState>, 2069 XrpcQuery(q): XrpcQuery<CountQuery>, 2070) -> Result<Json<CountResponse>, XrpcError> { 2071 count_for::<PublicKeyRecord>(&state, q).map(Json) 2072} 2073 2074async fn list_vouches( 2075 State(state): State<AppState>, 2076 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2077) -> Result<Response, XrpcError> { 2078 list_records::<VouchRecord, Vouch<DefaultStr>, _>(&state, q).await 2079} 2080 2081async fn count_vouches( 2082 State(state): State<AppState>, 2083 XrpcQuery(q): XrpcQuery<CountQuery>, 2084) -> Result<Json<CountResponse>, XrpcError> { 2085 count_for::<VouchRecord>(&state, q).map(Json) 2086} 2087 2088async fn list_stars_by( 2089 State(state): State<AppState>, 2090 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2091) -> Result<Response, XrpcError> { 2092 list_mirror::<StarBy, Star<DefaultStr>, _>(&state, q).await 2093} 2094async fn count_stars_by( 2095 State(state): State<AppState>, 2096 XrpcQuery(q): XrpcQuery<CountQuery>, 2097) -> Result<Json<CountResponse>, XrpcError> { 2098 count_mirror::<StarBy>(&state, q).map(Json) 2099} 2100 2101async fn list_reactions_by( 2102 State(state): State<AppState>, 2103 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2104) -> Result<Response, XrpcError> { 2105 list_mirror::<ReactionBy, Reaction<DefaultStr>, _>(&state, q).await 2106} 2107async fn count_reactions_by( 2108 State(state): State<AppState>, 2109 XrpcQuery(q): XrpcQuery<CountQuery>, 2110) -> Result<Json<CountResponse>, XrpcError> { 2111 count_mirror::<ReactionBy>(&state, q).map(Json) 2112} 2113 2114async fn list_follows_by( 2115 State(state): State<AppState>, 2116 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>, 2117) -> Result<Response, XrpcError> { 2118 list_mirror::<FollowBy, Follow<DefaultStr>, _>(&state, q).await 2119} 2120async fn count_follows_by( 2121 State(state): State<AppState>, 2122 
XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<FollowBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `VouchBy` / `Vouch<DefaultStr>`.
async fn list_vouches_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<VouchBy, Vouch<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `VouchBy`, wrapping the result in `Json`.
async fn count_vouches_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<VouchBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `RefUpdateBy` / `RefUpdate<DefaultStr>`.
async fn list_ref_updates_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<RefUpdateBy, RefUpdate<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `RefUpdateBy`, wrapping the result in `Json`.
async fn count_ref_updates_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<RefUpdateBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `KnotMemberBy` / `KnotMember<DefaultStr>`.
async fn list_knot_members_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<KnotMemberBy, KnotMember<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `KnotMemberBy`, wrapping the result in `Json`.
async fn count_knot_members_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<KnotMemberBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `LabelOpBy` / `LabelOp<DefaultStr>`.
async fn list_label_ops_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<LabelOpBy, LabelOp<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `LabelOpBy`, wrapping the result in `Json`.
async fn count_label_ops_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<LabelOpBy>(&state,
q).map(Json)
}

/// Handler: delegates to `list_mirror` for `PipelineBy` / `Pipeline<DefaultStr>`.
async fn list_pipelines_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<PipelineBy, Pipeline<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `PipelineBy`, wrapping the result in `Json`.
async fn count_pipelines_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<PipelineBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `PipelineStatusBy` / `PipelineStatus<DefaultStr>`.
async fn list_pipeline_statuses_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<PipelineStatusBy, PipelineStatus<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `PipelineStatusBy`, wrapping the result in `Json`.
async fn count_pipeline_statuses_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<PipelineStatusBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `ArtifactBy` / `Artifact<DefaultStr>`.
async fn list_artifacts_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<ArtifactBy, Artifact<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `ArtifactBy`, wrapping the result in `Json`.
async fn count_artifacts_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<ArtifactBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `CollaboratorBy` / `Collaborator<DefaultStr>`.
async fn list_collaborators_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<CollaboratorBy, Collaborator<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `CollaboratorBy`, wrapping the result in `Json`.
async fn count_collaborators_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<CollaboratorBy>(&state, q).map(Json)
}

/// Handler: pages `IssueBy` edges, hydrates each into an `Issue` view, enriches it with
/// `enrich_issue_view`, and streams the result as JSON under the `"items"` key.
async fn list_issues_by(
    State(state):
State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>,
) -> Result<Response, XrpcError> {
    let (page, nsid) = mirror_edge_page::<IssueBy, _>(&state, &q)?;
    // The permit is acquired before the stream is built and handed to `json_stream` below.
    let permit = state.heavy_permit()?;
    // The mapping closure is `move`, so it needs its own clone of the state.
    let owned = state.clone();
    let items = hydrate_record_stream::<Issue<DefaultStr>>(
        &state,
        nsid,
        page.items,
        HitProvenance::Indexed,
    )
    .map(move |view| view.map(|v| enrich_issue_view(&owned, v)));
    Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
        "items",
        items,
        paged_tail(page.next.map(PageToken::encode_token)),
        permit,
    ))
}
/// Handler: delegates to `count_mirror` for `IssueBy`, wrapping the result in `Json`.
async fn count_issues_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<IssueBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `FeedCommentBy` / `FeedComment<DefaultStr>`.
async fn list_feed_comments_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<FeedCommentBy, FeedComment<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `FeedCommentBy`, wrapping the result in `Json`.
async fn count_feed_comments_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<FeedCommentBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `IssueStateBy` / `IssueState<DefaultStr>`.
async fn list_issue_states_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<IssueStateBy, IssueState<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `IssueStateBy`, wrapping the result in `Json`.
async fn count_issue_states_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<IssueStateBy>(&state, q).map(Json)
}

/// Handler: pages `PullBy` edges, hydrates each into a `Pull` view, enriches it with
/// `enrich_pull_view`, and streams the result as JSON under the `"items"` key.
async fn list_pulls_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>,
) -> Result<Response, XrpcError> {
    let (page, nsid) =
mirror_edge_page::<PullBy, _>(&state, &q)?;
    // The permit is acquired before the stream is built and handed to `json_stream` below.
    let permit = state.heavy_permit()?;
    // The mapping closure is `move`, so it needs its own clone of the state.
    let owned = state.clone();
    let items =
        hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed)
            .map(move |view| view.map(|v| enrich_pull_view(&owned, v)));
    Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>(
        "items",
        items,
        paged_tail(page.next.map(PageToken::encode_token)),
        permit,
    ))
}
/// Handler: delegates to `count_mirror` for `PullBy`, wrapping the result in `Json`.
async fn count_pulls_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<PullBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `PullStatusBy` / `PullStatus<DefaultStr>`.
async fn list_pull_statuses_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<PullStatusBy, PullStatus<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `PullStatusBy`, wrapping the result in `Json`.
async fn count_pull_statuses_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<PullStatusBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_mirror` for `SpindleMemberBy` / `SpindleMember<DefaultStr>`.
async fn list_spindle_members_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_mirror::<SpindleMemberBy, SpindleMember<DefaultStr>, _>(&state, q).await
}
/// Handler: delegates to `count_mirror` for `SpindleMemberBy`, wrapping the result in `Json`.
async fn count_spindle_members_by(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_mirror::<SpindleMemberBy>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `LabelDefinitionRecord` /
/// `LabelDefinition<DefaultStr>`.
async fn list_label_definitions(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<LabelDefinitionRecord, LabelDefinition<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `LabelDefinitionRecord`, wrapping the result in `Json`.
async fn count_label_definitions(
    State(state):
State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<LabelDefinitionRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `LabelOpRecord` / `LabelOp<DefaultStr>`.
async fn list_label_ops(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<LabelOpRecord, LabelOp<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `LabelOpRecord`, wrapping the result in `Json`.
async fn count_label_ops(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<LabelOpRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `PipelineRecord` / `Pipeline<DefaultStr>`.
async fn list_pipelines(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<PipelineRecord, Pipeline<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `PipelineRecord`, wrapping the result in `Json`.
async fn count_pipelines(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<PipelineRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `PipelineStatusRecord` /
/// `PipelineStatus<DefaultStr>`.
async fn list_pipeline_statuses(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<PipelineStatusRecord, PipelineStatus<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `PipelineStatusRecord`, wrapping the result in `Json`.
async fn count_pipeline_statuses(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<PipelineStatusRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `ArtifactRecord` / `Artifact<DefaultStr>`.
async fn list_artifacts(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<ArtifactRecord, Artifact<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `ArtifactRecord`, wrapping the result in `Json`.
async fn count_artifacts(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) ->
Result<Json<CountResponse>, XrpcError> {
    count_for::<ArtifactRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `KnotMemberRecord` / `KnotMember<DefaultStr>`.
async fn list_knot_members(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<KnotMemberRecord, KnotMember<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `KnotMemberRecord`, wrapping the result in `Json`.
async fn count_knot_members(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<KnotMemberRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `SpindleMemberRecord` / `SpindleMember<DefaultStr>`.
async fn list_spindle_members(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<SpindleMemberRecord, SpindleMember<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `SpindleMemberRecord`, wrapping the result in `Json`.
async fn count_spindle_members(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<SpindleMemberRecord>(&state, q).map(Json)
}

/// Handler: delegates to `list_records` for `TangledStringRecord` / `TangledString<DefaultStr>`.
async fn list_strings(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
) -> Result<Response, XrpcError> {
    list_records::<TangledStringRecord, TangledString<DefaultStr>, _>(&state, q).await
}

/// Handler: delegates to `count_for` for `TangledStringRecord`, wrapping the result in `Json`.
async fn count_strings(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<CountQuery>,
) -> Result<Json<CountResponse>, XrpcError> {
    count_for::<TangledStringRecord>(&state, q).map(Json)
}

/// Query parameters accepted by `resolve_mini_doc`.
#[derive(Deserialize)]
struct ResolveMiniDocParams {
    /// Identifier passed through to `state.slingshot.resolve_mini_doc`.
    identifier: AtIdentifier<DefaultStr>,
}

/// Handler: resolves `identifier` through the slingshot client and returns the body it
/// produced with status 200 and `Content-Type: application/json`.
///
/// # Errors
/// Slingshot failures are converted with `map_slingshot`.
async fn resolve_mini_doc(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<ResolveMiniDocParams>,
) -> Result<Response, XrpcError> {
    let body = state
        .slingshot
        .resolve_mini_doc(&q.identifier)
        .await
        .map_err(map_slingshot)?;
    Ok((StatusCode::OK,
[(CONTENT_TYPE, "application/json")], body).into_response())
}

/// Handler: returns the current `state.coverage` snapshot, converted into a
/// `CoverageEnvelope`, as JSON.
async fn get_coverage(State(state): State<AppState>) -> Json<CoverageEnvelope> {
    Json(state.coverage.snapshot().into())
}

/// Handler: runs a search and streams the hydrated hits as JSON under the `"hits"` key.
///
/// Steps, in order: reject a blank `q`, decode the cursor token, parse the limit, build the
/// filters, acquire a heavy permit, run the search, then hydrate each hit with
/// `hydrate_search_hit` (results pass through `drop_unhydratable`).
///
/// # Errors
/// - `XrpcError::InvalidParams` for a blank `q` or an undecodable cursor.
/// - Whatever `parse_limit`, `build_search_filters` and `heavy_permit` return.
/// - Search failures mapped through `map_search_err`.
async fn search_query(
    State(state): State<AppState>,
    XrpcQuery(q): XrpcQuery<SearchQueryParams>,
) -> Result<Response, XrpcError> {
    // Only the emptiness check uses the trimmed text; the search receives `q.q` as sent.
    if q.q.trim().is_empty() {
        return Err(XrpcError::InvalidParams("q must not be empty".into()));
    }
    let cursor = SearchCursor::from_token(q.cursor.as_deref())
        .map_err(|e| XrpcError::InvalidParams(format!("cursor: {e}")))?;
    let limit = parse_limit(q.limit)?;
    let filters = build_search_filters(&q)?;
    let permit = state.heavy_permit()?;
    let page = state
        .search
        .search(&q.q, filters, cursor, limit.get())
        .await
        .map_err(map_search_err)?;
    let next = page.next.map(SearchOffset::encode_token);
    let owned = state.clone();
    let hits = hydrate_stream(page.hits, move |hit| {
        // Each per-hit future owns its own state clone plus copies of the uri/nsid,
        // because `hit` itself is moved into `hydrate_search_hit`.
        let owned = owned.clone();
        let uri = hit.uri.clone();
        let nsid = hit.nsid.clone();
        async move {
            let result = hydrate_search_hit(&owned, hit).await;
            drop_unhydratable(HitProvenance::Indexed, &nsid, &uri, result)
        }
    });
    Ok(json_stream::<SearchHitView, _>(
        "hits",
        hits,
        paged_tail(next),
        permit,
    ))
}

/// Builds `SearchFilters` from the query parameters.
///
/// `since` and `until` are optional RFC 3339 timestamps converted to seconds by
/// `parse_rfc3339_seconds`; `nsid`, `author` and `repo` are cloned through unchanged.
///
/// # Errors
/// `XrpcError::InvalidParams` when either timestamp fails to parse, or when both are
/// present and `since` is later than `until`.
fn build_search_filters(q: &SearchQueryParams) -> Result<SearchFilters, XrpcError> {
    let since = q
        .since
        .as_deref()
        .map(parse_rfc3339_seconds)
        .transpose()
        .map_err(|e| XrpcError::InvalidParams(format!("since: {e}")))?;
    let until = q
        .until
        .as_deref()
        .map(parse_rfc3339_seconds)
        .transpose()
        .map_err(|e| XrpcError::InvalidParams(format!("until: {e}")))?;
    if let (Some(s), Some(u)) = (since, until)
        && s > u
    {
        return Err(XrpcError::InvalidParams("since must be <= until".into()));
    }
    Ok(SearchFilters {
        nsid:
q.nsid.clone(),
        author: q.author.clone(),
        repo: q.repo.clone(),
        since,
        until,
    })
}

/// Parses an RFC 3339 timestamp and returns its Unix timestamp in whole seconds.
///
/// # Errors
/// Returns a message containing the raw input and the parser's error.
fn parse_rfc3339_seconds(raw: &str) -> Result<i64, String> {
    chrono::DateTime::parse_from_rfc3339(raw)
        .map(|dt| dt.timestamp())
        .map_err(|e| format!("expected RFC3339, got {raw}: {e}"))
}

/// Turns one search hit into a `SearchHitView`.
///
/// The record body is fetched with `resolve_for_view` and decoded with
/// `decode_canon_or_upgrade`. Returns `Ok(None)` when the decoded record is not a
/// `SearchableRecord` or when `normalize` yields `None`.
///
/// # Errors
/// Propagates `resolve_for_view` errors; decode failures become `XrpcError::InvalidRecord`.
async fn hydrate_search_hit(
    state: &AppState,
    hit: SearchHit,
) -> Result<Option<SearchHitView>, XrpcError> {
    let SearchHit { uri, nsid, score } = hit;
    let body = resolve_for_view(state, &nsid, uri).await?;
    let record = decode_canon_or_upgrade(&nsid, &body.value, &state.resolver)
        .await
        .map_err(|err| XrpcError::InvalidRecord(err.to_string()))?;
    let Some(value) = SearchableRecord::try_from_record(record) else {
        return Ok(None);
    };
    let Some(value) = value.normalize(&state.resolver).await else {
        return Ok(None);
    };
    Ok(Some(SearchHitView {
        // The view reports the uri/cid from the resolved body, not from the index hit.
        uri: body.uri.clone(),
        cid: Some(body.cid.clone()),
        nsid,
        score,
        value,
    }))
}

/// Maps a `SearchError` to an `XrpcError`: query errors become `InvalidParams`, every
/// other listed variant becomes `Internal`.
fn map_search_err(err: SearchError) -> XrpcError {
    use SearchError as E;
    match err {
        E::Query(e) => XrpcError::InvalidParams(format!("query: {e}")),
        // Variants are listed explicitly (no `_` arm), so a new variant fails to compile here.
        e @ (E::Tantivy(_)
        | E::InvalidUri(_)
        | E::InvalidNsid(_)
        | E::MissingField(_)
        | E::Cancelled(_)) => XrpcError::Internal(format!("search: {e}")),
    }
}

/// Maps a `KnotProxyError` to an `XrpcError`.
///
/// `BlockedHost` and `PlaintextHttp` become `InvalidRecord`; every other variant becomes
/// `UpstreamUnavailable`.
fn map_proxy_error(err: KnotProxyError) -> XrpcError {
    match err {
        KnotProxyError::CircuitOpen => {
            XrpcError::UpstreamUnavailable("knot circuit breaker open".into())
        }
        KnotProxyError::BlockedHost { host, reason } => {
            XrpcError::InvalidRecord(format!("knot host {host} is {reason} address space"))
        }
        KnotProxyError::PlaintextHttp { host } => {
            XrpcError::InvalidRecord(format!("knot host {host} requires https"))
        }
        KnotProxyError::Connect(e) => XrpcError::UpstreamUnavailable(format!("connect: {e}")),
KnotProxyError::Timeout(e) => {
            XrpcError::UpstreamUnavailable(format!("upstream timeout: {e}"))
        }
        KnotProxyError::Redirect(e) => XrpcError::UpstreamUnavailable(format!("redirect: {e}")),
        KnotProxyError::Transport(e) => XrpcError::UpstreamUnavailable(format!("transport: {e}")),
        KnotProxyError::Upstream(s) => XrpcError::UpstreamUnavailable(format!("status {s}")),
    }
}

/// Checks a knot host that came directly from the client before it is proxied to.
///
/// # Errors
/// `XrpcError::InvalidParams` when `state.knots.requires_https()` is set and the scheme is
/// not `https`, or when private hosts are not allowed and `private_literal_reason()`
/// reports a reason for this host.
fn validate_client_supplied_knot(state: &AppState, host: &KnotHost) -> Result<(), XrpcError> {
    // Lazily rendered, so the host string is only allocated on an error path.
    let host_str = || host.url().host_str().unwrap_or_default().to_owned();
    if state.knots.requires_https() && host.url().scheme() != "https" {
        return Err(XrpcError::InvalidParams(format!(
            "knot host {} must be https",
            host_str(),
        )));
    }
    if state.knots.allows_private_hosts() {
        return Ok(());
    }
    match host.private_literal_reason() {
        None => Ok(()),
        Some(reason) => Err(XrpcError::InvalidParams(format!(
            "knot host {} blocked: {} address space",
            host_str(),
            reason,
        ))),
    }
}

/// Resolves a repo AT-URI to the knot host serving it and the slug to send upstream.
///
/// The repo record is fetched with `resolve` and decoded as `Repo`; the host comes from
/// its `knot` field and the slug is built from the resolved DID plus the name chosen by
/// `pick_human_slug`.
///
/// # Errors
/// Propagates `resolve` errors. Returns `XrpcError::InvalidRecord` when the record does
/// not decode, the `knot` field does not parse, no slug name is available, or
/// `RepoSlug::new` rejects the pair.
async fn resolve_knot_target(
    state: &AppState,
    repo_uri: AtUri<DefaultStr>,
) -> Result<(KnotHost, RepoSlug), XrpcError> {
    // Copied out first because `repo_uri` is moved into `resolve` on the next line.
    let rkey: Option<Rkey<DefaultStr>> = repo_uri.rkey().map(|r| r.clone().into_static());
    let (body, did) = resolve(state, ExpectedNsid::from_static(RepoRecord::NSID), repo_uri).await?;
    let value: Repo<DefaultStr> = serde_json::from_slice(&body.value)
        .map_err(|e| XrpcError::InvalidRecord(format!("decode repo record: {e}")))?;
    let host = KnotHost::parse(value.knot.as_ref())
        .map_err(|e| XrpcError::InvalidRecord(format!("knot field: {e}")))?;
    let name = pick_human_slug(rkey.as_ref(), value.name.as_deref()).ok_or_else(|| {
        XrpcError::InvalidRecord("at-uri missing rkey and record missing name".to_string())
    })?;
    let slug = RepoSlug::new(&did, &name)
        .map_err(|e| XrpcError::InvalidRecord(format!("repo slug: {e}")))?;
Ok((host, slug))
}

/// Chooses the name component of a repo slug.
///
/// - rkey that parses as a TID: the record `name`, falling back to the rkey itself.
/// - any other rkey: the rkey.
/// - no rkey: the record `name`, or `None` when that is missing too.
fn pick_human_slug(rkey: Option<&Rkey<DefaultStr>>, name: Option<&str>) -> Option<String> {
    match rkey {
        Some(r) if jacquard_common::types::tid::Tid::new(r.as_ref()).is_ok() => {
            Some(name.unwrap_or(r.as_ref()).to_owned())
        }
        Some(r) => Some(r.as_ref().to_owned()),
        None => name.map(str::to_owned),
    }
}

/// Builds the header map sent upstream: for each name in `FORWARDED_REQUEST_HEADERS`,
/// copies the value `HeaderMap::get` returns from the client request, if any.
fn filter_request_headers(client: &HeaderMap) -> HeaderMap {
    FORWARDED_REQUEST_HEADERS
        .iter()
        .fold(HeaderMap::new(), |mut acc, name| {
            if let Some(value) = client.get(*name) {
                acc.insert((*name).clone(), value.clone());
            }
            acc
        })
}

/// Converts an upstream `ProxyResponse` into an axum `Response`.
///
/// The status is carried over, the body is streamed rather than buffered, and only the
/// headers named in `PASSTHROUGH_HEADERS` are copied.
///
/// # Panics
/// Panics if `Response::builder()` fails to build the response.
fn upstream_to_axum(resp: ProxyResponse) -> Response {
    let status = resp.status();
    // Cloned because `into_body_stream` consumes `resp` on the next line.
    let upstream_headers = resp.headers().clone();
    let body = Body::from_stream(resp.into_body_stream());
    let mut response = Response::builder()
        .status(status)
        .body(body)
        .expect("response body construction must succeed");
    let response_headers = response.headers_mut();
    PASSTHROUGH_HEADERS.iter().for_each(|name| {
        if let Some(value) = upstream_headers.get(*name) {
            response_headers.insert((*name).clone(), value.clone());
        }
    });
    response
}

/// Forwards a request to `host` for method `nsid` and converts the upstream response.
///
/// `params` is sent as borrowed key/value pairs and the client headers are first reduced
/// by `filter_request_headers`.
///
/// # Errors
/// Proxy failures are converted with `map_proxy_error`.
async fn dispatch_proxy(
    state: AppState,
    headers: HeaderMap,
    nsid: Nsid<DefaultStr>,
    host: KnotHost,
    params: ProxyParams,
) -> Result<Response, XrpcError> {
    let forward: Vec<(&str, &str)> = params
        .iter()
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect();
    let allowed = filter_request_headers(&headers);
    let upstream = state
        .knots
        .forward(&host, &nsid, &forward, allowed)
        .await
        .map_err(map_proxy_error)?;
    Ok(upstream_to_axum(upstream))
}

/// Removes the parameter named `key` from `params`.
///
/// Returns `Ok(None)` when the key is absent, or `Ok(Some((value, rest)))` with the
/// remaining parameters when it appears exactly once.
///
/// # Errors
/// `XrpcError::InvalidParams` when the key appears more than once.
fn extract_param(
    params: ProxyParams,
    key: &str,
) -> Result<Option<(String, ProxyParams)>, XrpcError> {
    let (matching, rest): (ProxyParams, ProxyParams) =
params.into_iter().partition(|(k, _)| k == key);
    match matching.as_slice() {
        [] => Ok(None),
        [_] => Ok(matching.into_iter().next().map(|(_, v)| (v, rest))),
        _ => Err(XrpcError::InvalidParams(format!(
            "{key} parameter must appear at most once, got {}",
            matching.len(),
        ))),
    }
}

/// Proxies a repo-scoped call to the knot that hosts the repo.
///
/// The `REPO_PARAM` value is parsed as a URI and resolved with `resolve_knot_target`; the
/// parameter is then re-appended with the resolved slug in place of the client's value
/// before the request is dispatched.
///
/// # Errors
/// `XrpcError::InvalidParams` when `REPO_PARAM` is missing (message `"missing repo"`) or
/// repeated; otherwise whatever `parse_uri`, `resolve_knot_target` or `dispatch_proxy`
/// return.
async fn proxy_repo_handler(
    state: AppState,
    headers: HeaderMap,
    params: ProxyParams,
    nsid: Nsid<DefaultStr>,
) -> Result<Response, XrpcError> {
    let (repo_raw, rest) = extract_param(params, REPO_PARAM)?
        .ok_or_else(|| XrpcError::InvalidParams("missing repo".into()))?;
    let repo_uri = parse_uri(&repo_raw)?;
    let (host, slug) = resolve_knot_target(&state, repo_uri).await?;
    let forward = rest
        .into_iter()
        .chain(std::iter::once((
            REPO_PARAM.to_owned(),
            slug.as_str().to_owned(),
        )))
        .collect();
    dispatch_proxy(state, headers, nsid, host, forward).await
}

/// Proxies a call to a knot host named directly by the client.
///
/// The `KNOT_HOST_PARAM` value is removed from the forwarded parameters, parsed as a
/// `KnotHost`, and checked by `validate_client_supplied_knot` before dispatch.
///
/// # Errors
/// `XrpcError::InvalidParams` when `KNOT_HOST_PARAM` is missing (message `"missing knot"`),
/// repeated, or unparseable; otherwise whatever validation or `dispatch_proxy` return.
async fn proxy_knot_handler(
    state: AppState,
    headers: HeaderMap,
    params: ProxyParams,
    nsid: Nsid<DefaultStr>,
) -> Result<Response, XrpcError> {
    let (knot_raw, forward) = extract_param(params, KNOT_HOST_PARAM)?
        .ok_or_else(|| XrpcError::InvalidParams("missing knot".into()))?;
    let host =
        KnotHost::parse(&knot_raw).map_err(|e| XrpcError::InvalidParams(format!("knot: {e}")))?;
    validate_client_supplied_knot(&state, &host)?;
    dispatch_proxy(state, headers, nsid, host, forward).await
}