Monorepo for Tangled
tangled.org
1use std::future::Future;
2use std::sync::Arc;
3
4use bobbin_resolver::{
5 NormalizeRepoRefs, decode_canon_or_upgrade, normalize_record_fields, scrub_record_bytes,
6 upgrade_wire_bytes,
7};
8
9use axum::{
10 Router,
11 body::Body,
12 extract::{FromRequestParts, Query, RawQuery, State, rejection::QueryRejection},
13 http::{
14 HeaderMap, HeaderName, StatusCode,
15 header::{
16 ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LENGTH,
17 CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE,
18 LAST_MODIFIED, RANGE,
19 },
20 request::Parts,
21 },
22 response::{IntoResponse, Json, Response},
23 routing::get,
24};
25use bobbin_edge_index::{
26 Coverage, CoverageWatch, CursorParseError, EdgePage, EdgeStore, IssueStateKind, PageCursor,
27 PageLimit, PageToken, PullStatusKind, SortDir, StateIndex, StateKind,
28};
29use bobbin_knot_proxy::{KnotHost, KnotProxy, KnotProxyError, ProxyResponse, RepoSlug};
30use bobbin_record_lru::RecordStore;
31use bobbin_resolver::RepoIdResolver;
32use bobbin_search::{
33 SearchCursor, SearchError, SearchFilters, SearchHit, SearchOffset, SearchReader,
34};
35use bobbin_slingshot_client::{SlingshotClient, SlingshotError};
36use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static};
37use bobbin_types::record::RecordBody;
38use bobbin_types::search::SearchableRecord;
39use bobbin_types::sh_tangled::actor::profile::{Profile, ProfileGetRecordOutput, ProfileRecord};
40use bobbin_types::sh_tangled::feed::comment::{
41 Comment as FeedComment, CommentRecord as FeedCommentRecord,
42};
43use bobbin_types::sh_tangled::feed::reaction::{Reaction, ReactionRecord};
44use bobbin_types::sh_tangled::feed::star::{Star, StarRecord};
45use bobbin_types::sh_tangled::git::ref_update::{RefUpdate, RefUpdateRecord};
46use bobbin_types::sh_tangled::graph::follow::{Follow, FollowRecord};
47use bobbin_types::sh_tangled::graph::vouch::{Vouch, VouchRecord};
48use bobbin_types::sh_tangled::knot::member::{
49 Member as KnotMember, MemberRecord as KnotMemberRecord,
50};
51use bobbin_types::sh_tangled::knot::{Knot, KnotRecord};
52use bobbin_types::sh_tangled::label::definition::{
53 Definition as LabelDefinition, DefinitionRecord as LabelDefinitionRecord,
54};
55use bobbin_types::sh_tangled::label::op::{Op as LabelOp, OpRecord as LabelOpRecord};
56use bobbin_types::sh_tangled::pipeline::status::{
57 Status as PipelineStatus, StatusRecord as PipelineStatusRecord,
58};
59use bobbin_types::sh_tangled::pipeline::{Pipeline, PipelineRecord};
60use bobbin_types::sh_tangled::public_key::{PublicKey, PublicKeyRecord};
61use bobbin_types::sh_tangled::repo::artifact::{Artifact, ArtifactRecord};
62use bobbin_types::sh_tangled::repo::collaborator::{Collaborator, CollaboratorRecord};
63use bobbin_types::sh_tangled::repo::issue::state::{
64 State as IssueState, StateRecord as IssueStateRecord,
65};
66use bobbin_types::sh_tangled::repo::issue::{Issue, IssueGetRecordOutput, IssueRecord};
67use bobbin_types::sh_tangled::repo::pull::status::{
68 Status as PullStatus, StatusRecord as PullStatusRecord,
69};
70use bobbin_types::sh_tangled::repo::pull::{Pull, PullGetRecordOutput, PullRecord};
71use bobbin_types::sh_tangled::repo::{Repo, RepoGetRecordOutput, RepoRecord};
72use bobbin_types::sh_tangled::spindle::member::{
73 Member as SpindleMember, MemberRecord as SpindleMemberRecord,
74};
75use bobbin_types::sh_tangled::spindle::{Spindle, SpindleRecord};
76use bobbin_types::sh_tangled::string::{TangledString, TangledStringRecord};
77use futures::Stream;
78use futures::stream::{self, StreamExt, TryStreamExt};
79use jacquard_common::types::did::Did;
80use jacquard_common::types::ident::AtIdentifier;
81use jacquard_common::types::nsid::Nsid;
82use jacquard_common::types::recordkey::Rkey;
83use jacquard_common::types::string::{AtUri, Cid};
84use jacquard_common::xrpc::XrpcResp;
85use jacquard_common::{DefaultStr, IntoStatic};
86use serde::{Deserialize, Serialize};
87use std::convert::Infallible;
88use std::time::Duration;
89use thiserror::Error;
90use url::form_urlencoded;
91
92use tower_http::classify::ServerErrorsFailureClass;
93use tower_http::trace::{DefaultMakeSpan, OnFailure, OnResponse, TraceLayer};
94use tracing::{Level, Span};
95
96mod backpressure;
97mod filter;
98
99pub use backpressure::{
100 HeavyLimiter, HeavyPermit, MaxInFlight, PerRequestAnonBytes, PressureVerdict, ReservedFloor,
101};
102use filter::{IssueFilter, ListFilter, NoFilter, PullFilter};
103
/// Default page size.
/// NOTE(review): presumably applied when a list request omits `limit` — confirm at the use sites.
const DEFAULT_LIMIT: u32 = 50;
/// Concurrency bound for fetches.
/// NOTE(review): the consumer is outside this chunk — confirm what it throttles.
const FETCH_CONCURRENCY: usize = 8;
106
107#[derive(Clone)]
108pub struct AppState {
109 pub records: Arc<dyn RecordStore>,
110 pub slingshot: SlingshotClient,
111 pub edges: Arc<EdgeStore>,
112 pub issue_states: Arc<StateIndex<IssueStateKind>>,
113 pub pull_statuses: Arc<StateIndex<PullStatusKind>>,
114 pub coverage: Arc<CoverageWatch>,
115 pub knots: Arc<KnotProxy>,
116 pub search: Arc<dyn SearchReader>,
117 pub resolver: Arc<RepoIdResolver>,
118 pub limiter: Option<Arc<HeavyLimiter>>,
119}
120
121impl AppState {
122 #[allow(clippy::too_many_arguments)]
123 pub fn new(
124 records: Arc<dyn RecordStore>,
125 slingshot: SlingshotClient,
126 edges: Arc<EdgeStore>,
127 issue_states: Arc<StateIndex<IssueStateKind>>,
128 pull_statuses: Arc<StateIndex<PullStatusKind>>,
129 coverage: Arc<CoverageWatch>,
130 knots: Arc<KnotProxy>,
131 search: Arc<dyn SearchReader>,
132 resolver: Arc<RepoIdResolver>,
133 ) -> Self {
134 Self {
135 records,
136 slingshot,
137 edges,
138 issue_states,
139 pull_statuses,
140 coverage,
141 knots,
142 search,
143 resolver,
144 limiter: None,
145 }
146 }
147
148 pub fn with_limiter(mut self, limiter: Option<Arc<HeavyLimiter>>) -> Self {
149 self.limiter = limiter;
150 self
151 }
152
153 fn heavy_permit(&self) -> Result<Option<HeavyPermit>, XrpcError> {
154 self.limiter.as_ref().map(|l| l.try_enter()).transpose()
155 }
156}
157
158pub fn router(state: AppState) -> Router {
159 Router::new()
160 .route("/xrpc/sh.tangled.repo.getRepo", get(get_repo))
161 .route("/xrpc/sh.tangled.repo.getRepos", get(get_repos))
162 .route(
163 "/xrpc/sh.tangled.repo.getRepoByRepoDid",
164 get(get_repo_by_repo_did),
165 )
166 .route("/xrpc/sh.tangled.actor.getProfile", get(get_profile))
167 .route("/xrpc/sh.tangled.actor.getProfiles", get(get_profiles))
168 .route("/xrpc/sh.tangled.repo.getIssue", get(get_issue))
169 .route("/xrpc/sh.tangled.repo.getIssues", get(get_issues))
170 .route("/xrpc/sh.tangled.repo.getPull", get(get_pull))
171 .route("/xrpc/sh.tangled.repo.getPulls", get(get_pulls))
172 .route("/xrpc/sh.tangled.feed.listStars", get(list_stars))
173 .route("/xrpc/sh.tangled.feed.countStars", get(count_stars))
174 .route("/xrpc/sh.tangled.graph.listFollows", get(list_follows))
175 .route("/xrpc/sh.tangled.graph.countFollows", get(count_follows))
176 .route("/xrpc/sh.tangled.repo.listIssues", get(list_issues))
177 .route("/xrpc/sh.tangled.repo.countIssues", get(count_issues))
178 .route("/xrpc/sh.tangled.repo.listPulls", get(list_pulls))
179 .route("/xrpc/sh.tangled.repo.countPulls", get(count_pulls))
180 .route(
181 "/xrpc/sh.tangled.feed.listComments",
182 get(list_feed_comments),
183 )
184 .route(
185 "/xrpc/sh.tangled.feed.countComments",
186 get(count_feed_comments),
187 )
188 .route("/xrpc/sh.tangled.feed.listReactions", get(list_reactions))
189 .route("/xrpc/sh.tangled.feed.countReactions", get(count_reactions))
190 .route("/xrpc/sh.tangled.git.listRefUpdates", get(list_ref_updates))
191 .route(
192 "/xrpc/sh.tangled.git.countRefUpdates",
193 get(count_ref_updates),
194 )
195 .route(
196 "/xrpc/sh.tangled.repo.listCollaborators",
197 get(list_collaborators),
198 )
199 .route(
200 "/xrpc/sh.tangled.repo.countCollaborators",
201 get(count_collaborators),
202 )
203 .route(
204 "/xrpc/sh.tangled.repo.issue.listStates",
205 get(list_issue_states),
206 )
207 .route(
208 "/xrpc/sh.tangled.repo.issue.countStates",
209 get(count_issue_states),
210 )
211 .route(
212 "/xrpc/sh.tangled.repo.pull.listStatuses",
213 get(list_pull_statuses),
214 )
215 .route(
216 "/xrpc/sh.tangled.repo.pull.countStatuses",
217 get(count_pull_statuses),
218 )
219 .route("/xrpc/sh.tangled.repo.listRepos", get(list_repos))
220 .route("/xrpc/sh.tangled.repo.countRepos", get(count_repos))
221 .route("/xrpc/sh.tangled.knot.listKnots", get(list_knots))
222 .route("/xrpc/sh.tangled.knot.countKnots", get(count_knots))
223 .route("/xrpc/sh.tangled.spindle.listSpindles", get(list_spindles))
224 .route(
225 "/xrpc/sh.tangled.spindle.countSpindles",
226 get(count_spindles),
227 )
228 .route("/xrpc/sh.tangled.publicKey.listKeys", get(list_public_keys))
229 .route(
230 "/xrpc/sh.tangled.publicKey.countKeys",
231 get(count_public_keys),
232 )
233 .route("/xrpc/sh.tangled.graph.listVouches", get(list_vouches))
234 .route("/xrpc/sh.tangled.graph.countVouches", get(count_vouches))
235 .route("/xrpc/sh.tangled.feed.listStarsBy", get(list_stars_by))
236 .route("/xrpc/sh.tangled.feed.countStarsBy", get(count_stars_by))
237 .route(
238 "/xrpc/sh.tangled.feed.listReactionsBy",
239 get(list_reactions_by),
240 )
241 .route(
242 "/xrpc/sh.tangled.feed.countReactionsBy",
243 get(count_reactions_by),
244 )
245 .route("/xrpc/sh.tangled.graph.listFollowsBy", get(list_follows_by))
246 .route(
247 "/xrpc/sh.tangled.graph.countFollowsBy",
248 get(count_follows_by),
249 )
250 .route("/xrpc/sh.tangled.graph.listVouchesBy", get(list_vouches_by))
251 .route(
252 "/xrpc/sh.tangled.graph.countVouchesBy",
253 get(count_vouches_by),
254 )
255 .route(
256 "/xrpc/sh.tangled.git.listRefUpdatesBy",
257 get(list_ref_updates_by),
258 )
259 .route(
260 "/xrpc/sh.tangled.git.countRefUpdatesBy",
261 get(count_ref_updates_by),
262 )
263 .route(
264 "/xrpc/sh.tangled.knot.listMembersBy",
265 get(list_knot_members_by),
266 )
267 .route(
268 "/xrpc/sh.tangled.knot.countMembersBy",
269 get(count_knot_members_by),
270 )
271 .route("/xrpc/sh.tangled.label.listOpsBy", get(list_label_ops_by))
272 .route("/xrpc/sh.tangled.label.countOpsBy", get(count_label_ops_by))
273 .route(
274 "/xrpc/sh.tangled.pipeline.listPipelinesBy",
275 get(list_pipelines_by),
276 )
277 .route(
278 "/xrpc/sh.tangled.pipeline.countPipelinesBy",
279 get(count_pipelines_by),
280 )
281 .route(
282 "/xrpc/sh.tangled.pipeline.listStatusesBy",
283 get(list_pipeline_statuses_by),
284 )
285 .route(
286 "/xrpc/sh.tangled.pipeline.countStatusesBy",
287 get(count_pipeline_statuses_by),
288 )
289 .route(
290 "/xrpc/sh.tangled.repo.listArtifactsBy",
291 get(list_artifacts_by),
292 )
293 .route(
294 "/xrpc/sh.tangled.repo.countArtifactsBy",
295 get(count_artifacts_by),
296 )
297 .route(
298 "/xrpc/sh.tangled.repo.listCollaboratorsBy",
299 get(list_collaborators_by),
300 )
301 .route(
302 "/xrpc/sh.tangled.repo.countCollaboratorsBy",
303 get(count_collaborators_by),
304 )
305 .route("/xrpc/sh.tangled.repo.listIssuesBy", get(list_issues_by))
306 .route("/xrpc/sh.tangled.repo.countIssuesBy", get(count_issues_by))
307 .route(
308 "/xrpc/sh.tangled.feed.listCommentsBy",
309 get(list_feed_comments_by),
310 )
311 .route(
312 "/xrpc/sh.tangled.feed.countCommentsBy",
313 get(count_feed_comments_by),
314 )
315 .route(
316 "/xrpc/sh.tangled.repo.issue.listStatesBy",
317 get(list_issue_states_by),
318 )
319 .route(
320 "/xrpc/sh.tangled.repo.issue.countStatesBy",
321 get(count_issue_states_by),
322 )
323 .route("/xrpc/sh.tangled.repo.listPullsBy", get(list_pulls_by))
324 .route("/xrpc/sh.tangled.repo.countPullsBy", get(count_pulls_by))
325 .route(
326 "/xrpc/sh.tangled.repo.pull.listStatusesBy",
327 get(list_pull_statuses_by),
328 )
329 .route(
330 "/xrpc/sh.tangled.repo.pull.countStatusesBy",
331 get(count_pull_statuses_by),
332 )
333 .route(
334 "/xrpc/sh.tangled.spindle.listMembersBy",
335 get(list_spindle_members_by),
336 )
337 .route(
338 "/xrpc/sh.tangled.spindle.countMembersBy",
339 get(count_spindle_members_by),
340 )
341 .route(
342 "/xrpc/sh.tangled.label.listDefinitions",
343 get(list_label_definitions),
344 )
345 .route(
346 "/xrpc/sh.tangled.label.countDefinitions",
347 get(count_label_definitions),
348 )
349 .route("/xrpc/sh.tangled.label.listOps", get(list_label_ops))
350 .route("/xrpc/sh.tangled.label.countOps", get(count_label_ops))
351 .route(
352 "/xrpc/sh.tangled.pipeline.listPipelines",
353 get(list_pipelines),
354 )
355 .route(
356 "/xrpc/sh.tangled.pipeline.countPipelines",
357 get(count_pipelines),
358 )
359 .route(
360 "/xrpc/sh.tangled.pipeline.listStatuses",
361 get(list_pipeline_statuses),
362 )
363 .route(
364 "/xrpc/sh.tangled.pipeline.countStatuses",
365 get(count_pipeline_statuses),
366 )
367 .route("/xrpc/sh.tangled.repo.listArtifacts", get(list_artifacts))
368 .route("/xrpc/sh.tangled.repo.countArtifacts", get(count_artifacts))
369 .route("/xrpc/sh.tangled.knot.listMembers", get(list_knot_members))
370 .route(
371 "/xrpc/sh.tangled.knot.countMembers",
372 get(count_knot_members),
373 )
374 .route(
375 "/xrpc/sh.tangled.spindle.listMembers",
376 get(list_spindle_members),
377 )
378 .route(
379 "/xrpc/sh.tangled.spindle.countMembers",
380 get(count_spindle_members),
381 )
382 .route("/xrpc/sh.tangled.string.listStrings", get(list_strings))
383 .route("/xrpc/sh.tangled.string.countStrings", get(count_strings))
384 .route("/xrpc/sh.tangled.search.query", get(search_query))
385 .route("/xrpc/sh.tangled.bobbin.getCoverage", get(get_coverage))
386 .route(
387 "/xrpc/com.bad-example.identity.resolveMiniDoc",
388 get(resolve_mini_doc),
389 )
390 .merge(knot_proxied_routes())
391 .layer(
392 TraceLayer::new_for_http()
393 .make_span_with(DefaultMakeSpan::new().level(Level::INFO))
394 .on_request(())
395 .on_response(LatencyFreeTrace)
396 .on_failure(LatencyFreeTrace),
397 )
398 .with_state(state)
399}
400
401#[derive(Clone, Copy, Debug)]
402struct LatencyFreeTrace;
403
404impl<B> OnResponse<B> for LatencyFreeTrace {
405 fn on_response(self, response: &Response<B>, _latency: Duration, _span: &Span) {
406 tracing::event!(
407 target: "tower_http::trace::on_response",
408 Level::INFO,
409 status = response.status().as_u16(),
410 "request completed",
411 );
412 }
413}
414
415impl OnFailure<ServerErrorsFailureClass> for LatencyFreeTrace {
416 fn on_failure(&mut self, error: ServerErrorsFailureClass, _latency: Duration, _span: &Span) {
417 tracing::event!(
418 target: "tower_http::trace::on_failure",
419 Level::WARN,
420 error = %error,
421 "request failed",
422 );
423 }
424}
425
/// XRPC method NSIDs that `knot_proxied_routes` registers (as GET routes
/// under `/xrpc/`) with `proxy_repo_handler`.
///
/// NOTE(review): `sh.tangled.repo.listSecrets` is on this list — confirm the
/// proxy handler or the upstream enforces whatever authorization it needs.
const REPO_PROXIED_NSIDS: &[&str] = &[
    "sh.tangled.repo.archive",
    "sh.tangled.repo.blob",
    "sh.tangled.repo.branch",
    "sh.tangled.repo.branches",
    "sh.tangled.repo.compare",
    "sh.tangled.repo.describeRepo",
    "sh.tangled.repo.diff",
    "sh.tangled.repo.getDefaultBranch",
    "sh.tangled.repo.languages",
    "sh.tangled.repo.listSecrets",
    "sh.tangled.repo.log",
    "sh.tangled.repo.tag",
    "sh.tangled.repo.tags",
    "sh.tangled.repo.tree",
];

/// XRPC method NSIDs that `knot_proxied_routes` registers (as GET routes
/// under `/xrpc/`) with `proxy_knot_handler`.
const KNOT_PROXIED_NSIDS: &[&str] = &[
    "sh.tangled.owner",
    "sh.tangled.knot.version",
    "sh.tangled.knot.listKeys",
];
448
/// Header names to pass through.
/// NOTE(review): presumably the upstream *response* headers copied onto the
/// proxied reply — the consumer is outside this chunk, confirm there.
const PASSTHROUGH_HEADERS: &[&HeaderName] = &[
    &CONTENT_TYPE,
    &CONTENT_LENGTH,
    &CONTENT_ENCODING,
    &ETAG,
    &CACHE_CONTROL,
    &LAST_MODIFIED,
    &CONTENT_DISPOSITION,
    &ACCEPT_RANGES,
    &CONTENT_RANGE,
];

/// Request header names to forward (range and conditional-request headers).
/// NOTE(review): presumably copied from the client request onto the upstream
/// call — confirm at the use site.
const FORWARDED_REQUEST_HEADERS: &[&HeaderName] =
    &[&RANGE, &IF_RANGE, &IF_NONE_MATCH, &IF_MODIFIED_SINCE];
463
/// Query-parameter name `knot`.
/// NOTE(review): presumably selects the knot host in the proxy handlers — confirm.
const KNOT_HOST_PARAM: &str = "knot";
/// Query-parameter name `repo`.
/// NOTE(review): presumably identifies the repo in the proxy handlers — confirm.
const REPO_PARAM: &str = "repo";

/// Query-string pairs as extracted by `Query<ProxyParams>` in
/// `register_proxied` and handed to the proxy handlers.
type ProxyParams = Vec<(String, String)>;
468
469fn knot_proxied_routes() -> Router<AppState> {
470 let with_repo = register_proxied(Router::new(), REPO_PROXIED_NSIDS, proxy_repo_handler);
471 register_proxied(with_repo, KNOT_PROXIED_NSIDS, proxy_knot_handler)
472}
473
474fn register_proxied<H, Fut>(
475 router: Router<AppState>,
476 nsids: &[&'static str],
477 handler: H,
478) -> Router<AppState>
479where
480 H: Fn(AppState, HeaderMap, ProxyParams, Nsid<DefaultStr>) -> Fut
481 + Clone
482 + Send
483 + Sync
484 + 'static,
485 Fut: Future<Output = Result<Response, XrpcError>> + Send + 'static,
486{
487 nsids.iter().fold(router, |router, &nsid_lit| {
488 let handler = handler.clone();
489 let nsid = nsid_static(nsid_lit);
490 router.route(
491 &format!("/xrpc/{nsid_lit}"),
492 get(
493 move |State(state): State<AppState>,
494 headers: HeaderMap,
495 Query(params): Query<ProxyParams>| {
496 handler(state, headers, params, nsid.clone())
497 },
498 ),
499 )
500 })
501}
502
503#[derive(Clone, Debug)]
504pub enum SubjectQuery {
505 Did(Did<DefaultStr>),
506 Uri(AtUri<DefaultStr>),
507}
508
509impl<'de> Deserialize<'de> for SubjectQuery {
510 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
511 where
512 D: serde::Deserializer<'de>,
513 {
514 let raw = String::deserialize(deserializer)?;
515 if let Ok(did) = Did::<DefaultStr>::new_owned(&raw) {
516 return Ok(Self::Did(did));
517 }
518 AtUri::<DefaultStr>::new_owned(&raw)
519 .map(Self::Uri)
520 .map_err(serde::de::Error::custom)
521 }
522}
523
/// A canonical NSID together with the alias NSIDs that `accepts` also matches.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExpectedNsid {
    /// Canonical NSID, returned by `as_nsid` / `as_str`.
    canon: Nsid<DefaultStr>,
    /// Alias NSID strings looked up via `aliases_for`; empty for most NSIDs.
    aliases: &'static [&'static str],
}
529
/// Alias NSIDs accepted in place of `sh.tangled.feed.comment`.
const FEED_COMMENT_LEGACY_ALIASES: &[&str] = &[
    "sh.tangled.repo.issue.comment",
    "sh.tangled.repo.pull.comment",
];

/// Returns the alias NSIDs registered for `nsid`, or an empty slice when it
/// has none. Only `sh.tangled.feed.comment` currently has aliases.
fn aliases_for(nsid: &str) -> &'static [&'static str] {
    if nsid == "sh.tangled.feed.comment" {
        FEED_COMMENT_LEGACY_ALIASES
    } else {
        &[]
    }
}
541
542impl ExpectedNsid {
543 pub fn new(nsid: Nsid<DefaultStr>) -> Self {
544 let aliases = aliases_for(nsid.as_ref());
545 Self {
546 canon: nsid,
547 aliases,
548 }
549 }
550
551 pub fn from_static(s: &'static str) -> Self {
552 let canon = nsid_static(s);
553 let aliases = aliases_for(s);
554 Self { canon, aliases }
555 }
556
557 pub fn as_nsid(&self) -> &Nsid<DefaultStr> {
558 &self.canon
559 }
560
561 pub fn as_str(&self) -> &str {
562 self.canon.as_ref()
563 }
564
565 fn accepts(&self, other: &str) -> bool {
566 other == self.canon.as_ref() || self.aliases.contains(&other)
567 }
568}
569
/// Query parameters carrying one repo at-uri.
/// NOTE(review): presumably consumed by the `getRepo` handler — confirm.
#[derive(Debug, Deserialize)]
struct GetRepoQuery {
    /// At-uri from the `repo` parameter.
    repo: AtUri<DefaultStr>,
}
574
/// Query parameters carrying one DID under the wire name `repoDid`.
/// NOTE(review): presumably consumed by the `getRepoByRepoDid` handler — confirm.
#[derive(Debug, Deserialize)]
struct GetRepoByRepoDidQuery {
    /// DID from the `repoDid` parameter.
    #[serde(rename = "repoDid")]
    repo_did: Did<DefaultStr>,
}
580
/// Query parameters carrying one at-uri under `actor`.
/// NOTE(review): presumably consumed by the `getProfile` handler; note the
/// parameter is typed as an at-uri, not a bare DID or handle — confirm intent.
#[derive(Debug, Deserialize)]
struct GetProfileQuery {
    /// At-uri from the `actor` parameter.
    actor: AtUri<DefaultStr>,
}
585
/// Query parameters carrying one issue at-uri.
/// NOTE(review): presumably consumed by the `getIssue` handler — confirm.
#[derive(Debug, Deserialize)]
struct GetIssueQuery {
    /// At-uri from the `issue` parameter.
    issue: AtUri<DefaultStr>,
}
590
/// Query parameters carrying one pull at-uri.
/// NOTE(review): presumably consumed by the `getPull` handler — confirm.
#[derive(Debug, Deserialize)]
struct GetPullQuery {
    /// At-uri from the `pull` parameter.
    pull: AtUri<DefaultStr>,
}
595
596#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
597#[serde(rename_all = "lowercase")]
598enum Order {
599 Asc,
600 #[default]
601 Desc,
602}
603
604impl From<Order> for SortDir {
605 fn from(o: Order) -> Self {
606 match o {
607 Order::Asc => SortDir::Asc,
608 Order::Desc => SortDir::Desc,
609 }
610 }
611}
612
613#[derive(Debug, Deserialize)]
614struct TypedListQuery<F> {
615 subject: SubjectQuery,
616 cursor: Option<String>,
617 limit: Option<u32>,
618 #[serde(default)]
619 order: Order,
620 #[serde(flatten)]
621 filter: F,
622}
623
624impl<F> TypedListQuery<F> {
625 fn dir(&self) -> SortDir {
626 self.order.into()
627 }
628}
629
/// Query parameters carrying only a subject.
/// NOTE(review): presumably shared by the `count*` handlers — confirm.
#[derive(Debug, Deserialize)]
struct CountQuery {
    /// Subject (bare DID or at-uri).
    subject: SubjectQuery,
}
634
/// Query parameters with a required `q` and optional filters.
/// NOTE(review): presumably consumed by the `search.query` handler — confirm.
#[derive(Debug, Deserialize)]
struct SearchQueryParams {
    /// Query text (required).
    q: String,
    /// Optional NSID filter.
    nsid: Option<Nsid<DefaultStr>>,
    /// Optional author DID filter.
    author: Option<Did<DefaultStr>>,
    /// Optional repo DID filter.
    repo: Option<Did<DefaultStr>>,
    /// Optional lower bound, kept as a raw string here.
    /// NOTE(review): format and parsing are not visible in this block.
    since: Option<String>,
    /// Optional upper bound, kept as a raw string here.
    until: Option<String>,
    /// Optional cursor string, kept unparsed here.
    cursor: Option<String>,
    /// Optional requested page size.
    limit: Option<u32>,
}
646
647pub struct XrpcQuery<T>(pub T);
648
649impl<S, T> FromRequestParts<S> for XrpcQuery<T>
650where
651 S: Send + Sync,
652 T: serde::de::DeserializeOwned + Send + 'static,
653{
654 type Rejection = XrpcError;
655
656 async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
657 Query::<T>::from_request_parts(parts, state)
658 .await
659 .map(|Query(t)| Self(t))
660 .map_err(|rej: QueryRejection| XrpcError::InvalidParams(rej.body_text()))
661 }
662}
663
664#[derive(Debug, Error)]
665pub enum XrpcError {
666 #[error("invalid request: {0}")]
667 InvalidParams(String),
668 #[error("record not found")]
669 NotFound,
670 #[error("upstream unavailable: {0}")]
671 UpstreamUnavailable(String),
672 #[error("upstream gone: {0}")]
673 UpstreamGone(String),
674 #[error("invalid record: {0}")]
675 InvalidRecord(String),
676 #[error("internal: {0}")]
677 Internal(String),
678 #[error("overloaded, shedding under memory pressure")]
679 Overloaded,
680}
681
682impl XrpcError {
683 pub fn overloaded() -> Self {
684 Self::Overloaded
685 }
686}
687
/// JSON body of an error response, built in `XrpcError::into_response`.
#[derive(Serialize)]
struct ErrorBody {
    /// Error code string chosen per `XrpcError` variant.
    error: &'static str,
    /// The error's `Display` text.
    message: String,
}
693
694impl IntoResponse for XrpcError {
695 fn into_response(self) -> Response {
696 let (status, error) = match &self {
697 Self::InvalidParams(_) => (StatusCode::BAD_REQUEST, "InvalidRequest"),
698 Self::NotFound => (StatusCode::NOT_FOUND, "RecordNotFound"),
699 Self::UpstreamUnavailable(_) => (StatusCode::BAD_GATEWAY, "UpstreamFailed"),
700 Self::UpstreamGone(_) => (StatusCode::BAD_GATEWAY, "UpstreamGone"),
701 Self::InvalidRecord(_) => (StatusCode::BAD_GATEWAY, "InvalidRecord"),
702 Self::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "InternalError"),
703 Self::Overloaded => (StatusCode::SERVICE_UNAVAILABLE, "Overloaded"),
704 };
705 let body = ErrorBody {
706 error,
707 message: self.to_string(),
708 };
709 (status, Json(body)).into_response()
710 }
711}
712
713#[derive(Serialize)]
714#[serde(rename_all = "camelCase")]
715struct CoverageEnvelope {
716 ready: bool,
717 events_processed: u64,
718 last_cursor: u64,
719}
720
721impl From<Coverage> for CoverageEnvelope {
722 fn from(c: Coverage) -> Self {
723 Self {
724 ready: c.is_ready(),
725 events_processed: c.events_processed(),
726 last_cursor: c.last_cursor().raw(),
727 }
728 }
729}
730
/// A record value together with its at-uri and optional CID.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct RecordView<V> {
    /// At-uri of the record.
    uri: AtUri<DefaultStr>,
    /// CID of the record, when available (serialized even when `None`,
    /// since no skip attribute is set).
    cid: Option<Cid<DefaultStr>>,
    /// The record body.
    value: V,
}
738
/// A `RecordView` flattened together with derived state and comment count;
/// built by `enrich_view`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct StatefulItem<V> {
    /// The underlying view; its fields are emitted at the top level.
    #[serde(flatten)]
    view: RecordView<V>,
    /// Wire name of the state, as returned by `StateKind::wire`.
    state: &'static str,
    /// Timestamp of the latest accepted state, formatted by `format_micros`;
    /// left out of the output when no state was found.
    #[serde(skip_serializing_if = "Option::is_none")]
    state_updated_at: Option<String>,
    /// Number of comment edges whose subject is the record's at-uri.
    comment_count: u64,
}
749
750fn format_micros(micros: u64) -> String {
751 let signed = i64::try_from(micros).ok();
752 let rfc = signed
753 .and_then(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
754 .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true));
755 rfc.unwrap_or_else(|| micros.to_string())
756}
757
758pub(crate) fn source_authority_did(source: &AtUri<DefaultStr>) -> Option<Did<DefaultStr>> {
759 match source.authority() {
760 AtIdentifier::Did(d) => Some(d.clone().into_static()),
761 AtIdentifier::Handle(_) => None,
762 }
763}
764
765fn enrich_issue_view(
766 state: &AppState,
767 view: RecordView<Issue<DefaultStr>>,
768) -> StatefulItem<Issue<DefaultStr>> {
769 let issue_author = source_authority_did(&view.uri);
770 let repo_did = view.value.repo.clone();
771 enrich_view(
772 &state.edges,
773 nsid_static("sh.tangled.feed.comment"),
774 &state.issue_states,
775 view,
776 move |src| accept_state_source(src, issue_author.as_ref(), &repo_did),
777 )
778}
779
780fn enrich_pull_view(
781 state: &AppState,
782 view: RecordView<Pull<DefaultStr>>,
783) -> StatefulItem<Pull<DefaultStr>> {
784 let pull_author = source_authority_did(&view.uri);
785 let target_repo = view.value.target.repo.clone();
786 enrich_view(
787 &state.edges,
788 nsid_static("sh.tangled.feed.comment"),
789 &state.pull_statuses,
790 view,
791 move |src| accept_state_source(src, pull_author.as_ref(), &target_repo),
792 )
793}
794
795pub(crate) fn accept_state_source(
796 source: &AtUri<DefaultStr>,
797 entity_author: Option<&Did<DefaultStr>>,
798 repo_owner: &Did<DefaultStr>,
799) -> bool {
800 let Some(src) = source_authority_did(source) else {
801 return false;
802 };
803 Some(&src) == entity_author || &src == repo_owner
804}
805
806fn enrich_view<V, K, F>(
807 edges: &EdgeStore,
808 comment_nsid: Nsid<DefaultStr>,
809 states: &StateIndex<K>,
810 view: RecordView<V>,
811 accept: F,
812) -> StatefulItem<V>
813where
814 K: StateKind + Default,
815 F: Fn(&AtUri<DefaultStr>) -> bool,
816{
817 let comment_count = edges.count(&EdgeKey::new(
818 comment_nsid,
819 SubjectRef::Uri(view.uri.clone()),
820 ));
821 let (state, state_updated_at) = states
822 .latest_by(&view.uri, accept)
823 .map_or((K::default().wire(), None), |(kind, micros)| {
824 (kind.wire(), Some(format_micros(micros)))
825 });
826 StatefulItem {
827 view,
828 state,
829 state_updated_at,
830 comment_count,
831 }
832}
833
/// JSON body with a count; fields are emitted in camelCase.
/// NOTE(review): presumably the reply of the `count*` endpoints — confirm.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct CountResponse {
    /// Total count.
    count: u64,
    /// Number of distinct authors (serialized as `distinctAuthors`).
    distinct_authors: u64,
}
840
/// Serializable view of one search hit.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct SearchHitView {
    /// At-uri of the matching record.
    uri: AtUri<DefaultStr>,
    /// CID of the matching record, when available.
    cid: Option<Cid<DefaultStr>>,
    /// NSID of the matching record.
    nsid: Nsid<DefaultStr>,
    /// Score of the hit.
    /// NOTE(review): scale and ordering are defined by the search backend — confirm.
    score: f32,
    /// The matching record.
    value: SearchableRecord,
}
850
851fn map_slingshot(err: SlingshotError) -> XrpcError {
852 use SlingshotError as E;
853 match err {
854 E::NotFound => XrpcError::NotFound,
855 e @ (E::Decode(_)
856 | E::MissingField(_)
857 | E::InvalidAtUri(_)
858 | E::InvalidCid(_)
859 | E::UriMismatch { .. }) => XrpcError::InvalidRecord(e.to_string()),
860 e @ (E::Network(_)
861 | E::Build(_)
862 | E::Upstream(_)
863 | E::BodyTooLarge { .. }
864 | E::BadScheme(_)) => XrpcError::UpstreamUnavailable(e.to_string()),
865 }
866}
867
868fn parse_uri(raw: &str) -> Result<AtUri<DefaultStr>, XrpcError> {
869 AtUri::<DefaultStr>::new_owned(raw).map_err(|e| XrpcError::InvalidParams(format!("uri: {e}")))
870}
871
/// The forms of `subject` an endpoint accepts; checked by `parse_subject`.
///
/// The notes below describe how a *bare DID* is treated. At-uri handling sits
/// in the part of `parse_subject` beyond this chunk.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubjectShape {
    /// A bare DID is accepted.
    BareDid,
    /// A bare DID is rejected; the subject must be
    /// `at://<did>/<this collection>/<rkey>`.
    Collection(&'static str),
    /// A bare DID is accepted.
    /// NOTE(review): at-uris are presumably limited to the listed collections — confirm.
    BareDidOrOneOfCollections(&'static [&'static str]),
    /// A bare DID is rejected; the subject must be an at-uri whose NSID is
    /// one of the listed collections.
    OneOfCollections(&'static [&'static str]),
    /// A bare DID is rejected; the subject must be in at-uri form.
    AnyAtUri,
}
880
/// Associates a type with the subject shape it accepts.
pub trait HasSubject {
    /// Accepted subject shape.
    const SHAPE: SubjectShape;
}
884
/// Implemented by the `*By` marker types to tie a record type to an edge
/// kind and a subject shape.
pub trait MirrorOf {
    /// The record type this marker refers to.
    type Record: XrpcResp;
    /// Edge-kind string; every implementation in this file uses the form
    /// `<record nsid>.by`.
    const EDGE_KIND: &'static str;
    /// Accepted subject shape.
    const SHAPE: SubjectShape;
}
890
891pub struct StarBy;
892pub struct ReactionBy;
893pub struct FollowBy;
894pub struct VouchBy;
895pub struct RefUpdateBy;
896pub struct KnotMemberBy;
897pub struct LabelOpBy;
898pub struct PipelineBy;
899pub struct PipelineStatusBy;
900pub struct ArtifactBy;
901pub struct CollaboratorBy;
902pub struct FeedCommentBy;
903pub struct IssueBy;
904pub struct IssueStateBy;
905pub struct PullBy;
906pub struct PullStatusBy;
907pub struct SpindleMemberBy;
908
909impl MirrorOf for StarBy {
910 type Record = StarRecord;
911 const EDGE_KIND: &'static str = "sh.tangled.feed.star.by";
912 const SHAPE: SubjectShape = SubjectShape::BareDid;
913}
914impl MirrorOf for ReactionBy {
915 type Record = ReactionRecord;
916 const EDGE_KIND: &'static str = "sh.tangled.feed.reaction.by";
917 const SHAPE: SubjectShape = SubjectShape::BareDid;
918}
919impl MirrorOf for FollowBy {
920 type Record = FollowRecord;
921 const EDGE_KIND: &'static str = "sh.tangled.graph.follow.by";
922 const SHAPE: SubjectShape = SubjectShape::BareDid;
923}
924impl MirrorOf for VouchBy {
925 type Record = VouchRecord;
926 const EDGE_KIND: &'static str = "sh.tangled.graph.vouch.by";
927 const SHAPE: SubjectShape = SubjectShape::BareDid;
928}
929impl MirrorOf for RefUpdateBy {
930 type Record = RefUpdateRecord;
931 const EDGE_KIND: &'static str = "sh.tangled.git.refUpdate.by";
932 const SHAPE: SubjectShape = SubjectShape::BareDid;
933}
934impl MirrorOf for KnotMemberBy {
935 type Record = KnotMemberRecord;
936 const EDGE_KIND: &'static str = "sh.tangled.knot.member.by";
937 const SHAPE: SubjectShape = SubjectShape::BareDid;
938}
939impl MirrorOf for LabelOpBy {
940 type Record = LabelOpRecord;
941 const EDGE_KIND: &'static str = "sh.tangled.label.op.by";
942 const SHAPE: SubjectShape = SubjectShape::BareDid;
943}
944impl MirrorOf for PipelineBy {
945 type Record = PipelineRecord;
946 const EDGE_KIND: &'static str = "sh.tangled.pipeline.by";
947 const SHAPE: SubjectShape = SubjectShape::BareDid;
948}
949impl MirrorOf for PipelineStatusBy {
950 type Record = PipelineStatusRecord;
951 const EDGE_KIND: &'static str = "sh.tangled.pipeline.status.by";
952 const SHAPE: SubjectShape = SubjectShape::BareDid;
953}
954impl MirrorOf for ArtifactBy {
955 type Record = ArtifactRecord;
956 const EDGE_KIND: &'static str = "sh.tangled.repo.artifact.by";
957 const SHAPE: SubjectShape = SubjectShape::BareDid;
958}
959impl MirrorOf for CollaboratorBy {
960 type Record = CollaboratorRecord;
961 const EDGE_KIND: &'static str = "sh.tangled.repo.collaborator.by";
962 const SHAPE: SubjectShape = SubjectShape::BareDid;
963}
964impl MirrorOf for FeedCommentBy {
965 type Record = FeedCommentRecord;
966 const EDGE_KIND: &'static str = "sh.tangled.feed.comment.by";
967 const SHAPE: SubjectShape = SubjectShape::BareDid;
968}
969impl MirrorOf for IssueBy {
970 type Record = IssueRecord;
971 const EDGE_KIND: &'static str = "sh.tangled.repo.issue.by";
972 const SHAPE: SubjectShape = SubjectShape::BareDid;
973}
974impl MirrorOf for IssueStateBy {
975 type Record = IssueStateRecord;
976 const EDGE_KIND: &'static str = "sh.tangled.repo.issue.state.by";
977 const SHAPE: SubjectShape = SubjectShape::BareDid;
978}
979impl MirrorOf for PullBy {
980 type Record = PullRecord;
981 const EDGE_KIND: &'static str = "sh.tangled.repo.pull.by";
982 const SHAPE: SubjectShape = SubjectShape::BareDid;
983}
984impl MirrorOf for PullStatusBy {
985 type Record = PullStatusRecord;
986 const EDGE_KIND: &'static str = "sh.tangled.repo.pull.status.by";
987 const SHAPE: SubjectShape = SubjectShape::BareDid;
988}
989impl MirrorOf for SpindleMemberBy {
990 type Record = SpindleMemberRecord;
991 const EDGE_KIND: &'static str = "sh.tangled.spindle.member.by";
992 const SHAPE: SubjectShape = SubjectShape::BareDid;
993}
994
// Per-record subject shapes. The list/count helpers in this file pass the
// record's `SHAPE` to `parse_subject`, which validates the `subject` query
// parameter against it.
impl HasSubject for StarRecord {
    // A bare DID, or an at-uri whose collection is `sh.tangled.string`.
    const SHAPE: SubjectShape = SubjectShape::BareDidOrOneOfCollections(&["sh.tangled.string"]);
}
impl HasSubject for FollowRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for IssueRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PullRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for FeedCommentRecord {
    // An at-uri pointing into one of the listed collections; a bare DID is rejected.
    const SHAPE: SubjectShape = SubjectShape::OneOfCollections(&[
        "sh.tangled.repo.issue",
        "sh.tangled.repo.pull",
        "sh.tangled.string",
    ]);
}
impl HasSubject for LabelDefinitionRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for LabelOpRecord {
    // An at-uri for an issue or a pull; a bare DID is rejected.
    const SHAPE: SubjectShape =
        SubjectShape::OneOfCollections(&["sh.tangled.repo.issue", "sh.tangled.repo.pull"]);
}
impl HasSubject for PipelineRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PipelineStatusRecord {
    // An at-uri in the `sh.tangled.pipeline` collection.
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.pipeline");
}
impl HasSubject for ArtifactRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for KnotMemberRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for SpindleMemberRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for TangledStringRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for ReactionRecord {
    // Any at-uri that has a collection segment; `parse_subject` applies no
    // collection allow-list and no rkey check for this shape.
    const SHAPE: SubjectShape = SubjectShape::AnyAtUri;
}
impl HasSubject for RefUpdateRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for CollaboratorRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for IssueStateRecord {
    // An at-uri in the `sh.tangled.repo.issue` collection.
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.issue");
}
impl HasSubject for PullStatusRecord {
    // An at-uri in the `sh.tangled.repo.pull` collection.
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.pull");
}
impl HasSubject for KnotRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for SpindleRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PublicKeyRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for RepoRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for VouchRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
1069
1070fn parse_subject(raw: &SubjectQuery, shape: SubjectShape) -> Result<SubjectRef, XrpcError> {
1071 let uri = match raw {
1072 SubjectQuery::Did(did) => {
1073 return match shape {
1074 SubjectShape::BareDid | SubjectShape::BareDidOrOneOfCollections(_) => {
1075 Ok(SubjectRef::Did(did.clone()))
1076 }
1077 SubjectShape::Collection(expected) => Err(XrpcError::InvalidParams(format!(
1078 "subject must be at://<did>/{expected}/<rkey>, got bare did"
1079 ))),
1080 SubjectShape::OneOfCollections(allowed) => Err(XrpcError::InvalidParams(format!(
1081 "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got bare did",
1082 allowed.join(", "),
1083 ))),
1084 SubjectShape::AnyAtUri => Err(XrpcError::InvalidParams(
1085 "subject must be at-uri form, got bare did".into(),
1086 )),
1087 };
1088 }
1089 SubjectQuery::Uri(uri) => uri,
1090 };
1091 if matches!(uri.authority(), AtIdentifier::Handle(_)) {
1092 return Err(XrpcError::InvalidParams(
1093 "subject authority must be a did, not a handle".into(),
1094 ));
1095 }
1096 let Some(collection) = uri.collection() else {
1097 return Err(XrpcError::InvalidParams(
1098 "subject must be a bare did or full at://<did>/<nsid>/<rkey>".into(),
1099 ));
1100 };
1101 let c = collection.as_ref();
1102 match shape {
1103 SubjectShape::BareDid => Err(XrpcError::InvalidParams(format!(
1104 "subject must be a bare did, got at-uri with collection {c}"
1105 ))),
1106 SubjectShape::Collection(expected) if c == expected => {
1107 require_rkey(uri, expected)?;
1108 Ok(SubjectRef::Uri(uri.clone()))
1109 }
1110 SubjectShape::Collection(expected) => Err(XrpcError::InvalidParams(format!(
1111 "subject must be at://<did>/{expected}/<rkey>, got collection {c}"
1112 ))),
1113 SubjectShape::OneOfCollections(allowed) if allowed.contains(&c) => {
1114 require_rkey(uri, c)?;
1115 Ok(SubjectRef::Uri(uri.clone()))
1116 }
1117 SubjectShape::OneOfCollections(allowed) => Err(XrpcError::InvalidParams(format!(
1118 "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
1119 allowed.join(", "),
1120 ))),
1121 SubjectShape::BareDidOrOneOfCollections(allowed) if allowed.contains(&c) => {
1122 require_rkey(uri, c)?;
1123 Ok(SubjectRef::Uri(uri.clone()))
1124 }
1125 SubjectShape::BareDidOrOneOfCollections(allowed) => Err(XrpcError::InvalidParams(format!(
1126 "subject must be a bare did or at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
1127 allowed.join(", "),
1128 ))),
1129 SubjectShape::AnyAtUri => Ok(SubjectRef::Uri(uri.clone())),
1130 }
1131}
1132
1133fn require_rkey(uri: &AtUri<DefaultStr>, expected: &str) -> Result<(), XrpcError> {
1134 uri.rkey().map(|_| ()).ok_or_else(|| {
1135 XrpcError::InvalidParams(format!(
1136 "subject must be at://<did>/{expected}/<rkey>; missing rkey"
1137 ))
1138 })
1139}
1140
1141fn parse_cursor(raw: Option<&str>) -> Result<PageCursor, XrpcError> {
1142 PageCursor::from_token(raw)
1143 .map_err(|e: CursorParseError| XrpcError::InvalidParams(format!("cursor: {e}")))
1144}
1145
1146fn parse_limit(raw: Option<u32>) -> Result<PageLimit, XrpcError> {
1147 PageLimit::new(raw.unwrap_or(DEFAULT_LIMIT))
1148 .map_err(|e| XrpcError::InvalidParams(format!("limit: {e}")))
1149}
1150
1151pub(crate) fn at_uri_owned_by(uri: &AtUri<DefaultStr>, author: &Did<DefaultStr>) -> bool {
1152 match uri.authority() {
1153 AtIdentifier::Did(d) => d.as_ref() == author.as_ref(),
1154 AtIdentifier::Handle(_) => false,
1155 }
1156}
1157
1158async fn resolve_for_view(
1159 state: &AppState,
1160 expected_nsid: &Nsid<DefaultStr>,
1161 uri: AtUri<DefaultStr>,
1162) -> Result<Arc<RecordBody>, XrpcError> {
1163 let raw = uri.as_ref().to_owned();
1164 resolve(state, ExpectedNsid::new(expected_nsid.clone()), uri)
1165 .await
1166 .map(|(body, _did)| body)
1167 .map_err(|e| match e {
1168 XrpcError::NotFound => XrpcError::UpstreamGone(raw),
1169 other => other,
1170 })
1171}
1172
1173async fn resolve(
1174 state: &AppState,
1175 expected: ExpectedNsid,
1176 uri: AtUri<DefaultStr>,
1177) -> Result<(Arc<RecordBody>, Did<DefaultStr>), XrpcError> {
1178 let collection = uri
1179 .collection()
1180 .ok_or_else(|| XrpcError::InvalidParams("uri missing collection".into()))?;
1181 if !expected.accepts(collection.as_ref()) {
1182 return Err(XrpcError::InvalidParams(format!(
1183 "collection mismatch: expected {}, got {}",
1184 expected.as_str(),
1185 collection.as_ref()
1186 )));
1187 }
1188 let rkey = uri
1189 .rkey()
1190 .ok_or_else(|| XrpcError::InvalidParams("uri missing rkey".into()))?;
1191 let did_ref = match uri.authority() {
1192 AtIdentifier::Did(d) => d,
1193 AtIdentifier::Handle(_) => {
1194 return Err(XrpcError::InvalidParams(
1195 "uri authority must be a did, not a handle".into(),
1196 ));
1197 }
1198 };
1199 let did: Did<DefaultStr> = did_ref.clone().into_static();
1200
1201 if let Some(hit) = state.records.get(&uri) {
1202 return Ok((hit, did));
1203 }
1204 let body = state
1205 .slingshot
1206 .get_record(&did_ref, &collection, &rkey)
1207 .await
1208 .map_err(map_slingshot)?;
1209 verify_type_tag(&body, &expected)?;
1210 state.records.put(uri, body.clone());
1211 Ok((body, did))
1212}
1213
/// Minimal view of a JSON record that deserializes only its `$type` field,
/// borrowing the string from the input buffer.
#[derive(Deserialize)]
struct TypeTag<'a> {
    // Borrowed `$type` value; `verify_type_tag` falls back to a full
    // `serde_json::Value` parse when this borrowing deserialization fails.
    #[serde(rename = "$type", borrow)]
    ty: &'a str,
}
1219
1220fn verify_type_tag(body: &RecordBody, expected: &ExpectedNsid) -> Result<(), XrpcError> {
1221 let bytes = body.value.as_ref();
1222 let ty: std::borrow::Cow<'_, str> = match serde_json::from_slice::<TypeTag>(bytes) {
1223 Ok(t) => std::borrow::Cow::Borrowed(t.ty),
1224 Err(_) => {
1225 let value: serde_json::Value = serde_json::from_slice(bytes)
1226 .map_err(|e| XrpcError::InvalidRecord(format!("$type peek: {e}")))?;
1227 value
1228 .as_object()
1229 .and_then(|m| m.get("$type"))
1230 .and_then(|v| v.as_str())
1231 .map(|s| std::borrow::Cow::Owned(s.to_owned()))
1232 .ok_or_else(|| XrpcError::InvalidRecord("$type peek: missing $type field".into()))?
1233 }
1234 };
1235 if !expected.accepts(ty.as_ref()) {
1236 return Err(XrpcError::InvalidRecord(format!(
1237 "$type mismatch: expected {}, got {}",
1238 expected.as_str(),
1239 ty
1240 )));
1241 }
1242 Ok(())
1243}
1244
1245fn wire_type_nsid(bytes: &[u8]) -> Option<Nsid<DefaultStr>> {
1246 let ty = serde_json::from_slice::<TypeTag>(bytes).ok()?.ty;
1247 Nsid::<DefaultStr>::new_owned(ty).ok()
1248}
1249
1250async fn deserialize_or_upgrade<V>(
1251 state: &AppState,
1252 nsid: &Nsid<DefaultStr>,
1253 bytes: &[u8],
1254) -> Result<V, XrpcError>
1255where
1256 V: serde::de::DeserializeOwned,
1257{
1258 match serde_json::from_slice::<V>(bytes) {
1259 Ok(v) => Ok(v),
1260 Err(canon_err) => {
1261 let normalized = normalize_record_fields(bytes);
1262 let working: &[u8] = normalized.as_deref().unwrap_or(bytes);
1263 if normalized.is_some()
1264 && let Ok(v) = serde_json::from_slice::<V>(working)
1265 {
1266 return Ok(v);
1267 }
1268 let scrubbed = scrub_record_bytes(nsid, working);
1269 let retry_bytes: &[u8] = scrubbed.as_deref().unwrap_or(working);
1270 if scrubbed.is_some()
1271 && let Ok(v) = serde_json::from_slice::<V>(retry_bytes)
1272 {
1273 return Ok(v);
1274 }
1275 let wire_nsid = wire_type_nsid(retry_bytes).unwrap_or_else(|| nsid.clone());
1276 match upgrade_wire_bytes(&wire_nsid, retry_bytes, &state.resolver).await {
1277 Ok(canon_bytes) => serde_json::from_slice(&canon_bytes)
1278 .map_err(|e| XrpcError::InvalidRecord(e.to_string())),
1279 Err(_) => Err(XrpcError::InvalidRecord(canon_err.to_string())),
1280 }
1281 }
1282 }
1283}
1284
1285async fn fetch_from_uri<R, V>(
1286 state: &AppState,
1287 uri: AtUri<DefaultStr>,
1288) -> Result<(Arc<RecordBody>, V), XrpcError>
1289where
1290 R: XrpcResp,
1291 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1292{
1293 let raw = uri.as_str().to_owned();
1294 let nsid = nsid_static(R::NSID);
1295 let (body, _did) = resolve(state, ExpectedNsid::new(nsid.clone()), uri).await?;
1296 let value: V = deserialize_or_upgrade(state, &nsid, &body.value).await?;
1297 let value = value
1298 .normalize(&state.resolver)
1299 .await
1300 .ok_or(XrpcError::UpstreamGone(raw))?;
1301 Ok((body, value))
1302}
1303
1304async fn fetch<R, V>(
1305 state: &AppState,
1306 uri: &AtUri<DefaultStr>,
1307) -> Result<(Arc<RecordBody>, V), XrpcError>
1308where
1309 R: XrpcResp,
1310 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1311{
1312 fetch_from_uri::<R, V>(state, uri.clone()).await
1313}
1314
1315async fn get_repo(
1316 State(state): State<AppState>,
1317 XrpcQuery(q): XrpcQuery<GetRepoQuery>,
1318) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> {
1319 let (body, value) = fetch::<RepoRecord, Repo<DefaultStr>>(&state, &q.repo).await?;
1320 Ok(Json(RepoGetRecordOutput {
1321 cid: Some(body.cid.clone()),
1322 uri: body.uri.clone(),
1323 value,
1324 }))
1325}
1326
1327async fn get_repo_by_repo_did(
1328 State(state): State<AppState>,
1329 XrpcQuery(q): XrpcQuery<GetRepoByRepoDidQuery>,
1330) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> {
1331 let ident = state
1332 .resolver
1333 .lookup_by_repo_did(&q.repo_did)
1334 .await
1335 .ok_or(XrpcError::NotFound)?;
1336 let uri = AtUri::<DefaultStr>::from_parts_owned(
1337 ident.owner.as_str(),
1338 RepoRecord::NSID,
1339 ident.rkey.as_str(),
1340 )
1341 .expect("Did and Rkey newtypes already validated, at-uri assembly cannot fail");
1342 let (body, value) = fetch_from_uri::<RepoRecord, Repo<DefaultStr>>(&state, uri).await?;
1343 Ok(Json(RepoGetRecordOutput {
1344 cid: Some(body.cid.clone()),
1345 uri: body.uri.clone(),
1346 value,
1347 }))
1348}
1349
1350async fn get_profile(
1351 State(state): State<AppState>,
1352 XrpcQuery(q): XrpcQuery<GetProfileQuery>,
1353) -> Result<Json<ProfileGetRecordOutput<DefaultStr>>, XrpcError> {
1354 let (body, value) = fetch::<ProfileRecord, Profile<DefaultStr>>(&state, &q.actor).await?;
1355 Ok(Json(ProfileGetRecordOutput {
1356 cid: Some(body.cid.clone()),
1357 uri: body.uri.clone(),
1358 value,
1359 }))
1360}
1361
1362async fn get_issue(
1363 State(state): State<AppState>,
1364 XrpcQuery(q): XrpcQuery<GetIssueQuery>,
1365) -> Result<Json<IssueGetRecordOutput<DefaultStr>>, XrpcError> {
1366 let (body, value) = fetch::<IssueRecord, Issue<DefaultStr>>(&state, &q.issue).await?;
1367 Ok(Json(IssueGetRecordOutput {
1368 cid: Some(body.cid.clone()),
1369 uri: body.uri.clone(),
1370 value,
1371 }))
1372}
1373
1374async fn get_pull(
1375 State(state): State<AppState>,
1376 XrpcQuery(q): XrpcQuery<GetPullQuery>,
1377) -> Result<Json<PullGetRecordOutput<DefaultStr>>, XrpcError> {
1378 let (body, value) = fetch::<PullRecord, Pull<DefaultStr>>(&state, &q.pull).await?;
1379 Ok(Json(PullGetRecordOutput {
1380 cid: Some(body.cid.clone()),
1381 uri: body.uri.clone(),
1382 value,
1383 }))
1384}
1385
1386async fn get_repos(
1387 State(state): State<AppState>,
1388 RawQuery(query): RawQuery,
1389) -> Result<Response, XrpcError> {
1390 let uris = collect_repeated(query.as_deref(), BULK_REPOS_KEY);
1391 bulk_fetch::<RepoRecord, Repo<DefaultStr>>(&state, uris).await
1392}
1393
1394async fn get_profiles(
1395 State(state): State<AppState>,
1396 RawQuery(query): RawQuery,
1397) -> Result<Response, XrpcError> {
1398 let uris = collect_repeated(query.as_deref(), BULK_PROFILES_KEY);
1399 bulk_fetch::<ProfileRecord, Profile<DefaultStr>>(&state, uris).await
1400}
1401
1402async fn get_issues(
1403 State(state): State<AppState>,
1404 RawQuery(query): RawQuery,
1405) -> Result<Response, XrpcError> {
1406 let uris = collect_repeated(query.as_deref(), BULK_ISSUES_KEY);
1407 bulk_fetch::<IssueRecord, Issue<DefaultStr>>(&state, uris).await
1408}
1409
1410async fn get_pulls(
1411 State(state): State<AppState>,
1412 RawQuery(query): RawQuery,
1413) -> Result<Response, XrpcError> {
1414 let uris = collect_repeated(query.as_deref(), BULK_PULLS_KEY);
1415 bulk_fetch::<PullRecord, Pull<DefaultStr>>(&state, uris).await
1416}
1417
// Repeated query-string keys read by the bulk handlers (`get_repos`,
// `get_profiles`, `get_issues`, `get_pulls`).
const BULK_REPOS_KEY: &str = "repos";
const BULK_PROFILES_KEY: &str = "actors";
const BULK_ISSUES_KEY: &str = "issues";
const BULK_PULLS_KEY: &str = "pulls";
// Maximum number of uris `bulk_fetch` accepts in a single request.
const BULK_LIMIT: usize = 50;
1423
1424fn collect_repeated(query: Option<&str>, key: &str) -> Vec<String> {
1425 let Some(q) = query else {
1426 return Vec::new();
1427 };
1428 form_urlencoded::parse(q.as_bytes())
1429 .filter_map(|(k, v)| (k == key).then(|| v.into_owned()))
1430 .collect()
1431}
1432
1433async fn hydrate_record_view<V>(
1434 state: &AppState,
1435 nsid: &Nsid<DefaultStr>,
1436 uri: AtUri<DefaultStr>,
1437) -> Result<Option<RecordView<V>>, XrpcError>
1438where
1439 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1440{
1441 let body = resolve_for_view(state, nsid, uri).await?;
1442 let value: V = deserialize_or_upgrade::<V>(state, nsid, &body.value).await?;
1443 let Some(value) = value.normalize(&state.resolver).await else {
1444 return Ok(None);
1445 };
1446 Ok(Some(RecordView {
1447 uri: body.uri.clone(),
1448 cid: Some(body.cid.clone()),
1449 value,
1450 }))
1451}
1452
/// Where a uri being hydrated came from; decides how hydration failures are
/// handled in `drop_unhydratable` and `hydrate_record_stream`.
#[derive(Clone, Copy)]
enum HitProvenance {
    // Uri supplied by the caller (used by `bulk_fetch`); `InvalidParams`
    // failures are returned to the caller.
    ClientSupplied,
    // Uri read from the edge index (used by the list endpoints); evictable
    // failures remove the source from the index and the hit is dropped.
    Indexed,
}
1458
1459fn is_index_evictable(err: &XrpcError) -> bool {
1460 matches!(
1461 err,
1462 XrpcError::NotFound
1463 | XrpcError::UpstreamGone(_)
1464 | XrpcError::InvalidRecord(_)
1465 | XrpcError::InvalidParams(_)
1466 )
1467}
1468
1469fn drop_unhydratable<V>(
1470 provenance: HitProvenance,
1471 nsid: &Nsid<DefaultStr>,
1472 uri: &AtUri<DefaultStr>,
1473 result: Result<Option<V>, XrpcError>,
1474) -> Result<Option<V>, XrpcError> {
1475 match result {
1476 Ok(view) => Ok(view),
1477 Err(err @ (XrpcError::NotFound | XrpcError::UpstreamGone(_))) => {
1478 tracing::debug!(
1479 uri = %uri,
1480 nsid = %nsid.as_ref(),
1481 error = %err,
1482 "dropping gone hit during hydration",
1483 );
1484 Ok(None)
1485 }
1486 Err(err @ XrpcError::UpstreamUnavailable(_)) => {
1487 tracing::warn!(
1488 uri = %uri,
1489 nsid = %nsid.as_ref(),
1490 error = %err,
1491 "dropping hit, upstream unavailable during hydration",
1492 );
1493 Ok(None)
1494 }
1495 Err(err @ XrpcError::InvalidRecord(_)) => {
1496 tracing::warn!(
1497 uri = %uri,
1498 nsid = %nsid.as_ref(),
1499 error = %err,
1500 "dropping invalid hit during hydration",
1501 );
1502 Ok(None)
1503 }
1504 Err(err @ XrpcError::InvalidParams(_)) => match provenance {
1505 HitProvenance::ClientSupplied => Err(err),
1506 HitProvenance::Indexed => {
1507 tracing::warn!(
1508 uri = %uri,
1509 nsid = %nsid.as_ref(),
1510 error = %err,
1511 "dropping malformed indexed hit during hydration",
1512 );
1513 Ok(None)
1514 }
1515 },
1516 Err(err @ (XrpcError::Internal(_) | XrpcError::Overloaded)) => Err(err),
1517 }
1518}
1519
1520fn hydrate_stream<T, Fut, V>(
1521 items: impl IntoIterator<Item = T>,
1522 produce: impl FnMut(T) -> Fut,
1523) -> impl Stream<Item = Result<V, XrpcError>>
1524where
1525 Fut: Future<Output = Result<Option<V>, XrpcError>>,
1526{
1527 stream::iter(items)
1528 .map(produce)
1529 .buffered(FETCH_CONCURRENCY)
1530 .try_filter_map(|view| async move { Ok(view) })
1531}
1532
1533fn hydrate_record_stream<V>(
1534 state: &AppState,
1535 nsid: Nsid<DefaultStr>,
1536 uris: Vec<AtUri<DefaultStr>>,
1537 provenance: HitProvenance,
1538) -> impl Stream<Item = Result<RecordView<V>, XrpcError>> + Send + 'static
1539where
1540 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1541{
1542 let owned = state.clone();
1543 hydrate_stream(uris, move |uri| {
1544 let owned = owned.clone();
1545 let nsid = nsid.clone();
1546 async move {
1547 let result = hydrate_record_view::<V>(&owned, &nsid, uri.clone()).await;
1548 if matches!(provenance, HitProvenance::Indexed)
1549 && let Err(err) = &result
1550 && is_index_evictable(err)
1551 {
1552 owned.edges.remove_source(&uri);
1553 }
1554 drop_unhydratable(provenance, &nsid, &uri, result)
1555 }
1556 })
1557}
1558
/// Progress of the chunked JSON writer built in `json_stream`.
enum PagePhase {
    // Nothing emitted yet; the next chunk is the `{"<key>":[` opener.
    Head,
    // Emitting array elements; `first` is true until one element has been
    // written, which controls the leading comma.
    Body { first: bool },
    // The tail has been emitted; the next poll ends the stream.
    Done,
}
1564
/// State threaded through the `stream::unfold` in `json_stream`.
struct PageState<S> {
    // Pinned source stream of items to serialize.
    items: std::pin::Pin<Box<S>>,
    // Which part of the JSON document is emitted next.
    phase: PagePhase,
    // Name of the top-level array field.
    array_key: &'static str,
    // Bytes that close the array and the object; taken when the body ends.
    tail: Vec<u8>,
    // Optional permit held while streaming; dropped in the `Done` phase.
    permit: Option<HeavyPermit>,
}
1572
1573fn paged_tail(cursor: Option<String>) -> Vec<u8> {
1574 let encoded = serde_json::to_string(&cursor).unwrap_or_else(|_| "null".to_owned());
1575 format!("],\"cursor\":{encoded}}}").into_bytes()
1576}
1577
/// Closing bytes of an unpaged response: ends the array and the object.
fn unpaged_tail() -> Vec<u8> {
    vec![b']', b'}']
}
1581
/// Streams `items` to the client as a JSON object of the form
/// `{"<array_key>":[item,item,...<tail>`, one body chunk per item.
///
/// `tail` supplies the bytes that close the array and object (see
/// `paged_tail` / `unpaged_tail`). `permit`, if any, is held in the stream
/// state and dropped once the `Done` phase is polled.
///
/// Failures while streaming cannot change the response status at this point:
/// an item that fails to serialize is skipped, and an `Err` from `items` ends
/// the array early with the normal tail.
fn json_stream<V, S>(
    array_key: &'static str,
    items: S,
    tail: Vec<u8>,
    permit: Option<HeavyPermit>,
) -> Response
where
    V: Serialize + Send + 'static,
    S: Stream<Item = Result<V, XrpcError>> + Send + 'static,
{
    let init = PageState {
        items: Box::pin(items),
        phase: PagePhase::Head,
        array_key,
        tail,
        permit,
    };
    let chunks = stream::unfold(init, |mut st| async move {
        match st.phase {
            // First poll: emit the object opener and the array start.
            PagePhase::Head => {
                let head = format!("{{\"{}\":[", st.array_key).into_bytes();
                st.phase = PagePhase::Body { first: true };
                Some((Ok::<Vec<u8>, Infallible>(head), st))
            }
            PagePhase::Body { first } => match st.items.next().await {
                Some(Ok(view)) => match serde_json::to_vec(&view) {
                    Ok(encoded) => {
                        // Every element after the first is preceded by a comma.
                        let mut chunk = Vec::with_capacity(encoded.len() + 1);
                        if !first {
                            chunk.push(b',');
                        }
                        chunk.extend_from_slice(&encoded);
                        st.phase = PagePhase::Body { first: false };
                        Some((Ok(chunk), st))
                    }
                    Err(e) => {
                        // Skip the item: emit an empty chunk and leave `first`
                        // untouched so comma placement stays correct.
                        tracing::warn!(error = %e, "skipping hit, serialize failed mid-stream");
                        Some((Ok(Vec::new()), st))
                    }
                },
                Some(Err(e)) => {
                    // Close the document with the tail so the body stays valid JSON.
                    tracing::warn!(error = %e, "ending page early, hydration failed mid-stream");
                    let tail = std::mem::take(&mut st.tail);
                    st.phase = PagePhase::Done;
                    Some((Ok(tail), st))
                }
                None => {
                    // Source exhausted: close the array and object.
                    let tail = std::mem::take(&mut st.tail);
                    st.phase = PagePhase::Done;
                    Some((Ok(tail), st))
                }
            },
            PagePhase::Done => {
                // Release the permit explicitly, then end the stream.
                drop(st.permit.take());
                None
            }
        }
    });
    (
        [(CONTENT_TYPE, "application/json")],
        Body::from_stream(chunks),
    )
        .into_response()
}
1646
1647async fn bulk_fetch<R, V>(state: &AppState, uris: Vec<String>) -> Result<Response, XrpcError>
1648where
1649 R: XrpcResp,
1650 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1651{
1652 if uris.is_empty() {
1653 return Err(XrpcError::InvalidParams("at least one uri required".into()));
1654 }
1655 if uris.len() > BULK_LIMIT {
1656 return Err(XrpcError::InvalidParams(format!(
1657 "at most {BULK_LIMIT} uris per request"
1658 )));
1659 }
1660 let parsed: Vec<AtUri<DefaultStr>> = uris
1661 .iter()
1662 .map(|s| parse_uri(s))
1663 .collect::<Result<_, _>>()?;
1664 let nsid = nsid_static(R::NSID);
1665 if let Some(bad) = parsed
1666 .iter()
1667 .find(|uri| uri.collection().is_none_or(|c| c.as_ref() != nsid.as_ref()))
1668 {
1669 return Err(XrpcError::InvalidParams(format!(
1670 "uri collection must be {}, got {}",
1671 nsid.as_ref(),
1672 bad.as_ref()
1673 )));
1674 }
1675 let permit = state.heavy_permit()?;
1676 let views = hydrate_record_stream::<V>(state, nsid, parsed, HitProvenance::ClientSupplied);
1677 Ok(json_stream::<RecordView<V>, _>(
1678 "items",
1679 views,
1680 unpaged_tail(),
1681 permit,
1682 ))
1683}
1684
1685fn record_edge_page<R, F>(
1686 state: &AppState,
1687 q: &TypedListQuery<F>,
1688) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError>
1689where
1690 R: XrpcResp + HasSubject,
1691 F: ListFilter,
1692{
1693 let subject = parse_subject(&q.subject, R::SHAPE)?;
1694 let cursor = parse_cursor(q.cursor.as_deref())?;
1695 let limit = parse_limit(q.limit)?;
1696 let dir = q.dir();
1697 let nsid = nsid_static(R::NSID);
1698 let page = if q.filter.is_identity() {
1699 let key = EdgeKey::new(nsid.clone(), subject);
1700 state.edges.list(&key, cursor, limit, dir)
1701 } else {
1702 let pred = q.filter.predicate(state, &subject);
1703 let key = EdgeKey::new(nsid.clone(), subject);
1704 state.edges.list_filtered(&key, cursor, limit, dir, pred)
1705 };
1706 Ok((page, nsid))
1707}
1708
1709async fn list_records<R, V, F>(
1710 state: &AppState,
1711 q: TypedListQuery<F>,
1712) -> Result<Response, XrpcError>
1713where
1714 R: XrpcResp + HasSubject,
1715 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1716 F: ListFilter,
1717{
1718 let (page, nsid) = record_edge_page::<R, F>(state, &q)?;
1719 let permit = state.heavy_permit()?;
1720 let views = hydrate_record_stream::<V>(state, nsid, page.items, HitProvenance::Indexed);
1721 Ok(json_stream::<RecordView<V>, _>(
1722 "items",
1723 views,
1724 paged_tail(page.next.map(PageToken::encode_token)),
1725 permit,
1726 ))
1727}
1728
1729fn count_for<R: XrpcResp + HasSubject>(
1730 state: &AppState,
1731 q: CountQuery,
1732) -> Result<CountResponse, XrpcError> {
1733 let subject = parse_subject(&q.subject, R::SHAPE)?;
1734 let key = EdgeKey::new(nsid_static(R::NSID), subject);
1735 Ok(CountResponse {
1736 count: state.edges.count(&key),
1737 distinct_authors: state.edges.count_distinct_authors(&key),
1738 })
1739}
1740
1741fn mirror_edge_page<M, F>(
1742 state: &AppState,
1743 q: &TypedListQuery<F>,
1744) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError>
1745where
1746 M: MirrorOf,
1747 F: ListFilter,
1748{
1749 let subject = parse_subject(&q.subject, M::SHAPE)?;
1750 let cursor = parse_cursor(q.cursor.as_deref())?;
1751 let limit = parse_limit(q.limit)?;
1752 let dir = q.dir();
1753 let edge_nsid = nsid_static(M::EDGE_KIND);
1754 let page = if q.filter.is_identity() {
1755 let key = EdgeKey::new(edge_nsid, subject);
1756 state.edges.list(&key, cursor, limit, dir)
1757 } else {
1758 let pred = q.filter.predicate(state, &subject);
1759 let key = EdgeKey::new(edge_nsid, subject);
1760 state.edges.list_filtered(&key, cursor, limit, dir, pred)
1761 };
1762 let record_nsid = nsid_static(<M::Record as XrpcResp>::NSID);
1763 Ok((page, record_nsid))
1764}
1765
1766async fn list_mirror<M, V, F>(state: &AppState, q: TypedListQuery<F>) -> Result<Response, XrpcError>
1767where
1768 M: MirrorOf,
1769 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1770 F: ListFilter,
1771{
1772 let (page, record_nsid) = mirror_edge_page::<M, F>(state, &q)?;
1773 let permit = state.heavy_permit()?;
1774 let views = hydrate_record_stream::<V>(state, record_nsid, page.items, HitProvenance::Indexed);
1775 Ok(json_stream::<RecordView<V>, _>(
1776 "items",
1777 views,
1778 paged_tail(page.next.map(PageToken::encode_token)),
1779 permit,
1780 ))
1781}
1782
1783fn count_mirror<M: MirrorOf>(state: &AppState, q: CountQuery) -> Result<CountResponse, XrpcError> {
1784 let subject = parse_subject(&q.subject, M::SHAPE)?;
1785 let key = EdgeKey::new(nsid_static(M::EDGE_KIND), subject);
1786 Ok(CountResponse {
1787 count: state.edges.count(&key),
1788 distinct_authors: state.edges.count_distinct_authors(&key),
1789 })
1790}
1791
1792async fn list_stars(
1793 State(state): State<AppState>,
1794 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1795) -> Result<Response, XrpcError> {
1796 list_records::<StarRecord, Star<DefaultStr>, _>(&state, q).await
1797}
1798
1799async fn count_stars(
1800 State(state): State<AppState>,
1801 XrpcQuery(q): XrpcQuery<CountQuery>,
1802) -> Result<Json<CountResponse>, XrpcError> {
1803 count_for::<StarRecord>(&state, q).map(Json)
1804}
1805
1806async fn list_follows(
1807 State(state): State<AppState>,
1808 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1809) -> Result<Response, XrpcError> {
1810 list_records::<FollowRecord, Follow<DefaultStr>, _>(&state, q).await
1811}
1812
1813async fn count_follows(
1814 State(state): State<AppState>,
1815 XrpcQuery(q): XrpcQuery<CountQuery>,
1816) -> Result<Json<CountResponse>, XrpcError> {
1817 count_for::<FollowRecord>(&state, q).map(Json)
1818}
1819
1820async fn list_issues(
1821 State(state): State<AppState>,
1822 XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>,
1823) -> Result<Response, XrpcError> {
1824 let (page, nsid) = record_edge_page::<IssueRecord, _>(&state, &q)?;
1825 let permit = state.heavy_permit()?;
1826 let owned = state.clone();
1827 let items = hydrate_record_stream::<Issue<DefaultStr>>(
1828 &state,
1829 nsid,
1830 page.items,
1831 HitProvenance::Indexed,
1832 )
1833 .map(move |view| view.map(|v| enrich_issue_view(&owned, v)));
1834 Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
1835 "items",
1836 items,
1837 paged_tail(page.next.map(PageToken::encode_token)),
1838 permit,
1839 ))
1840}
1841
1842async fn count_issues(
1843 State(state): State<AppState>,
1844 XrpcQuery(q): XrpcQuery<CountQuery>,
1845) -> Result<Json<CountResponse>, XrpcError> {
1846 count_for::<IssueRecord>(&state, q).map(Json)
1847}
1848
1849async fn list_pulls(
1850 State(state): State<AppState>,
1851 XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>,
1852) -> Result<Response, XrpcError> {
1853 let (page, nsid) = record_edge_page::<PullRecord, _>(&state, &q)?;
1854 let permit = state.heavy_permit()?;
1855 let owned = state.clone();
1856 let items =
1857 hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed)
1858 .map(move |view| view.map(|v| enrich_pull_view(&owned, v)));
1859 Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>(
1860 "items",
1861 items,
1862 paged_tail(page.next.map(PageToken::encode_token)),
1863 permit,
1864 ))
1865}
1866
1867async fn count_pulls(
1868 State(state): State<AppState>,
1869 XrpcQuery(q): XrpcQuery<CountQuery>,
1870) -> Result<Json<CountResponse>, XrpcError> {
1871 count_for::<PullRecord>(&state, q).map(Json)
1872}
1873
1874async fn list_feed_comments(
1875 State(state): State<AppState>,
1876 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1877) -> Result<Response, XrpcError> {
1878 list_records::<FeedCommentRecord, FeedComment<DefaultStr>, _>(&state, q).await
1879}
1880
1881async fn count_feed_comments(
1882 State(state): State<AppState>,
1883 XrpcQuery(q): XrpcQuery<CountQuery>,
1884) -> Result<Json<CountResponse>, XrpcError> {
1885 count_for::<FeedCommentRecord>(&state, q).map(Json)
1886}
1887
1888async fn list_reactions(
1889 State(state): State<AppState>,
1890 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1891) -> Result<Response, XrpcError> {
1892 list_records::<ReactionRecord, Reaction<DefaultStr>, _>(&state, q).await
1893}
1894
1895async fn count_reactions(
1896 State(state): State<AppState>,
1897 XrpcQuery(q): XrpcQuery<CountQuery>,
1898) -> Result<Json<CountResponse>, XrpcError> {
1899 count_for::<ReactionRecord>(&state, q).map(Json)
1900}
1901
1902async fn list_ref_updates(
1903 State(state): State<AppState>,
1904 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1905) -> Result<Response, XrpcError> {
1906 list_records::<RefUpdateRecord, RefUpdate<DefaultStr>, _>(&state, q).await
1907}
1908
1909async fn count_ref_updates(
1910 State(state): State<AppState>,
1911 XrpcQuery(q): XrpcQuery<CountQuery>,
1912) -> Result<Json<CountResponse>, XrpcError> {
1913 count_for::<RefUpdateRecord>(&state, q).map(Json)
1914}
1915
1916async fn list_collaborators(
1917 State(state): State<AppState>,
1918 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1919) -> Result<Response, XrpcError> {
1920 list_records::<CollaboratorRecord, Collaborator<DefaultStr>, _>(&state, q).await
1921}
1922
1923async fn count_collaborators(
1924 State(state): State<AppState>,
1925 XrpcQuery(q): XrpcQuery<CountQuery>,
1926) -> Result<Json<CountResponse>, XrpcError> {
1927 count_for::<CollaboratorRecord>(&state, q).map(Json)
1928}
1929
1930async fn list_issue_states(
1931 State(state): State<AppState>,
1932 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1933) -> Result<Response, XrpcError> {
1934 list_records::<IssueStateRecord, IssueState<DefaultStr>, _>(&state, q).await
1935}
1936
1937async fn count_issue_states(
1938 State(state): State<AppState>,
1939 XrpcQuery(q): XrpcQuery<CountQuery>,
1940) -> Result<Json<CountResponse>, XrpcError> {
1941 count_for::<IssueStateRecord>(&state, q).map(Json)
1942}
1943
1944async fn list_pull_statuses(
1945 State(state): State<AppState>,
1946 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1947) -> Result<Response, XrpcError> {
1948 list_records::<PullStatusRecord, PullStatus<DefaultStr>, _>(&state, q).await
1949}
1950
1951async fn count_pull_statuses(
1952 State(state): State<AppState>,
1953 XrpcQuery(q): XrpcQuery<CountQuery>,
1954) -> Result<Json<CountResponse>, XrpcError> {
1955 count_for::<PullStatusRecord>(&state, q).map(Json)
1956}
1957
1958async fn list_repos(
1959 State(state): State<AppState>,
1960 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1961) -> Result<Response, XrpcError> {
1962 list_records::<RepoRecord, Repo<DefaultStr>, _>(&state, q).await
1963}
1964
1965async fn count_repos(
1966 State(state): State<AppState>,
1967 XrpcQuery(q): XrpcQuery<CountQuery>,
1968) -> Result<Json<CountResponse>, XrpcError> {
1969 count_for::<RepoRecord>(&state, q).map(Json)
1970}
1971
1972async fn list_knots(
1973 State(state): State<AppState>,
1974 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1975) -> Result<Response, XrpcError> {
1976 list_records::<KnotRecord, Knot<DefaultStr>, _>(&state, q).await
1977}
1978
1979async fn count_knots(
1980 State(state): State<AppState>,
1981 XrpcQuery(q): XrpcQuery<CountQuery>,
1982) -> Result<Json<CountResponse>, XrpcError> {
1983 count_for::<KnotRecord>(&state, q).map(Json)
1984}
1985
1986async fn list_spindles(
1987 State(state): State<AppState>,
1988 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1989) -> Result<Response, XrpcError> {
1990 list_records::<SpindleRecord, Spindle<DefaultStr>, _>(&state, q).await
1991}
1992
1993async fn count_spindles(
1994 State(state): State<AppState>,
1995 XrpcQuery(q): XrpcQuery<CountQuery>,
1996) -> Result<Json<CountResponse>, XrpcError> {
1997 count_for::<SpindleRecord>(&state, q).map(Json)
1998}
1999
2000async fn list_public_keys(
2001 State(state): State<AppState>,
2002 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2003) -> Result<Response, XrpcError> {
2004 list_records::<PublicKeyRecord, PublicKey<DefaultStr>, _>(&state, q).await
2005}
2006
2007async fn count_public_keys(
2008 State(state): State<AppState>,
2009 XrpcQuery(q): XrpcQuery<CountQuery>,
2010) -> Result<Json<CountResponse>, XrpcError> {
2011 count_for::<PublicKeyRecord>(&state, q).map(Json)
2012}
2013
2014async fn list_vouches(
2015 State(state): State<AppState>,
2016 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2017) -> Result<Response, XrpcError> {
2018 list_records::<VouchRecord, Vouch<DefaultStr>, _>(&state, q).await
2019}
2020
2021async fn count_vouches(
2022 State(state): State<AppState>,
2023 XrpcQuery(q): XrpcQuery<CountQuery>,
2024) -> Result<Json<CountResponse>, XrpcError> {
2025 count_for::<VouchRecord>(&state, q).map(Json)
2026}
2027
2028async fn list_stars_by(
2029 State(state): State<AppState>,
2030 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2031) -> Result<Response, XrpcError> {
2032 list_mirror::<StarBy, Star<DefaultStr>, _>(&state, q).await
2033}
2034async fn count_stars_by(
2035 State(state): State<AppState>,
2036 XrpcQuery(q): XrpcQuery<CountQuery>,
2037) -> Result<Json<CountResponse>, XrpcError> {
2038 count_mirror::<StarBy>(&state, q).map(Json)
2039}
2040
2041async fn list_reactions_by(
2042 State(state): State<AppState>,
2043 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2044) -> Result<Response, XrpcError> {
2045 list_mirror::<ReactionBy, Reaction<DefaultStr>, _>(&state, q).await
2046}
2047async fn count_reactions_by(
2048 State(state): State<AppState>,
2049 XrpcQuery(q): XrpcQuery<CountQuery>,
2050) -> Result<Json<CountResponse>, XrpcError> {
2051 count_mirror::<ReactionBy>(&state, q).map(Json)
2052}
2053
2054async fn list_follows_by(
2055 State(state): State<AppState>,
2056 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2057) -> Result<Response, XrpcError> {
2058 list_mirror::<FollowBy, Follow<DefaultStr>, _>(&state, q).await
2059}
2060async fn count_follows_by(
2061 State(state): State<AppState>,
2062 XrpcQuery(q): XrpcQuery<CountQuery>,
2063) -> Result<Json<CountResponse>, XrpcError> {
2064 count_mirror::<FollowBy>(&state, q).map(Json)
2065}
2066
2067async fn list_vouches_by(
2068 State(state): State<AppState>,
2069 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2070) -> Result<Response, XrpcError> {
2071 list_mirror::<VouchBy, Vouch<DefaultStr>, _>(&state, q).await
2072}
2073async fn count_vouches_by(
2074 State(state): State<AppState>,
2075 XrpcQuery(q): XrpcQuery<CountQuery>,
2076) -> Result<Json<CountResponse>, XrpcError> {
2077 count_mirror::<VouchBy>(&state, q).map(Json)
2078}
2079
2080async fn list_ref_updates_by(
2081 State(state): State<AppState>,
2082 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2083) -> Result<Response, XrpcError> {
2084 list_mirror::<RefUpdateBy, RefUpdate<DefaultStr>, _>(&state, q).await
2085}
2086async fn count_ref_updates_by(
2087 State(state): State<AppState>,
2088 XrpcQuery(q): XrpcQuery<CountQuery>,
2089) -> Result<Json<CountResponse>, XrpcError> {
2090 count_mirror::<RefUpdateBy>(&state, q).map(Json)
2091}
2092
2093async fn list_knot_members_by(
2094 State(state): State<AppState>,
2095 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2096) -> Result<Response, XrpcError> {
2097 list_mirror::<KnotMemberBy, KnotMember<DefaultStr>, _>(&state, q).await
2098}
2099async fn count_knot_members_by(
2100 State(state): State<AppState>,
2101 XrpcQuery(q): XrpcQuery<CountQuery>,
2102) -> Result<Json<CountResponse>, XrpcError> {
2103 count_mirror::<KnotMemberBy>(&state, q).map(Json)
2104}
2105
2106async fn list_label_ops_by(
2107 State(state): State<AppState>,
2108 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2109) -> Result<Response, XrpcError> {
2110 list_mirror::<LabelOpBy, LabelOp<DefaultStr>, _>(&state, q).await
2111}
2112async fn count_label_ops_by(
2113 State(state): State<AppState>,
2114 XrpcQuery(q): XrpcQuery<CountQuery>,
2115) -> Result<Json<CountResponse>, XrpcError> {
2116 count_mirror::<LabelOpBy>(&state, q).map(Json)
2117}
2118
2119async fn list_pipelines_by(
2120 State(state): State<AppState>,
2121 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2122) -> Result<Response, XrpcError> {
2123 list_mirror::<PipelineBy, Pipeline<DefaultStr>, _>(&state, q).await
2124}
2125async fn count_pipelines_by(
2126 State(state): State<AppState>,
2127 XrpcQuery(q): XrpcQuery<CountQuery>,
2128) -> Result<Json<CountResponse>, XrpcError> {
2129 count_mirror::<PipelineBy>(&state, q).map(Json)
2130}
2131
2132async fn list_pipeline_statuses_by(
2133 State(state): State<AppState>,
2134 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2135) -> Result<Response, XrpcError> {
2136 list_mirror::<PipelineStatusBy, PipelineStatus<DefaultStr>, _>(&state, q).await
2137}
2138async fn count_pipeline_statuses_by(
2139 State(state): State<AppState>,
2140 XrpcQuery(q): XrpcQuery<CountQuery>,
2141) -> Result<Json<CountResponse>, XrpcError> {
2142 count_mirror::<PipelineStatusBy>(&state, q).map(Json)
2143}
2144
2145async fn list_artifacts_by(
2146 State(state): State<AppState>,
2147 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2148) -> Result<Response, XrpcError> {
2149 list_mirror::<ArtifactBy, Artifact<DefaultStr>, _>(&state, q).await
2150}
2151async fn count_artifacts_by(
2152 State(state): State<AppState>,
2153 XrpcQuery(q): XrpcQuery<CountQuery>,
2154) -> Result<Json<CountResponse>, XrpcError> {
2155 count_mirror::<ArtifactBy>(&state, q).map(Json)
2156}
2157
2158async fn list_collaborators_by(
2159 State(state): State<AppState>,
2160 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2161) -> Result<Response, XrpcError> {
2162 list_mirror::<CollaboratorBy, Collaborator<DefaultStr>, _>(&state, q).await
2163}
2164async fn count_collaborators_by(
2165 State(state): State<AppState>,
2166 XrpcQuery(q): XrpcQuery<CountQuery>,
2167) -> Result<Json<CountResponse>, XrpcError> {
2168 count_mirror::<CollaboratorBy>(&state, q).map(Json)
2169}
2170
2171async fn list_issues_by(
2172 State(state): State<AppState>,
2173 XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>,
2174) -> Result<Response, XrpcError> {
2175 let (page, nsid) = mirror_edge_page::<IssueBy, _>(&state, &q)?;
2176 let permit = state.heavy_permit()?;
2177 let owned = state.clone();
2178 let items = hydrate_record_stream::<Issue<DefaultStr>>(
2179 &state,
2180 nsid,
2181 page.items,
2182 HitProvenance::Indexed,
2183 )
2184 .map(move |view| view.map(|v| enrich_issue_view(&owned, v)));
2185 Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
2186 "items",
2187 items,
2188 paged_tail(page.next.map(PageToken::encode_token)),
2189 permit,
2190 ))
2191}
2192async fn count_issues_by(
2193 State(state): State<AppState>,
2194 XrpcQuery(q): XrpcQuery<CountQuery>,
2195) -> Result<Json<CountResponse>, XrpcError> {
2196 count_mirror::<IssueBy>(&state, q).map(Json)
2197}
2198
2199async fn list_feed_comments_by(
2200 State(state): State<AppState>,
2201 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2202) -> Result<Response, XrpcError> {
2203 list_mirror::<FeedCommentBy, FeedComment<DefaultStr>, _>(&state, q).await
2204}
2205async fn count_feed_comments_by(
2206 State(state): State<AppState>,
2207 XrpcQuery(q): XrpcQuery<CountQuery>,
2208) -> Result<Json<CountResponse>, XrpcError> {
2209 count_mirror::<FeedCommentBy>(&state, q).map(Json)
2210}
2211
2212async fn list_issue_states_by(
2213 State(state): State<AppState>,
2214 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2215) -> Result<Response, XrpcError> {
2216 list_mirror::<IssueStateBy, IssueState<DefaultStr>, _>(&state, q).await
2217}
2218async fn count_issue_states_by(
2219 State(state): State<AppState>,
2220 XrpcQuery(q): XrpcQuery<CountQuery>,
2221) -> Result<Json<CountResponse>, XrpcError> {
2222 count_mirror::<IssueStateBy>(&state, q).map(Json)
2223}
2224
2225async fn list_pulls_by(
2226 State(state): State<AppState>,
2227 XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>,
2228) -> Result<Response, XrpcError> {
2229 let (page, nsid) = mirror_edge_page::<PullBy, _>(&state, &q)?;
2230 let permit = state.heavy_permit()?;
2231 let owned = state.clone();
2232 let items =
2233 hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed)
2234 .map(move |view| view.map(|v| enrich_pull_view(&owned, v)));
2235 Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>(
2236 "items",
2237 items,
2238 paged_tail(page.next.map(PageToken::encode_token)),
2239 permit,
2240 ))
2241}
2242async fn count_pulls_by(
2243 State(state): State<AppState>,
2244 XrpcQuery(q): XrpcQuery<CountQuery>,
2245) -> Result<Json<CountResponse>, XrpcError> {
2246 count_mirror::<PullBy>(&state, q).map(Json)
2247}
2248
2249async fn list_pull_statuses_by(
2250 State(state): State<AppState>,
2251 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2252) -> Result<Response, XrpcError> {
2253 list_mirror::<PullStatusBy, PullStatus<DefaultStr>, _>(&state, q).await
2254}
2255async fn count_pull_statuses_by(
2256 State(state): State<AppState>,
2257 XrpcQuery(q): XrpcQuery<CountQuery>,
2258) -> Result<Json<CountResponse>, XrpcError> {
2259 count_mirror::<PullStatusBy>(&state, q).map(Json)
2260}
2261
2262async fn list_spindle_members_by(
2263 State(state): State<AppState>,
2264 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2265) -> Result<Response, XrpcError> {
2266 list_mirror::<SpindleMemberBy, SpindleMember<DefaultStr>, _>(&state, q).await
2267}
2268async fn count_spindle_members_by(
2269 State(state): State<AppState>,
2270 XrpcQuery(q): XrpcQuery<CountQuery>,
2271) -> Result<Json<CountResponse>, XrpcError> {
2272 count_mirror::<SpindleMemberBy>(&state, q).map(Json)
2273}
2274
2275async fn list_label_definitions(
2276 State(state): State<AppState>,
2277 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2278) -> Result<Response, XrpcError> {
2279 list_records::<LabelDefinitionRecord, LabelDefinition<DefaultStr>, _>(&state, q).await
2280}
2281
2282async fn count_label_definitions(
2283 State(state): State<AppState>,
2284 XrpcQuery(q): XrpcQuery<CountQuery>,
2285) -> Result<Json<CountResponse>, XrpcError> {
2286 count_for::<LabelDefinitionRecord>(&state, q).map(Json)
2287}
2288
2289async fn list_label_ops(
2290 State(state): State<AppState>,
2291 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2292) -> Result<Response, XrpcError> {
2293 list_records::<LabelOpRecord, LabelOp<DefaultStr>, _>(&state, q).await
2294}
2295
2296async fn count_label_ops(
2297 State(state): State<AppState>,
2298 XrpcQuery(q): XrpcQuery<CountQuery>,
2299) -> Result<Json<CountResponse>, XrpcError> {
2300 count_for::<LabelOpRecord>(&state, q).map(Json)
2301}
2302
2303async fn list_pipelines(
2304 State(state): State<AppState>,
2305 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2306) -> Result<Response, XrpcError> {
2307 list_records::<PipelineRecord, Pipeline<DefaultStr>, _>(&state, q).await
2308}
2309
2310async fn count_pipelines(
2311 State(state): State<AppState>,
2312 XrpcQuery(q): XrpcQuery<CountQuery>,
2313) -> Result<Json<CountResponse>, XrpcError> {
2314 count_for::<PipelineRecord>(&state, q).map(Json)
2315}
2316
2317async fn list_pipeline_statuses(
2318 State(state): State<AppState>,
2319 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2320) -> Result<Response, XrpcError> {
2321 list_records::<PipelineStatusRecord, PipelineStatus<DefaultStr>, _>(&state, q).await
2322}
2323
2324async fn count_pipeline_statuses(
2325 State(state): State<AppState>,
2326 XrpcQuery(q): XrpcQuery<CountQuery>,
2327) -> Result<Json<CountResponse>, XrpcError> {
2328 count_for::<PipelineStatusRecord>(&state, q).map(Json)
2329}
2330
2331async fn list_artifacts(
2332 State(state): State<AppState>,
2333 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2334) -> Result<Response, XrpcError> {
2335 list_records::<ArtifactRecord, Artifact<DefaultStr>, _>(&state, q).await
2336}
2337
2338async fn count_artifacts(
2339 State(state): State<AppState>,
2340 XrpcQuery(q): XrpcQuery<CountQuery>,
2341) -> Result<Json<CountResponse>, XrpcError> {
2342 count_for::<ArtifactRecord>(&state, q).map(Json)
2343}
2344
2345async fn list_knot_members(
2346 State(state): State<AppState>,
2347 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2348) -> Result<Response, XrpcError> {
2349 list_records::<KnotMemberRecord, KnotMember<DefaultStr>, _>(&state, q).await
2350}
2351
2352async fn count_knot_members(
2353 State(state): State<AppState>,
2354 XrpcQuery(q): XrpcQuery<CountQuery>,
2355) -> Result<Json<CountResponse>, XrpcError> {
2356 count_for::<KnotMemberRecord>(&state, q).map(Json)
2357}
2358
2359async fn list_spindle_members(
2360 State(state): State<AppState>,
2361 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2362) -> Result<Response, XrpcError> {
2363 list_records::<SpindleMemberRecord, SpindleMember<DefaultStr>, _>(&state, q).await
2364}
2365
2366async fn count_spindle_members(
2367 State(state): State<AppState>,
2368 XrpcQuery(q): XrpcQuery<CountQuery>,
2369) -> Result<Json<CountResponse>, XrpcError> {
2370 count_for::<SpindleMemberRecord>(&state, q).map(Json)
2371}
2372
2373async fn list_strings(
2374 State(state): State<AppState>,
2375 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2376) -> Result<Response, XrpcError> {
2377 list_records::<TangledStringRecord, TangledString<DefaultStr>, _>(&state, q).await
2378}
2379
2380async fn count_strings(
2381 State(state): State<AppState>,
2382 XrpcQuery(q): XrpcQuery<CountQuery>,
2383) -> Result<Json<CountResponse>, XrpcError> {
2384 count_for::<TangledStringRecord>(&state, q).map(Json)
2385}
2386
/// Query parameters accepted by `resolve_mini_doc`.
#[derive(Deserialize)]
struct ResolveMiniDocParams {
    /// The AT identifier to resolve; passed by reference to the slingshot client.
    identifier: AtIdentifier<DefaultStr>,
}
2391
2392async fn resolve_mini_doc(
2393 State(state): State<AppState>,
2394 XrpcQuery(q): XrpcQuery<ResolveMiniDocParams>,
2395) -> Result<Response, XrpcError> {
2396 let body = state
2397 .slingshot
2398 .resolve_mini_doc(&q.identifier)
2399 .await
2400 .map_err(map_slingshot)?;
2401 Ok((StatusCode::OK, [(CONTENT_TYPE, "application/json")], body).into_response())
2402}
2403
2404async fn get_coverage(State(state): State<AppState>) -> Json<CoverageEnvelope> {
2405 Json(state.coverage.snapshot().into())
2406}
2407
2408async fn search_query(
2409 State(state): State<AppState>,
2410 XrpcQuery(q): XrpcQuery<SearchQueryParams>,
2411) -> Result<Response, XrpcError> {
2412 if q.q.trim().is_empty() {
2413 return Err(XrpcError::InvalidParams("q must not be empty".into()));
2414 }
2415 let cursor = SearchCursor::from_token(q.cursor.as_deref())
2416 .map_err(|e| XrpcError::InvalidParams(format!("cursor: {e}")))?;
2417 let limit = parse_limit(q.limit)?;
2418 let filters = build_search_filters(&q)?;
2419 let permit = state.heavy_permit()?;
2420 let page = state
2421 .search
2422 .search(&q.q, filters, cursor, limit.get())
2423 .await
2424 .map_err(map_search_err)?;
2425 let next = page.next.map(SearchOffset::encode_token);
2426 let owned = state.clone();
2427 let hits = hydrate_stream(page.hits, move |hit| {
2428 let owned = owned.clone();
2429 let uri = hit.uri.clone();
2430 let nsid = hit.nsid.clone();
2431 async move {
2432 let result = hydrate_search_hit(&owned, hit).await;
2433 drop_unhydratable(HitProvenance::Indexed, &nsid, &uri, result)
2434 }
2435 });
2436 Ok(json_stream::<SearchHitView, _>(
2437 "hits",
2438 hits,
2439 paged_tail(next),
2440 permit,
2441 ))
2442}
2443
2444fn build_search_filters(q: &SearchQueryParams) -> Result<SearchFilters, XrpcError> {
2445 let since = q
2446 .since
2447 .as_deref()
2448 .map(parse_rfc3339_seconds)
2449 .transpose()
2450 .map_err(|e| XrpcError::InvalidParams(format!("since: {e}")))?;
2451 let until = q
2452 .until
2453 .as_deref()
2454 .map(parse_rfc3339_seconds)
2455 .transpose()
2456 .map_err(|e| XrpcError::InvalidParams(format!("until: {e}")))?;
2457 if let (Some(s), Some(u)) = (since, until)
2458 && s > u
2459 {
2460 return Err(XrpcError::InvalidParams("since must be <= until".into()));
2461 }
2462 Ok(SearchFilters {
2463 nsid: q.nsid.clone(),
2464 author: q.author.clone(),
2465 repo: q.repo.clone(),
2466 since,
2467 until,
2468 })
2469}
2470
2471fn parse_rfc3339_seconds(raw: &str) -> Result<i64, String> {
2472 chrono::DateTime::parse_from_rfc3339(raw)
2473 .map(|dt| dt.timestamp())
2474 .map_err(|e| format!("expected RFC3339, got {raw}: {e}"))
2475}
2476
2477async fn hydrate_search_hit(
2478 state: &AppState,
2479 hit: SearchHit,
2480) -> Result<Option<SearchHitView>, XrpcError> {
2481 let SearchHit { uri, nsid, score } = hit;
2482 let body = resolve_for_view(state, &nsid, uri).await?;
2483 let record = decode_canon_or_upgrade(&nsid, &body.value, &state.resolver)
2484 .await
2485 .map_err(|err| XrpcError::InvalidRecord(err.to_string()))?;
2486 let Some(value) = SearchableRecord::try_from_record(record) else {
2487 return Ok(None);
2488 };
2489 let Some(value) = value.normalize(&state.resolver).await else {
2490 return Ok(None);
2491 };
2492 Ok(Some(SearchHitView {
2493 uri: body.uri.clone(),
2494 cid: Some(body.cid.clone()),
2495 nsid,
2496 score,
2497 value,
2498 }))
2499}
2500
2501fn map_search_err(err: SearchError) -> XrpcError {
2502 use SearchError as E;
2503 match err {
2504 E::Query(e) => XrpcError::InvalidParams(format!("query: {e}")),
2505 e @ (E::Tantivy(_)
2506 | E::InvalidUri(_)
2507 | E::InvalidNsid(_)
2508 | E::MissingField(_)
2509 | E::Cancelled(_)) => XrpcError::Internal(format!("search: {e}")),
2510 }
2511}
2512
2513fn map_proxy_error(err: KnotProxyError) -> XrpcError {
2514 match err {
2515 KnotProxyError::CircuitOpen => {
2516 XrpcError::UpstreamUnavailable("knot circuit breaker open".into())
2517 }
2518 KnotProxyError::BlockedHost { host, reason } => {
2519 XrpcError::InvalidRecord(format!("knot host {host} is {reason} address space"))
2520 }
2521 KnotProxyError::PlaintextHttp { host } => {
2522 XrpcError::InvalidRecord(format!("knot host {host} requires https"))
2523 }
2524 KnotProxyError::Connect(e) => XrpcError::UpstreamUnavailable(format!("connect: {e}")),
2525 KnotProxyError::Timeout(e) => {
2526 XrpcError::UpstreamUnavailable(format!("upstream timeout: {e}"))
2527 }
2528 KnotProxyError::Redirect(e) => XrpcError::UpstreamUnavailable(format!("redirect: {e}")),
2529 KnotProxyError::Transport(e) => XrpcError::UpstreamUnavailable(format!("transport: {e}")),
2530 KnotProxyError::Upstream(s) => XrpcError::UpstreamUnavailable(format!("status {s}")),
2531 }
2532}
2533
2534fn validate_client_supplied_knot(state: &AppState, host: &KnotHost) -> Result<(), XrpcError> {
2535 let host_str = || host.url().host_str().unwrap_or_default().to_owned();
2536 if state.knots.requires_https() && host.url().scheme() != "https" {
2537 return Err(XrpcError::InvalidParams(format!(
2538 "knot host {} must be https",
2539 host_str(),
2540 )));
2541 }
2542 if state.knots.allows_private_hosts() {
2543 return Ok(());
2544 }
2545 match host.private_literal_reason() {
2546 None => Ok(()),
2547 Some(reason) => Err(XrpcError::InvalidParams(format!(
2548 "knot host {} blocked: {} address space",
2549 host_str(),
2550 reason,
2551 ))),
2552 }
2553}
2554
2555async fn resolve_knot_target(
2556 state: &AppState,
2557 repo_uri: AtUri<DefaultStr>,
2558) -> Result<(KnotHost, RepoSlug), XrpcError> {
2559 let rkey: Option<Rkey<DefaultStr>> = repo_uri.rkey().map(|r| r.clone().into_static());
2560 let (body, did) = resolve(state, ExpectedNsid::from_static(RepoRecord::NSID), repo_uri).await?;
2561 let value: Repo<DefaultStr> = serde_json::from_slice(&body.value)
2562 .map_err(|e| XrpcError::InvalidRecord(format!("decode repo record: {e}")))?;
2563 let host = KnotHost::parse(value.knot.as_ref())
2564 .map_err(|e| XrpcError::InvalidRecord(format!("knot field: {e}")))?;
2565 let name = pick_human_slug(rkey.as_ref(), value.name.as_deref()).ok_or_else(|| {
2566 XrpcError::InvalidRecord("at-uri missing rkey and record missing name".to_string())
2567 })?;
2568 let slug = RepoSlug::new(&did, &name)
2569 .map_err(|e| XrpcError::InvalidRecord(format!("repo slug: {e}")))?;
2570 Ok((host, slug))
2571}
2572
2573fn pick_human_slug(rkey: Option<&Rkey<DefaultStr>>, name: Option<&str>) -> Option<String> {
2574 match rkey {
2575 Some(r) if jacquard_common::types::tid::Tid::new(r.as_ref()).is_ok() => {
2576 Some(name.unwrap_or(r.as_ref()).to_owned())
2577 }
2578 Some(r) => Some(r.as_ref().to_owned()),
2579 None => name.map(str::to_owned),
2580 }
2581}
2582
2583fn filter_request_headers(client: &HeaderMap) -> HeaderMap {
2584 FORWARDED_REQUEST_HEADERS
2585 .iter()
2586 .fold(HeaderMap::new(), |mut acc, name| {
2587 if let Some(value) = client.get(*name) {
2588 acc.insert((*name).clone(), value.clone());
2589 }
2590 acc
2591 })
2592}
2593
2594fn upstream_to_axum(resp: ProxyResponse) -> Response {
2595 let status = resp.status();
2596 let upstream_headers = resp.headers().clone();
2597 let body = Body::from_stream(resp.into_body_stream());
2598 let mut response = Response::builder()
2599 .status(status)
2600 .body(body)
2601 .expect("response body construction must succeed");
2602 let response_headers = response.headers_mut();
2603 PASSTHROUGH_HEADERS.iter().for_each(|name| {
2604 if let Some(value) = upstream_headers.get(*name) {
2605 response_headers.insert((*name).clone(), value.clone());
2606 }
2607 });
2608 response
2609}
2610
2611async fn dispatch_proxy(
2612 state: AppState,
2613 headers: HeaderMap,
2614 nsid: Nsid<DefaultStr>,
2615 host: KnotHost,
2616 params: ProxyParams,
2617) -> Result<Response, XrpcError> {
2618 let forward: Vec<(&str, &str)> = params
2619 .iter()
2620 .map(|(k, v)| (k.as_str(), v.as_str()))
2621 .collect();
2622 let allowed = filter_request_headers(&headers);
2623 let upstream = state
2624 .knots
2625 .forward(&host, &nsid, &forward, allowed)
2626 .await
2627 .map_err(map_proxy_error)?;
2628 Ok(upstream_to_axum(upstream))
2629}
2630
2631fn extract_param(
2632 params: ProxyParams,
2633 key: &str,
2634) -> Result<Option<(String, ProxyParams)>, XrpcError> {
2635 let (matching, rest): (ProxyParams, ProxyParams) =
2636 params.into_iter().partition(|(k, _)| k == key);
2637 match matching.as_slice() {
2638 [] => Ok(None),
2639 [_] => Ok(matching.into_iter().next().map(|(_, v)| (v, rest))),
2640 _ => Err(XrpcError::InvalidParams(format!(
2641 "{key} parameter must appear at most once, got {}",
2642 matching.len(),
2643 ))),
2644 }
2645}
2646
2647async fn proxy_repo_handler(
2648 state: AppState,
2649 headers: HeaderMap,
2650 params: ProxyParams,
2651 nsid: Nsid<DefaultStr>,
2652) -> Result<Response, XrpcError> {
2653 let (repo_raw, rest) = extract_param(params, REPO_PARAM)?
2654 .ok_or_else(|| XrpcError::InvalidParams("missing repo".into()))?;
2655 let repo_uri = parse_uri(&repo_raw)?;
2656 let (host, slug) = resolve_knot_target(&state, repo_uri).await?;
2657 let forward = rest
2658 .into_iter()
2659 .chain(std::iter::once((
2660 REPO_PARAM.to_owned(),
2661 slug.as_str().to_owned(),
2662 )))
2663 .collect();
2664 dispatch_proxy(state, headers, nsid, host, forward).await
2665}
2666
2667async fn proxy_knot_handler(
2668 state: AppState,
2669 headers: HeaderMap,
2670 params: ProxyParams,
2671 nsid: Nsid<DefaultStr>,
2672) -> Result<Response, XrpcError> {
2673 let (knot_raw, forward) = extract_param(params, KNOT_HOST_PARAM)?
2674 .ok_or_else(|| XrpcError::InvalidParams("missing knot".into()))?;
2675 let host =
2676 KnotHost::parse(&knot_raw).map_err(|e| XrpcError::InvalidParams(format!("knot: {e}")))?;
2677 validate_client_supplied_knot(&state, &host)?;
2678 dispatch_proxy(state, headers, nsid, host, forward).await
2679}