Monorepo for Tangled
tangled.org
1use std::future::Future;
2use std::sync::Arc;
3
4use bobbin_resolver::{
5 NormalizeRepoRefs, decode_canon_or_upgrade, normalize_record_fields, scrub_record_bytes,
6 upgrade_wire_bytes,
7};
8
9use axum::{
10 Router,
11 body::Body,
12 extract::{FromRequestParts, Query, RawQuery, State, rejection::QueryRejection},
13 http::{
14 HeaderMap, HeaderName, StatusCode,
15 header::{
16 ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LENGTH,
17 CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_RANGE,
18 LAST_MODIFIED, RANGE,
19 },
20 request::Parts,
21 },
22 response::{IntoResponse, Json, Response},
23 routing::get,
24};
25use bobbin_edge_index::{
26 Coverage, CoverageWatch, CursorParseError, EdgeItem, EdgePage, EdgeStore, IssueStateKind,
27 PageCursor, PageLimit, PageToken, PullStatusKind, SortDir, StateIndex, StateKind,
28};
29use bobbin_knot_proxy::{KnotHost, KnotProxy, KnotProxyError, ProxyResponse, RepoSlug};
30use bobbin_record_lru::RecordStore;
31use bobbin_resolver::RepoIdResolver;
32use bobbin_search::{
33 SearchCursor, SearchError, SearchFilters, SearchHit, SearchOffset, SearchReader,
34};
35use bobbin_slingshot_client::{SlingshotClient, SlingshotError};
36use bobbin_types::ids::{EdgeKey, SubjectRef, nsid_static};
37use bobbin_types::knot_acl::{KnotOwnedSource, decode_knot_owned_source, knot_did_host};
38use bobbin_types::record::RecordBody;
39use bobbin_types::search::SearchableRecord;
40use bobbin_types::sh_tangled::actor::profile::{Profile, ProfileGetRecordOutput, ProfileRecord};
41use bobbin_types::sh_tangled::feed::comment::{
42 Comment as FeedComment, CommentRecord as FeedCommentRecord,
43};
44use bobbin_types::sh_tangled::feed::reaction::{Reaction, ReactionRecord};
45use bobbin_types::sh_tangled::feed::star::{Star, StarRecord};
46use bobbin_types::sh_tangled::git::ref_update::{RefUpdate, RefUpdateRecord};
47use bobbin_types::sh_tangled::graph::follow::{Follow, FollowRecord};
48use bobbin_types::sh_tangled::graph::vouch::{Vouch, VouchRecord};
49use bobbin_types::sh_tangled::knot::member::{
50 Member as KnotMember, MemberRecord as KnotMemberRecord,
51};
52use bobbin_types::sh_tangled::knot::{Knot, KnotRecord};
53use bobbin_types::sh_tangled::label::definition::{
54 Definition as LabelDefinition, DefinitionRecord as LabelDefinitionRecord,
55};
56use bobbin_types::sh_tangled::label::op::{Op as LabelOp, OpRecord as LabelOpRecord};
57use bobbin_types::sh_tangled::pipeline::status::{
58 Status as PipelineStatus, StatusRecord as PipelineStatusRecord,
59};
60use bobbin_types::sh_tangled::pipeline::{Pipeline, PipelineRecord};
61use bobbin_types::sh_tangled::public_key::{PublicKey, PublicKeyRecord};
62use bobbin_types::sh_tangled::repo::artifact::{Artifact, ArtifactRecord};
63use bobbin_types::sh_tangled::repo::collaborator::{Collaborator, CollaboratorRecord};
64use bobbin_types::sh_tangled::repo::issue::state::{
65 State as IssueState, StateRecord as IssueStateRecord,
66};
67use bobbin_types::sh_tangled::repo::issue::{Issue, IssueGetRecordOutput, IssueRecord};
68use bobbin_types::sh_tangled::repo::pull::status::{
69 Status as PullStatus, StatusRecord as PullStatusRecord,
70};
71use bobbin_types::sh_tangled::repo::pull::{Pull, PullGetRecordOutput, PullRecord};
72use bobbin_types::sh_tangled::repo::{Repo, RepoGetRecordOutput, RepoRecord};
73use bobbin_types::sh_tangled::spindle::member::{
74 Member as SpindleMember, MemberRecord as SpindleMemberRecord,
75};
76use bobbin_types::sh_tangled::spindle::{Spindle, SpindleRecord};
77use bobbin_types::sh_tangled::string::{TangledString, TangledStringRecord};
78use futures::Stream;
79use futures::stream::{self, StreamExt, TryStreamExt};
80use jacquard_common::types::did::Did;
81use jacquard_common::types::ident::AtIdentifier;
82use jacquard_common::types::nsid::Nsid;
83use jacquard_common::types::recordkey::Rkey;
84use jacquard_common::types::string::{AtUri, Cid};
85use jacquard_common::xrpc::XrpcResp;
86use jacquard_common::{DefaultStr, IntoStatic};
87use serde::{Deserialize, Serialize};
88use std::convert::Infallible;
89use std::time::Duration;
90use thiserror::Error;
91use url::form_urlencoded;
92
93use tower_http::classify::ServerErrorsFailureClass;
94use tower_http::trace::{DefaultMakeSpan, OnFailure, OnResponse, TraceLayer};
95use tracing::{Level, Span};
96
97mod backpressure;
98mod filter;
99
100pub use backpressure::{
101 HeavyLimiter, HeavyPermit, MaxInFlight, PerRequestAnonBytes, PressureVerdict, ReservedFloor,
102};
103use filter::{IssueFilter, ListFilter, NoFilter, PullFilter};
104
// NOTE(review): presumably the page size applied when a request omits `limit`;
// the use site is outside this view — confirm.
const DEFAULT_LIMIT: u32 = 50;
// NOTE(review): presumably bounds how many fetches run concurrently;
// the use site is outside this view — confirm.
const FETCH_CONCURRENCY: usize = 8;
107
/// Shared state attached to the router via `with_state`.
///
/// The type derives `Clone`; most fields are `Arc` handles to shared services.
#[derive(Clone)]
pub struct AppState {
    /// Record store, held as a trait object.
    pub records: Arc<dyn RecordStore>,
    /// Slingshot client.
    pub slingshot: SlingshotClient,
    /// Edge store; `enrich_view` reads comment counts from it.
    pub edges: Arc<EdgeStore>,
    /// State index for issues, consulted when enriching issue views.
    pub issue_states: Arc<StateIndex<IssueStateKind>>,
    /// State index for pulls, consulted when enriching pull views.
    pub pull_statuses: Arc<StateIndex<PullStatusKind>>,
    /// Coverage watch handle.
    pub coverage: Arc<CoverageWatch>,
    /// Knot proxy.
    pub knots: Arc<KnotProxy>,
    /// Search reader, held as a trait object.
    pub search: Arc<dyn SearchReader>,
    /// Repo id resolver.
    pub resolver: Arc<RepoIdResolver>,
    /// Optional limiter consulted by `heavy_permit`; `None` means no limiter is installed.
    pub limiter: Option<Arc<HeavyLimiter>>,
}
121
122impl AppState {
123 #[allow(clippy::too_many_arguments)]
124 pub fn new(
125 records: Arc<dyn RecordStore>,
126 slingshot: SlingshotClient,
127 edges: Arc<EdgeStore>,
128 issue_states: Arc<StateIndex<IssueStateKind>>,
129 pull_statuses: Arc<StateIndex<PullStatusKind>>,
130 coverage: Arc<CoverageWatch>,
131 knots: Arc<KnotProxy>,
132 search: Arc<dyn SearchReader>,
133 resolver: Arc<RepoIdResolver>,
134 ) -> Self {
135 Self {
136 records,
137 slingshot,
138 edges,
139 issue_states,
140 pull_statuses,
141 coverage,
142 knots,
143 search,
144 resolver,
145 limiter: None,
146 }
147 }
148
149 pub fn with_limiter(mut self, limiter: Option<Arc<HeavyLimiter>>) -> Self {
150 self.limiter = limiter;
151 self
152 }
153
154 fn heavy_permit(&self) -> Result<Option<HeavyPermit>, XrpcError> {
155 self.limiter.as_ref().map(|l| l.try_enter()).transpose()
156 }
157}
158
/// Builds the HTTP router for the service.
///
/// Every endpoint is registered as a GET route under `/xrpc/<nsid>`. The routes
/// returned by `knot_proxied_routes()` are merged in, a tracing layer is applied
/// on top, and `state` is attached last.
pub fn router(state: AppState) -> Router {
    Router::new()
        // Single-record and batch lookups.
        .route("/xrpc/sh.tangled.repo.getRepo", get(get_repo))
        .route("/xrpc/sh.tangled.repo.getRepos", get(get_repos))
        .route(
            "/xrpc/sh.tangled.repo.getRepoByRepoDid",
            get(get_repo_by_repo_did),
        )
        .route("/xrpc/sh.tangled.actor.getProfile", get(get_profile))
        .route("/xrpc/sh.tangled.actor.getProfiles", get(get_profiles))
        .route("/xrpc/sh.tangled.repo.getIssue", get(get_issue))
        .route("/xrpc/sh.tangled.repo.getIssues", get(get_issues))
        .route("/xrpc/sh.tangled.repo.getPull", get(get_pull))
        .route("/xrpc/sh.tangled.repo.getPulls", get(get_pulls))
        // list*/count* pairs.
        .route("/xrpc/sh.tangled.feed.listStars", get(list_stars))
        .route("/xrpc/sh.tangled.feed.countStars", get(count_stars))
        .route("/xrpc/sh.tangled.graph.listFollows", get(list_follows))
        .route("/xrpc/sh.tangled.graph.countFollows", get(count_follows))
        .route("/xrpc/sh.tangled.repo.listIssues", get(list_issues))
        .route("/xrpc/sh.tangled.repo.countIssues", get(count_issues))
        .route("/xrpc/sh.tangled.repo.listPulls", get(list_pulls))
        .route("/xrpc/sh.tangled.repo.countPulls", get(count_pulls))
        .route(
            "/xrpc/sh.tangled.feed.listComments",
            get(list_feed_comments),
        )
        .route(
            "/xrpc/sh.tangled.feed.countComments",
            get(count_feed_comments),
        )
        .route("/xrpc/sh.tangled.feed.listReactions", get(list_reactions))
        .route("/xrpc/sh.tangled.feed.countReactions", get(count_reactions))
        .route("/xrpc/sh.tangled.git.listRefUpdates", get(list_ref_updates))
        .route(
            "/xrpc/sh.tangled.git.countRefUpdates",
            get(count_ref_updates),
        )
        .route(
            "/xrpc/sh.tangled.repo.listCollaborators",
            get(list_collaborators),
        )
        .route(
            "/xrpc/sh.tangled.repo.countCollaborators",
            get(count_collaborators),
        )
        .route(
            "/xrpc/sh.tangled.repo.issue.listStates",
            get(list_issue_states),
        )
        .route(
            "/xrpc/sh.tangled.repo.issue.countStates",
            get(count_issue_states),
        )
        .route(
            "/xrpc/sh.tangled.repo.pull.listStatuses",
            get(list_pull_statuses),
        )
        .route(
            "/xrpc/sh.tangled.repo.pull.countStatuses",
            get(count_pull_statuses),
        )
        .route("/xrpc/sh.tangled.repo.listRepos", get(list_repos))
        .route("/xrpc/sh.tangled.repo.countRepos", get(count_repos))
        .route("/xrpc/sh.tangled.knot.listKnots", get(list_knots))
        .route("/xrpc/sh.tangled.knot.countKnots", get(count_knots))
        .route("/xrpc/sh.tangled.spindle.listSpindles", get(list_spindles))
        .route(
            "/xrpc/sh.tangled.spindle.countSpindles",
            get(count_spindles),
        )
        .route("/xrpc/sh.tangled.publicKey.listKeys", get(list_public_keys))
        .route(
            "/xrpc/sh.tangled.publicKey.countKeys",
            get(count_public_keys),
        )
        .route("/xrpc/sh.tangled.graph.listVouches", get(list_vouches))
        .route("/xrpc/sh.tangled.graph.countVouches", get(count_vouches))
        // `*By` variants of the list/count endpoints.
        .route("/xrpc/sh.tangled.feed.listStarsBy", get(list_stars_by))
        .route("/xrpc/sh.tangled.feed.countStarsBy", get(count_stars_by))
        .route(
            "/xrpc/sh.tangled.feed.listReactionsBy",
            get(list_reactions_by),
        )
        .route(
            "/xrpc/sh.tangled.feed.countReactionsBy",
            get(count_reactions_by),
        )
        .route("/xrpc/sh.tangled.graph.listFollowsBy", get(list_follows_by))
        .route(
            "/xrpc/sh.tangled.graph.countFollowsBy",
            get(count_follows_by),
        )
        .route("/xrpc/sh.tangled.graph.listVouchesBy", get(list_vouches_by))
        .route(
            "/xrpc/sh.tangled.graph.countVouchesBy",
            get(count_vouches_by),
        )
        .route(
            "/xrpc/sh.tangled.git.listRefUpdatesBy",
            get(list_ref_updates_by),
        )
        .route(
            "/xrpc/sh.tangled.git.countRefUpdatesBy",
            get(count_ref_updates_by),
        )
        .route(
            "/xrpc/sh.tangled.knot.listMembersBy",
            get(list_knot_members_by),
        )
        .route(
            "/xrpc/sh.tangled.knot.countMembersBy",
            get(count_knot_members_by),
        )
        .route("/xrpc/sh.tangled.label.listOpsBy", get(list_label_ops_by))
        .route("/xrpc/sh.tangled.label.countOpsBy", get(count_label_ops_by))
        .route(
            "/xrpc/sh.tangled.pipeline.listPipelinesBy",
            get(list_pipelines_by),
        )
        .route(
            "/xrpc/sh.tangled.pipeline.countPipelinesBy",
            get(count_pipelines_by),
        )
        .route(
            "/xrpc/sh.tangled.pipeline.listStatusesBy",
            get(list_pipeline_statuses_by),
        )
        .route(
            "/xrpc/sh.tangled.pipeline.countStatusesBy",
            get(count_pipeline_statuses_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.listArtifactsBy",
            get(list_artifacts_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.countArtifactsBy",
            get(count_artifacts_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.listCollaboratorsBy",
            get(list_collaborators_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.countCollaboratorsBy",
            get(count_collaborators_by),
        )
        .route("/xrpc/sh.tangled.repo.listIssuesBy", get(list_issues_by))
        .route("/xrpc/sh.tangled.repo.countIssuesBy", get(count_issues_by))
        .route(
            "/xrpc/sh.tangled.feed.listCommentsBy",
            get(list_feed_comments_by),
        )
        .route(
            "/xrpc/sh.tangled.feed.countCommentsBy",
            get(count_feed_comments_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.issue.listStatesBy",
            get(list_issue_states_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.issue.countStatesBy",
            get(count_issue_states_by),
        )
        .route("/xrpc/sh.tangled.repo.listPullsBy", get(list_pulls_by))
        .route("/xrpc/sh.tangled.repo.countPullsBy", get(count_pulls_by))
        .route(
            "/xrpc/sh.tangled.repo.pull.listStatusesBy",
            get(list_pull_statuses_by),
        )
        .route(
            "/xrpc/sh.tangled.repo.pull.countStatusesBy",
            get(count_pull_statuses_by),
        )
        .route(
            "/xrpc/sh.tangled.spindle.listMembersBy",
            get(list_spindle_members_by),
        )
        .route(
            "/xrpc/sh.tangled.spindle.countMembersBy",
            get(count_spindle_members_by),
        )
        // Remaining list/count pairs.
        .route(
            "/xrpc/sh.tangled.label.listDefinitions",
            get(list_label_definitions),
        )
        .route(
            "/xrpc/sh.tangled.label.countDefinitions",
            get(count_label_definitions),
        )
        .route("/xrpc/sh.tangled.label.listOps", get(list_label_ops))
        .route("/xrpc/sh.tangled.label.countOps", get(count_label_ops))
        .route(
            "/xrpc/sh.tangled.pipeline.listPipelines",
            get(list_pipelines),
        )
        .route(
            "/xrpc/sh.tangled.pipeline.countPipelines",
            get(count_pipelines),
        )
        .route(
            "/xrpc/sh.tangled.pipeline.listStatuses",
            get(list_pipeline_statuses),
        )
        .route(
            "/xrpc/sh.tangled.pipeline.countStatuses",
            get(count_pipeline_statuses),
        )
        .route("/xrpc/sh.tangled.repo.listArtifacts", get(list_artifacts))
        .route("/xrpc/sh.tangled.repo.countArtifacts", get(count_artifacts))
        .route("/xrpc/sh.tangled.knot.listMembers", get(list_knot_members))
        .route(
            "/xrpc/sh.tangled.knot.countMembers",
            get(count_knot_members),
        )
        .route(
            "/xrpc/sh.tangled.spindle.listMembers",
            get(list_spindle_members),
        )
        .route(
            "/xrpc/sh.tangled.spindle.countMembers",
            get(count_spindle_members),
        )
        .route("/xrpc/sh.tangled.string.listStrings", get(list_strings))
        .route("/xrpc/sh.tangled.string.countStrings", get(count_strings))
        // Search, coverage, and identity endpoints.
        .route("/xrpc/sh.tangled.search.query", get(search_query))
        .route("/xrpc/sh.tangled.bobbin.getCoverage", get(get_coverage))
        .route(
            "/xrpc/com.bad-example.identity.resolveMiniDoc",
            get(resolve_mini_doc),
        )
        // Routes built from REPO_PROXIED_NSIDS / KNOT_PROXIED_NSIDS.
        .merge(knot_proxied_routes())
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(DefaultMakeSpan::new().level(Level::INFO))
                // `()` is passed as the on-request callback; response and failure
                // events both go through `LatencyFreeTrace`.
                .on_request(())
                .on_response(LatencyFreeTrace)
                .on_failure(LatencyFreeTrace),
        )
        .with_state(state)
}
401
402#[derive(Clone, Copy, Debug)]
403struct LatencyFreeTrace;
404
405impl<B> OnResponse<B> for LatencyFreeTrace {
406 fn on_response(self, response: &Response<B>, _latency: Duration, _span: &Span) {
407 tracing::event!(
408 target: "tower_http::trace::on_response",
409 Level::INFO,
410 status = response.status().as_u16(),
411 "request completed",
412 );
413 }
414}
415
416impl OnFailure<ServerErrorsFailureClass> for LatencyFreeTrace {
417 fn on_failure(&mut self, error: ServerErrorsFailureClass, _latency: Duration, _span: &Span) {
418 tracing::event!(
419 target: "tower_http::trace::on_failure",
420 Level::WARN,
421 error = %error,
422 "request failed",
423 );
424 }
425}
426
/// NSIDs registered by `knot_proxied_routes` with `proxy_repo_handler`.
const REPO_PROXIED_NSIDS: &[&str] = &[
    "sh.tangled.repo.archive",
    "sh.tangled.repo.blob",
    "sh.tangled.repo.branch",
    "sh.tangled.repo.branches",
    "sh.tangled.repo.compare",
    "sh.tangled.repo.describeRepo",
    "sh.tangled.repo.diff",
    "sh.tangled.repo.getDefaultBranch",
    "sh.tangled.repo.languages",
    "sh.tangled.repo.listSecrets",
    "sh.tangled.repo.log",
    "sh.tangled.repo.tag",
    "sh.tangled.repo.tags",
    "sh.tangled.repo.tree",
];

/// NSIDs registered by `knot_proxied_routes` with `proxy_knot_handler`.
const KNOT_PROXIED_NSIDS: &[&str] = &[
    "sh.tangled.owner",
    "sh.tangled.knot.version",
    "sh.tangled.knot.listKeys",
];

// NOTE(review): presumably the upstream response headers copied through to the
// client by the proxy handlers; the use site is outside this view — confirm.
const PASSTHROUGH_HEADERS: &[&HeaderName] = &[
    &CONTENT_TYPE,
    &CONTENT_LENGTH,
    &CONTENT_ENCODING,
    &ETAG,
    &CACHE_CONTROL,
    &LAST_MODIFIED,
    &CONTENT_DISPOSITION,
    &ACCEPT_RANGES,
    &CONTENT_RANGE,
];

// NOTE(review): presumably the client request headers forwarded upstream
// (range and conditional-request headers); the use site is outside this view — confirm.
const FORWARDED_REQUEST_HEADERS: &[&HeaderName] =
    &[&RANGE, &IF_RANGE, &IF_NONE_MATCH, &IF_MODIFIED_SINCE];

// Query-parameter names; NOTE(review): presumably read by the proxy handlers — confirm.
const KNOT_HOST_PARAM: &str = "knot";
const REPO_PARAM: &str = "repo";

/// Query string decoded as ordered `(name, value)` pairs, as handed to proxy handlers.
type ProxyParams = Vec<(String, String)>;
469
470fn knot_proxied_routes() -> Router<AppState> {
471 let with_repo = register_proxied(Router::new(), REPO_PROXIED_NSIDS, proxy_repo_handler);
472 register_proxied(with_repo, KNOT_PROXIED_NSIDS, proxy_knot_handler)
473}
474
475fn register_proxied<H, Fut>(
476 router: Router<AppState>,
477 nsids: &[&'static str],
478 handler: H,
479) -> Router<AppState>
480where
481 H: Fn(AppState, HeaderMap, ProxyParams, Nsid<DefaultStr>) -> Fut
482 + Clone
483 + Send
484 + Sync
485 + 'static,
486 Fut: Future<Output = Result<Response, XrpcError>> + Send + 'static,
487{
488 nsids.iter().fold(router, |router, &nsid_lit| {
489 let handler = handler.clone();
490 let nsid = nsid_static(nsid_lit);
491 router.route(
492 &format!("/xrpc/{nsid_lit}"),
493 get(
494 move |State(state): State<AppState>,
495 headers: HeaderMap,
496 Query(params): Query<ProxyParams>| {
497 handler(state, headers, params, nsid.clone())
498 },
499 ),
500 )
501 })
502}
503
504#[derive(Clone, Debug)]
505pub enum SubjectQuery {
506 Did(Did<DefaultStr>),
507 Uri(AtUri<DefaultStr>),
508}
509
510impl<'de> Deserialize<'de> for SubjectQuery {
511 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
512 where
513 D: serde::Deserializer<'de>,
514 {
515 let raw = String::deserialize(deserializer)?;
516 if let Ok(did) = Did::<DefaultStr>::new_owned(&raw) {
517 return Ok(Self::Did(did));
518 }
519 AtUri::<DefaultStr>::new_owned(&raw)
520 .map(Self::Uri)
521 .map_err(serde::de::Error::custom)
522 }
523}
524
/// A canonical NSID together with the alias strings that `accepts` also treats
/// as matching it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExpectedNsid {
    /// The canonical NSID.
    canon: Nsid<DefaultStr>,
    /// Additional accepted spellings, looked up via `aliases_for` at construction.
    aliases: &'static [&'static str],
}
530
/// Alias NSIDs accepted in place of `sh.tangled.feed.comment`.
const FEED_COMMENT_LEGACY_ALIASES: &[&str] = &[
    "sh.tangled.repo.issue.comment",
    "sh.tangled.repo.pull.comment",
];

/// Returns the alias list for `nsid`; only `sh.tangled.feed.comment` has any,
/// every other input yields an empty slice.
fn aliases_for(nsid: &str) -> &'static [&'static str] {
    if nsid == "sh.tangled.feed.comment" {
        FEED_COMMENT_LEGACY_ALIASES
    } else {
        &[]
    }
}
542
543impl ExpectedNsid {
544 pub fn new(nsid: Nsid<DefaultStr>) -> Self {
545 let aliases = aliases_for(nsid.as_ref());
546 Self {
547 canon: nsid,
548 aliases,
549 }
550 }
551
552 pub fn from_static(s: &'static str) -> Self {
553 let canon = nsid_static(s);
554 let aliases = aliases_for(s);
555 Self { canon, aliases }
556 }
557
558 pub fn as_nsid(&self) -> &Nsid<DefaultStr> {
559 &self.canon
560 }
561
562 pub fn as_str(&self) -> &str {
563 self.canon.as_ref()
564 }
565
566 fn accepts(&self, other: &str) -> bool {
567 other == self.canon.as_ref() || self.aliases.contains(&other)
568 }
569}
570
/// Query parameters carrying a single `repo` at-uri.
#[derive(Debug, Deserialize)]
struct GetRepoQuery {
    repo: AtUri<DefaultStr>,
}

/// Query parameters carrying a single DID, sent on the wire as `repoDid`.
#[derive(Debug, Deserialize)]
struct GetRepoByRepoDidQuery {
    #[serde(rename = "repoDid")]
    repo_did: Did<DefaultStr>,
}

/// Query parameters carrying a single `actor` at-uri.
#[derive(Debug, Deserialize)]
struct GetProfileQuery {
    actor: AtUri<DefaultStr>,
}

/// Query parameters carrying a single `issue` at-uri.
#[derive(Debug, Deserialize)]
struct GetIssueQuery {
    issue: AtUri<DefaultStr>,
}

/// Query parameters carrying a single `pull` at-uri.
#[derive(Debug, Deserialize)]
struct GetPullQuery {
    pull: AtUri<DefaultStr>,
}
596
597#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
598#[serde(rename_all = "lowercase")]
599enum Order {
600 Asc,
601 #[default]
602 Desc,
603}
604
605impl From<Order> for SortDir {
606 fn from(o: Order) -> Self {
607 match o {
608 Order::Asc => SortDir::Asc,
609 Order::Desc => SortDir::Desc,
610 }
611 }
612}
613
614#[derive(Debug, Deserialize)]
615struct TypedListQuery<F> {
616 subject: SubjectQuery,
617 cursor: Option<String>,
618 limit: Option<u32>,
619 #[serde(default)]
620 order: Order,
621 #[serde(flatten)]
622 filter: F,
623}
624
625impl<F> TypedListQuery<F> {
626 fn dir(&self) -> SortDir {
627 self.order.into()
628 }
629}
630
/// Query parameters consisting of a single `subject`.
#[derive(Debug, Deserialize)]
struct CountQuery {
    subject: SubjectQuery,
}

/// Query parameters for search: the query text `q` is required, every other
/// field is optional.
#[derive(Debug, Deserialize)]
struct SearchQueryParams {
    q: String,
    nsid: Option<Nsid<DefaultStr>>,
    author: Option<Did<DefaultStr>>,
    repo: Option<Did<DefaultStr>>,
    // `since`/`until` arrive as raw strings; parsing happens outside this view.
    since: Option<String>,
    until: Option<String>,
    cursor: Option<String>,
    limit: Option<u32>,
}
647
648pub struct XrpcQuery<T>(pub T);
649
650impl<S, T> FromRequestParts<S> for XrpcQuery<T>
651where
652 S: Send + Sync,
653 T: serde::de::DeserializeOwned + Send + 'static,
654{
655 type Rejection = XrpcError;
656
657 async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
658 Query::<T>::from_request_parts(parts, state)
659 .await
660 .map(|Query(t)| Self(t))
661 .map_err(|rej: QueryRejection| XrpcError::InvalidParams(rej.body_text()))
662 }
663}
664
/// Error type returned by handlers; its `IntoResponse` impl picks the HTTP status
/// and the XRPC error name for each variant.
#[derive(Debug, Error)]
pub enum XrpcError {
    /// Answered with 400 `InvalidRequest`.
    #[error("invalid request: {0}")]
    InvalidParams(String),
    /// Answered with 404 `RecordNotFound`.
    #[error("record not found")]
    NotFound,
    /// Answered with 502 `UpstreamFailed`.
    #[error("upstream unavailable: {0}")]
    UpstreamUnavailable(String),
    /// Answered with 502 `UpstreamGone`.
    #[error("upstream gone: {0}")]
    UpstreamGone(String),
    /// Answered with 502 `InvalidRecord`.
    #[error("invalid record: {0}")]
    InvalidRecord(String),
    /// Answered with 500 `InternalError`.
    #[error("internal: {0}")]
    Internal(String),
    /// Answered with 503 `Overloaded`.
    #[error("overloaded, shedding under memory pressure")]
    Overloaded,
}

impl XrpcError {
    /// Convenience constructor for [`XrpcError::Overloaded`].
    pub fn overloaded() -> Self {
        Self::Overloaded
    }
}
688
689#[derive(Serialize)]
690struct ErrorBody {
691 error: &'static str,
692 message: String,
693}
694
695impl IntoResponse for XrpcError {
696 fn into_response(self) -> Response {
697 let (status, error) = match &self {
698 Self::InvalidParams(_) => (StatusCode::BAD_REQUEST, "InvalidRequest"),
699 Self::NotFound => (StatusCode::NOT_FOUND, "RecordNotFound"),
700 Self::UpstreamUnavailable(_) => (StatusCode::BAD_GATEWAY, "UpstreamFailed"),
701 Self::UpstreamGone(_) => (StatusCode::BAD_GATEWAY, "UpstreamGone"),
702 Self::InvalidRecord(_) => (StatusCode::BAD_GATEWAY, "InvalidRecord"),
703 Self::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "InternalError"),
704 Self::Overloaded => (StatusCode::SERVICE_UNAVAILABLE, "Overloaded"),
705 };
706 let body = ErrorBody {
707 error,
708 message: self.to_string(),
709 };
710 (status, Json(body)).into_response()
711 }
712}
713
714#[derive(Serialize)]
715#[serde(rename_all = "camelCase")]
716struct CoverageEnvelope {
717 ready: bool,
718 events_processed: u64,
719 last_cursor: u64,
720}
721
722impl From<Coverage> for CoverageEnvelope {
723 fn from(c: Coverage) -> Self {
724 Self {
725 ready: c.is_ready(),
726 events_processed: c.events_processed(),
727 last_cursor: c.last_cursor().raw(),
728 }
729 }
730}
731
/// A record as returned to clients: its at-uri, an optional CID, and the value.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct RecordView<V> {
    uri: AtUri<DefaultStr>,
    cid: Option<Cid<DefaultStr>>,
    value: V,
}

/// A `RecordView` flattened together with state and comment-count fields.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct StatefulItem<V> {
    /// Serialized inline (flattened), not as a nested `view` object.
    #[serde(flatten)]
    view: RecordView<V>,
    /// Wire name of the state, as produced by `StateKind::wire` in `enrich_view`.
    state: &'static str,
    /// Omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    state_updated_at: Option<String>,
    comment_count: u64,
}
750
751fn format_micros(micros: u64) -> String {
752 let signed = i64::try_from(micros).ok();
753 let rfc = signed
754 .and_then(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
755 .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true));
756 rfc.unwrap_or_else(|| micros.to_string())
757}
758
759pub(crate) fn source_authority_did(source: &AtUri<DefaultStr>) -> Option<Did<DefaultStr>> {
760 match source.authority() {
761 AtIdentifier::Did(d) => Some(d.clone().into_static()),
762 AtIdentifier::Handle(_) => None,
763 }
764}
765
766fn enrich_issue_view(
767 state: &AppState,
768 view: RecordView<Issue<DefaultStr>>,
769) -> StatefulItem<Issue<DefaultStr>> {
770 let issue_author = source_authority_did(&view.uri);
771 let repo_did = view.value.repo.clone();
772 enrich_view(
773 &state.edges,
774 nsid_static("sh.tangled.feed.comment"),
775 &state.issue_states,
776 view,
777 move |src| accept_state_source(src, issue_author.as_ref(), &repo_did),
778 )
779}
780
781fn enrich_pull_view(
782 state: &AppState,
783 view: RecordView<Pull<DefaultStr>>,
784) -> StatefulItem<Pull<DefaultStr>> {
785 let pull_author = source_authority_did(&view.uri);
786 let target_repo = view.value.target.repo.clone();
787 enrich_view(
788 &state.edges,
789 nsid_static("sh.tangled.feed.comment"),
790 &state.pull_statuses,
791 view,
792 move |src| accept_state_source(src, pull_author.as_ref(), &target_repo),
793 )
794}
795
796pub(crate) fn accept_state_source(
797 source: &AtUri<DefaultStr>,
798 entity_author: Option<&Did<DefaultStr>>,
799 repo_owner: &Did<DefaultStr>,
800) -> bool {
801 let Some(src) = source_authority_did(source) else {
802 return false;
803 };
804 Some(&src) == entity_author || &src == repo_owner
805}
806
807fn enrich_view<V, K, F>(
808 edges: &EdgeStore,
809 comment_nsid: Nsid<DefaultStr>,
810 states: &StateIndex<K>,
811 view: RecordView<V>,
812 accept: F,
813) -> StatefulItem<V>
814where
815 K: StateKind + Default,
816 F: Fn(&AtUri<DefaultStr>) -> bool,
817{
818 let comment_count = edges.count(&EdgeKey::new(
819 comment_nsid,
820 SubjectRef::Uri(view.uri.clone()),
821 ));
822 let (state, state_updated_at) = states
823 .latest_by(&view.uri, accept)
824 .map_or((K::default().wire(), None), |(kind, micros)| {
825 (kind.wire(), Some(format_micros(micros)))
826 });
827 StatefulItem {
828 view,
829 state,
830 state_updated_at,
831 comment_count,
832 }
833}
834
/// JSON body of a count endpoint; serialized as `count` and `distinctAuthors`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct CountResponse {
    count: u64,
    distinct_authors: u64,
}

/// One search result as serialized to clients.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct SearchHitView {
    uri: AtUri<DefaultStr>,
    cid: Option<Cid<DefaultStr>>,
    nsid: Nsid<DefaultStr>,
    score: f32,
    value: SearchableRecord,
}
851
852fn map_slingshot(err: SlingshotError) -> XrpcError {
853 use SlingshotError as E;
854 match err {
855 E::NotFound => XrpcError::NotFound,
856 e @ (E::Decode(_)
857 | E::MissingField(_)
858 | E::InvalidAtUri(_)
859 | E::InvalidCid(_)
860 | E::UriMismatch { .. }) => XrpcError::InvalidRecord(e.to_string()),
861 e @ (E::Network(_)
862 | E::Build(_)
863 | E::Upstream(_)
864 | E::BodyTooLarge { .. }
865 | E::BadScheme(_)) => XrpcError::UpstreamUnavailable(e.to_string()),
866 }
867}
868
869fn parse_uri(raw: &str) -> Result<AtUri<DefaultStr>, XrpcError> {
870 AtUri::<DefaultStr>::new_owned(raw).map_err(|e| XrpcError::InvalidParams(format!("uri: {e}")))
871}
872
/// The forms of `subject` that `parse_subject` accepts for a given endpoint.
///
/// For a bare-DID subject, `parse_subject` accepts only `BareDid` and
/// `BareDidOrOneOfCollections`; the other three shapes reject it with
/// `InvalidParams`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubjectShape {
    /// A bare DID.
    BareDid,
    /// An at-uri of the form `at://<did>/<collection>/<rkey>` for the named collection.
    Collection(&'static str),
    /// A bare DID, or an at-uri whose collection is one of the listed NSIDs.
    BareDidOrOneOfCollections(&'static [&'static str]),
    /// An at-uri whose collection is one of the listed NSIDs.
    OneOfCollections(&'static [&'static str]),
    /// Any at-uri; a bare DID is rejected.
    AnyAtUri,
}

/// Associates a record type with the subject shape of its endpoints.
pub trait HasSubject {
    const SHAPE: SubjectShape;
}

/// Describes a `*By` mirror: the record type it returns, the edge-kind string
/// it is stored under, and its subject shape.
pub trait MirrorOf {
    type Record: XrpcResp;
    const EDGE_KIND: &'static str;
    const SHAPE: SubjectShape;
}
891
// Zero-sized marker types, one per `*By` mirror; each gets a `MirrorOf` impl.
pub struct StarBy;
pub struct ReactionBy;
pub struct FollowBy;
pub struct VouchBy;
pub struct RefUpdateBy;
pub struct KnotMemberBy;
pub struct LabelOpBy;
pub struct PipelineBy;
pub struct PipelineStatusBy;
pub struct ArtifactBy;
pub struct CollaboratorBy;
pub struct FeedCommentBy;
pub struct IssueBy;
pub struct IssueStateBy;
pub struct PullBy;
pub struct PullStatusBy;
pub struct SpindleMemberBy;
909
// `MirrorOf` impls for the marker types. Every mirror uses
// `SubjectShape::BareDid`, and every `EDGE_KIND` literal ends in `.by`.
impl MirrorOf for StarBy {
    type Record = StarRecord;
    const EDGE_KIND: &'static str = "sh.tangled.feed.star.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for ReactionBy {
    type Record = ReactionRecord;
    const EDGE_KIND: &'static str = "sh.tangled.feed.reaction.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for FollowBy {
    type Record = FollowRecord;
    const EDGE_KIND: &'static str = "sh.tangled.graph.follow.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for VouchBy {
    type Record = VouchRecord;
    const EDGE_KIND: &'static str = "sh.tangled.graph.vouch.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for RefUpdateBy {
    type Record = RefUpdateRecord;
    const EDGE_KIND: &'static str = "sh.tangled.git.refUpdate.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for KnotMemberBy {
    type Record = KnotMemberRecord;
    const EDGE_KIND: &'static str = "sh.tangled.knot.member.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for LabelOpBy {
    type Record = LabelOpRecord;
    const EDGE_KIND: &'static str = "sh.tangled.label.op.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for PipelineBy {
    type Record = PipelineRecord;
    const EDGE_KIND: &'static str = "sh.tangled.pipeline.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for PipelineStatusBy {
    type Record = PipelineStatusRecord;
    const EDGE_KIND: &'static str = "sh.tangled.pipeline.status.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for ArtifactBy {
    type Record = ArtifactRecord;
    const EDGE_KIND: &'static str = "sh.tangled.repo.artifact.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for CollaboratorBy {
    type Record = CollaboratorRecord;
    const EDGE_KIND: &'static str = "sh.tangled.repo.collaborator.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for FeedCommentBy {
    type Record = FeedCommentRecord;
    const EDGE_KIND: &'static str = "sh.tangled.feed.comment.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for IssueBy {
    type Record = IssueRecord;
    const EDGE_KIND: &'static str = "sh.tangled.repo.issue.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for IssueStateBy {
    type Record = IssueStateRecord;
    const EDGE_KIND: &'static str = "sh.tangled.repo.issue.state.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for PullBy {
    type Record = PullRecord;
    const EDGE_KIND: &'static str = "sh.tangled.repo.pull.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for PullStatusBy {
    type Record = PullStatusRecord;
    const EDGE_KIND: &'static str = "sh.tangled.repo.pull.status.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl MirrorOf for SpindleMemberBy {
    type Record = SpindleMemberRecord;
    const EDGE_KIND: &'static str = "sh.tangled.spindle.member.by";
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
995
// `HasSubject` impls: the subject shape declared for each record type.
// NOTE(review): presumably the list/count handlers pass `T::SHAPE` to
// `parse_subject`; those call sites are outside this view — confirm.
impl HasSubject for StarRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDidOrOneOfCollections(&["sh.tangled.string"]);
}
impl HasSubject for FollowRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for IssueRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PullRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for FeedCommentRecord {
    const SHAPE: SubjectShape = SubjectShape::OneOfCollections(&[
        "sh.tangled.repo.issue",
        "sh.tangled.repo.pull",
        "sh.tangled.string",
    ]);
}
impl HasSubject for LabelDefinitionRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for LabelOpRecord {
    const SHAPE: SubjectShape =
        SubjectShape::OneOfCollections(&["sh.tangled.repo.issue", "sh.tangled.repo.pull"]);
}
impl HasSubject for PipelineRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PipelineStatusRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.pipeline");
}
impl HasSubject for ArtifactRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for KnotMemberRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for SpindleMemberRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for TangledStringRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for ReactionRecord {
    const SHAPE: SubjectShape = SubjectShape::AnyAtUri;
}
impl HasSubject for RefUpdateRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for CollaboratorRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for IssueStateRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.issue");
}
impl HasSubject for PullStatusRecord {
    const SHAPE: SubjectShape = SubjectShape::Collection("sh.tangled.repo.pull");
}
impl HasSubject for KnotRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for SpindleRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for PublicKeyRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for RepoRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
impl HasSubject for VouchRecord {
    const SHAPE: SubjectShape = SubjectShape::BareDid;
}
1070
1071fn parse_subject(raw: &SubjectQuery, shape: SubjectShape) -> Result<SubjectRef, XrpcError> {
1072 let uri = match raw {
1073 SubjectQuery::Did(did) => {
1074 return match shape {
1075 SubjectShape::BareDid | SubjectShape::BareDidOrOneOfCollections(_) => {
1076 Ok(SubjectRef::Did(did.clone()))
1077 }
1078 SubjectShape::Collection(expected) => Err(XrpcError::InvalidParams(format!(
1079 "subject must be at://<did>/{expected}/<rkey>, got bare did"
1080 ))),
1081 SubjectShape::OneOfCollections(allowed) => Err(XrpcError::InvalidParams(format!(
1082 "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got bare did",
1083 allowed.join(", "),
1084 ))),
1085 SubjectShape::AnyAtUri => Err(XrpcError::InvalidParams(
1086 "subject must be at-uri form, got bare did".into(),
1087 )),
1088 };
1089 }
1090 SubjectQuery::Uri(uri) => uri,
1091 };
1092 if matches!(uri.authority(), AtIdentifier::Handle(_)) {
1093 return Err(XrpcError::InvalidParams(
1094 "subject authority must be a did, not a handle".into(),
1095 ));
1096 }
1097 let Some(collection) = uri.collection() else {
1098 return Err(XrpcError::InvalidParams(
1099 "subject must be a bare did or full at://<did>/<nsid>/<rkey>".into(),
1100 ));
1101 };
1102 let c = collection.as_ref();
1103 match shape {
1104 SubjectShape::BareDid => Err(XrpcError::InvalidParams(format!(
1105 "subject must be a bare did, got at-uri with collection {c}"
1106 ))),
1107 SubjectShape::Collection(expected) if c == expected => {
1108 require_rkey(uri, expected)?;
1109 Ok(SubjectRef::Uri(uri.clone()))
1110 }
1111 SubjectShape::Collection(expected) => Err(XrpcError::InvalidParams(format!(
1112 "subject must be at://<did>/{expected}/<rkey>, got collection {c}"
1113 ))),
1114 SubjectShape::OneOfCollections(allowed) if allowed.contains(&c) => {
1115 require_rkey(uri, c)?;
1116 Ok(SubjectRef::Uri(uri.clone()))
1117 }
1118 SubjectShape::OneOfCollections(allowed) => Err(XrpcError::InvalidParams(format!(
1119 "subject must be at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
1120 allowed.join(", "),
1121 ))),
1122 SubjectShape::BareDidOrOneOfCollections(allowed) if allowed.contains(&c) => {
1123 require_rkey(uri, c)?;
1124 Ok(SubjectRef::Uri(uri.clone()))
1125 }
1126 SubjectShape::BareDidOrOneOfCollections(allowed) => Err(XrpcError::InvalidParams(format!(
1127 "subject must be a bare did or at://<did>/<nsid>/<rkey> with nsid in [{}], got collection {c}",
1128 allowed.join(", "),
1129 ))),
1130 SubjectShape::AnyAtUri => Ok(SubjectRef::Uri(uri.clone())),
1131 }
1132}
1133
1134fn require_rkey(uri: &AtUri<DefaultStr>, expected: &str) -> Result<(), XrpcError> {
1135 uri.rkey().map(|_| ()).ok_or_else(|| {
1136 XrpcError::InvalidParams(format!(
1137 "subject must be at://<did>/{expected}/<rkey>; missing rkey"
1138 ))
1139 })
1140}
1141
1142fn parse_cursor(raw: Option<&str>) -> Result<PageCursor, XrpcError> {
1143 PageCursor::from_token(raw)
1144 .map_err(|e: CursorParseError| XrpcError::InvalidParams(format!("cursor: {e}")))
1145}
1146
1147fn parse_limit(raw: Option<u32>) -> Result<PageLimit, XrpcError> {
1148 PageLimit::new(raw.unwrap_or(DEFAULT_LIMIT))
1149 .map_err(|e| XrpcError::InvalidParams(format!("limit: {e}")))
1150}
1151
1152pub(crate) fn at_uri_owned_by(uri: &AtUri<DefaultStr>, author: &Did<DefaultStr>) -> bool {
1153 match uri.authority() {
1154 AtIdentifier::Did(d) => d.as_ref() == author.as_ref(),
1155 AtIdentifier::Handle(_) => false,
1156 }
1157}
1158
1159async fn resolve_for_view(
1160 state: &AppState,
1161 expected_nsid: &Nsid<DefaultStr>,
1162 uri: AtUri<DefaultStr>,
1163) -> Result<Arc<RecordBody>, XrpcError> {
1164 let raw = uri.as_ref().to_owned();
1165 resolve(state, ExpectedNsid::new(expected_nsid.clone()), uri)
1166 .await
1167 .map(|(body, _did)| body)
1168 .map_err(|e| match e {
1169 XrpcError::NotFound => XrpcError::UpstreamGone(raw),
1170 other => other,
1171 })
1172}
1173
1174async fn resolve(
1175 state: &AppState,
1176 expected: ExpectedNsid,
1177 uri: AtUri<DefaultStr>,
1178) -> Result<(Arc<RecordBody>, Did<DefaultStr>), XrpcError> {
1179 let collection = uri
1180 .collection()
1181 .ok_or_else(|| XrpcError::InvalidParams("uri missing collection".into()))?;
1182 if !expected.accepts(collection.as_ref()) {
1183 return Err(XrpcError::InvalidParams(format!(
1184 "collection mismatch: expected {}, got {}",
1185 expected.as_str(),
1186 collection.as_ref()
1187 )));
1188 }
1189 let rkey = uri
1190 .rkey()
1191 .ok_or_else(|| XrpcError::InvalidParams("uri missing rkey".into()))?;
1192 let did_ref = match uri.authority() {
1193 AtIdentifier::Did(d) => d,
1194 AtIdentifier::Handle(_) => {
1195 return Err(XrpcError::InvalidParams(
1196 "uri authority must be a did, not a handle".into(),
1197 ));
1198 }
1199 };
1200 let did: Did<DefaultStr> = did_ref.clone().into_static();
1201
1202 if let Some(hit) = state.records.get(&uri) {
1203 return Ok((hit, did));
1204 }
1205 let body = state
1206 .slingshot
1207 .get_record(&did_ref, &collection, &rkey)
1208 .await
1209 .map_err(map_slingshot)?;
1210 verify_type_tag(&body, &expected)?;
1211 state.records.put(uri, body.clone());
1212 Ok((body, did))
1213}
1214
1215#[derive(Deserialize)]
1216struct TypeTag<'a> {
1217 #[serde(rename = "$type", borrow)]
1218 ty: &'a str,
1219}
1220
1221fn verify_type_tag(body: &RecordBody, expected: &ExpectedNsid) -> Result<(), XrpcError> {
1222 let bytes = body.value.as_ref();
1223 let ty: std::borrow::Cow<'_, str> = match serde_json::from_slice::<TypeTag>(bytes) {
1224 Ok(t) => std::borrow::Cow::Borrowed(t.ty),
1225 Err(_) => {
1226 let value: serde_json::Value = serde_json::from_slice(bytes)
1227 .map_err(|e| XrpcError::InvalidRecord(format!("$type peek: {e}")))?;
1228 value
1229 .as_object()
1230 .and_then(|m| m.get("$type"))
1231 .and_then(|v| v.as_str())
1232 .map(|s| std::borrow::Cow::Owned(s.to_owned()))
1233 .ok_or_else(|| XrpcError::InvalidRecord("$type peek: missing $type field".into()))?
1234 }
1235 };
1236 if !expected.accepts(ty.as_ref()) {
1237 return Err(XrpcError::InvalidRecord(format!(
1238 "$type mismatch: expected {}, got {}",
1239 expected.as_str(),
1240 ty
1241 )));
1242 }
1243 Ok(())
1244}
1245
1246fn wire_type_nsid(bytes: &[u8]) -> Option<Nsid<DefaultStr>> {
1247 let ty = serde_json::from_slice::<TypeTag>(bytes).ok()?.ty;
1248 Nsid::<DefaultStr>::new_owned(ty).ok()
1249}
1250
1251async fn deserialize_or_upgrade<V>(
1252 state: &AppState,
1253 nsid: &Nsid<DefaultStr>,
1254 bytes: &[u8],
1255) -> Result<V, XrpcError>
1256where
1257 V: serde::de::DeserializeOwned,
1258{
1259 match serde_json::from_slice::<V>(bytes) {
1260 Ok(v) => Ok(v),
1261 Err(canon_err) => {
1262 let normalized = normalize_record_fields(bytes);
1263 let working: &[u8] = normalized.as_deref().unwrap_or(bytes);
1264 if normalized.is_some()
1265 && let Ok(v) = serde_json::from_slice::<V>(working)
1266 {
1267 return Ok(v);
1268 }
1269 let scrubbed = scrub_record_bytes(nsid, working);
1270 let retry_bytes: &[u8] = scrubbed.as_deref().unwrap_or(working);
1271 if scrubbed.is_some()
1272 && let Ok(v) = serde_json::from_slice::<V>(retry_bytes)
1273 {
1274 return Ok(v);
1275 }
1276 let wire_nsid = wire_type_nsid(retry_bytes).unwrap_or_else(|| nsid.clone());
1277 match upgrade_wire_bytes(&wire_nsid, retry_bytes, &state.resolver).await {
1278 Ok(canon_bytes) => serde_json::from_slice(&canon_bytes)
1279 .map_err(|e| XrpcError::InvalidRecord(e.to_string())),
1280 Err(_) => Err(XrpcError::InvalidRecord(canon_err.to_string())),
1281 }
1282 }
1283 }
1284}
1285
1286async fn fetch_from_uri<R, V>(
1287 state: &AppState,
1288 uri: AtUri<DefaultStr>,
1289) -> Result<(Arc<RecordBody>, V), XrpcError>
1290where
1291 R: XrpcResp,
1292 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1293{
1294 let raw = uri.as_str().to_owned();
1295 let nsid = nsid_static(R::NSID);
1296 let (body, _did) = resolve(state, ExpectedNsid::new(nsid.clone()), uri).await?;
1297 let value: V = deserialize_or_upgrade(state, &nsid, &body.value).await?;
1298 let value = value
1299 .normalize(&state.resolver)
1300 .await
1301 .ok_or(XrpcError::UpstreamGone(raw))?;
1302 Ok((body, value))
1303}
1304
1305async fn fetch<R, V>(
1306 state: &AppState,
1307 uri: &AtUri<DefaultStr>,
1308) -> Result<(Arc<RecordBody>, V), XrpcError>
1309where
1310 R: XrpcResp,
1311 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1312{
1313 fetch_from_uri::<R, V>(state, uri.clone()).await
1314}
1315
1316async fn get_repo(
1317 State(state): State<AppState>,
1318 XrpcQuery(q): XrpcQuery<GetRepoQuery>,
1319) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> {
1320 let (body, value) = fetch::<RepoRecord, Repo<DefaultStr>>(&state, &q.repo).await?;
1321 Ok(Json(RepoGetRecordOutput {
1322 cid: Some(body.cid.clone()),
1323 uri: body.uri.clone(),
1324 value,
1325 }))
1326}
1327
1328async fn get_repo_by_repo_did(
1329 State(state): State<AppState>,
1330 XrpcQuery(q): XrpcQuery<GetRepoByRepoDidQuery>,
1331) -> Result<Json<RepoGetRecordOutput<DefaultStr>>, XrpcError> {
1332 let ident = state
1333 .resolver
1334 .lookup_by_repo_did(&q.repo_did)
1335 .await
1336 .ok_or(XrpcError::NotFound)?;
1337 let uri = AtUri::<DefaultStr>::from_parts_owned(
1338 ident.owner.as_str(),
1339 RepoRecord::NSID,
1340 ident.rkey.as_str(),
1341 )
1342 .expect("Did and Rkey newtypes already validated, at-uri assembly cannot fail");
1343 let (body, value) = fetch_from_uri::<RepoRecord, Repo<DefaultStr>>(&state, uri).await?;
1344 Ok(Json(RepoGetRecordOutput {
1345 cid: Some(body.cid.clone()),
1346 uri: body.uri.clone(),
1347 value,
1348 }))
1349}
1350
1351async fn get_profile(
1352 State(state): State<AppState>,
1353 XrpcQuery(q): XrpcQuery<GetProfileQuery>,
1354) -> Result<Json<ProfileGetRecordOutput<DefaultStr>>, XrpcError> {
1355 let (body, value) = fetch::<ProfileRecord, Profile<DefaultStr>>(&state, &q.actor).await?;
1356 Ok(Json(ProfileGetRecordOutput {
1357 cid: Some(body.cid.clone()),
1358 uri: body.uri.clone(),
1359 value,
1360 }))
1361}
1362
1363async fn get_issue(
1364 State(state): State<AppState>,
1365 XrpcQuery(q): XrpcQuery<GetIssueQuery>,
1366) -> Result<Json<IssueGetRecordOutput<DefaultStr>>, XrpcError> {
1367 let (body, value) = fetch::<IssueRecord, Issue<DefaultStr>>(&state, &q.issue).await?;
1368 Ok(Json(IssueGetRecordOutput {
1369 cid: Some(body.cid.clone()),
1370 uri: body.uri.clone(),
1371 value,
1372 }))
1373}
1374
1375async fn get_pull(
1376 State(state): State<AppState>,
1377 XrpcQuery(q): XrpcQuery<GetPullQuery>,
1378) -> Result<Json<PullGetRecordOutput<DefaultStr>>, XrpcError> {
1379 let (body, value) = fetch::<PullRecord, Pull<DefaultStr>>(&state, &q.pull).await?;
1380 Ok(Json(PullGetRecordOutput {
1381 cid: Some(body.cid.clone()),
1382 uri: body.uri.clone(),
1383 value,
1384 }))
1385}
1386
1387async fn get_repos(
1388 State(state): State<AppState>,
1389 RawQuery(query): RawQuery,
1390) -> Result<Response, XrpcError> {
1391 let uris = collect_repeated(query.as_deref(), BULK_REPOS_KEY);
1392 bulk_fetch::<RepoRecord, Repo<DefaultStr>>(&state, uris).await
1393}
1394
1395async fn get_profiles(
1396 State(state): State<AppState>,
1397 RawQuery(query): RawQuery,
1398) -> Result<Response, XrpcError> {
1399 let uris = collect_repeated(query.as_deref(), BULK_PROFILES_KEY);
1400 bulk_fetch::<ProfileRecord, Profile<DefaultStr>>(&state, uris).await
1401}
1402
1403async fn get_issues(
1404 State(state): State<AppState>,
1405 RawQuery(query): RawQuery,
1406) -> Result<Response, XrpcError> {
1407 let uris = collect_repeated(query.as_deref(), BULK_ISSUES_KEY);
1408 bulk_fetch::<IssueRecord, Issue<DefaultStr>>(&state, uris).await
1409}
1410
1411async fn get_pulls(
1412 State(state): State<AppState>,
1413 RawQuery(query): RawQuery,
1414) -> Result<Response, XrpcError> {
1415 let uris = collect_repeated(query.as_deref(), BULK_PULLS_KEY);
1416 bulk_fetch::<PullRecord, Pull<DefaultStr>>(&state, uris).await
1417}
1418
/// Maximum number of uris accepted by a single bulk request.
const BULK_LIMIT: usize = 50;

/// Repeated query parameter carrying repo uris for `get_repos`.
const BULK_REPOS_KEY: &str = "repos";
/// Repeated query parameter carrying profile uris for `get_profiles`.
const BULK_PROFILES_KEY: &str = "actors";
/// Repeated query parameter carrying issue uris for `get_issues`.
const BULK_ISSUES_KEY: &str = "issues";
/// Repeated query parameter carrying pull uris for `get_pulls`.
const BULK_PULLS_KEY: &str = "pulls";
1424
1425fn collect_repeated(query: Option<&str>, key: &str) -> Vec<String> {
1426 let Some(q) = query else {
1427 return Vec::new();
1428 };
1429 form_urlencoded::parse(q.as_bytes())
1430 .filter_map(|(k, v)| (k == key).then(|| v.into_owned()))
1431 .collect()
1432}
1433
1434async fn hydrate_record_view<V>(
1435 state: &AppState,
1436 nsid: &Nsid<DefaultStr>,
1437 uri: AtUri<DefaultStr>,
1438 sort_micros: u64,
1439) -> Result<Option<RecordView<V>>, XrpcError>
1440where
1441 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1442{
1443 if let Some(source) = decode_knot_owned_source(&uri) {
1444 return synthesize_knot_owned_view::<V>(state, uri, source, sort_micros).await;
1445 }
1446 let body = resolve_for_view(state, nsid, uri).await?;
1447 let value: V = deserialize_or_upgrade::<V>(state, nsid, &body.value).await?;
1448 let Some(value) = value.normalize(&state.resolver).await else {
1449 return Ok(None);
1450 };
1451 Ok(Some(RecordView {
1452 uri: body.uri.clone(),
1453 cid: Some(body.cid.clone()),
1454 value,
1455 }))
1456}
1457
1458async fn synthesize_knot_owned_view<V>(
1459 state: &AppState,
1460 uri: AtUri<DefaultStr>,
1461 source: KnotOwnedSource,
1462 sort_micros: u64,
1463) -> Result<Option<RecordView<V>>, XrpcError>
1464where
1465 V: serde::de::DeserializeOwned + NormalizeRepoRefs,
1466{
1467 let Some(body) = synth_knot_owned_value(source, sort_micros) else {
1468 return Ok(None);
1469 };
1470 let Ok(value) = serde_json::from_value::<V>(body) else {
1471 return Ok(None);
1472 };
1473 let Some(value) = value.normalize(&state.resolver).await else {
1474 return Ok(None);
1475 };
1476 Ok(Some(RecordView {
1477 uri,
1478 cid: None,
1479 value,
1480 }))
1481}
1482
1483fn synth_knot_owned_value(source: KnotOwnedSource, sort_micros: u64) -> Option<serde_json::Value> {
1484 let created_at = micros_to_rfc3339(sort_micros)?;
1485 match source {
1486 KnotOwnedSource::Member { knot, subject } => Some(serde_json::json!({
1487 "domain": knot_did_host(&knot)?,
1488 "subject": subject.as_ref(),
1489 "createdAt": created_at,
1490 })),
1491 KnotOwnedSource::Collaborator { repo, subject } => Some(serde_json::json!({
1492 "repo": repo.as_ref(),
1493 "subject": subject.as_ref(),
1494 "createdAt": created_at,
1495 })),
1496 }
1497}
1498
1499fn micros_to_rfc3339(micros: u64) -> Option<String> {
1500 let micros = i64::try_from(micros).ok()?;
1501 chrono::DateTime::from_timestamp_micros(micros)
1502 .map(|dt| dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true))
1503}
1504
/// Where a uri being hydrated came from; decides how hydration failures are
/// reported and whether the edge index is pruned.
#[derive(Copy, Clone)]
enum HitProvenance {
    /// The uri was passed in by the caller (bulk fetch).
    ClientSupplied,
    /// The uri came out of the edge index (list endpoints).
    Indexed,
}
1510
1511fn is_index_evictable(err: &XrpcError) -> bool {
1512 matches!(
1513 err,
1514 XrpcError::NotFound
1515 | XrpcError::UpstreamGone(_)
1516 | XrpcError::InvalidRecord(_)
1517 | XrpcError::InvalidParams(_)
1518 )
1519}
1520
1521fn drop_unhydratable<V>(
1522 provenance: HitProvenance,
1523 nsid: &Nsid<DefaultStr>,
1524 uri: &AtUri<DefaultStr>,
1525 result: Result<Option<V>, XrpcError>,
1526) -> Result<Option<V>, XrpcError> {
1527 match result {
1528 Ok(view) => Ok(view),
1529 Err(err @ (XrpcError::NotFound | XrpcError::UpstreamGone(_))) => {
1530 tracing::debug!(
1531 uri = %uri,
1532 nsid = %nsid.as_ref(),
1533 error = %err,
1534 "dropping gone hit during hydration",
1535 );
1536 Ok(None)
1537 }
1538 Err(err @ XrpcError::UpstreamUnavailable(_)) => {
1539 tracing::warn!(
1540 uri = %uri,
1541 nsid = %nsid.as_ref(),
1542 error = %err,
1543 "dropping hit, upstream unavailable during hydration",
1544 );
1545 Ok(None)
1546 }
1547 Err(err @ XrpcError::InvalidRecord(_)) => {
1548 tracing::warn!(
1549 uri = %uri,
1550 nsid = %nsid.as_ref(),
1551 error = %err,
1552 "dropping invalid hit during hydration",
1553 );
1554 Ok(None)
1555 }
1556 Err(err @ XrpcError::InvalidParams(_)) => match provenance {
1557 HitProvenance::ClientSupplied => Err(err),
1558 HitProvenance::Indexed => {
1559 tracing::warn!(
1560 uri = %uri,
1561 nsid = %nsid.as_ref(),
1562 error = %err,
1563 "dropping malformed indexed hit during hydration",
1564 );
1565 Ok(None)
1566 }
1567 },
1568 Err(err @ (XrpcError::Internal(_) | XrpcError::Overloaded)) => Err(err),
1569 }
1570}
1571
1572fn hydrate_stream<T, Fut, V>(
1573 items: impl IntoIterator<Item = T>,
1574 produce: impl FnMut(T) -> Fut,
1575) -> impl Stream<Item = Result<V, XrpcError>>
1576where
1577 Fut: Future<Output = Result<Option<V>, XrpcError>>,
1578{
1579 stream::iter(items)
1580 .map(produce)
1581 .buffered(FETCH_CONCURRENCY)
1582 .try_filter_map(|view| async move { Ok(view) })
1583}
1584
1585fn hydrate_record_stream<V>(
1586 state: &AppState,
1587 nsid: Nsid<DefaultStr>,
1588 items: Vec<EdgeItem>,
1589 provenance: HitProvenance,
1590) -> impl Stream<Item = Result<RecordView<V>, XrpcError>> + Send + 'static
1591where
1592 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1593{
1594 let owned = state.clone();
1595 hydrate_stream(items, move |item| {
1596 let owned = owned.clone();
1597 let nsid = nsid.clone();
1598 async move {
1599 let EdgeItem { uri, sort_micros } = item;
1600 let result = hydrate_record_view::<V>(&owned, &nsid, uri.clone(), sort_micros).await;
1601 if matches!(provenance, HitProvenance::Indexed)
1602 && let Err(err) = &result
1603 && is_index_evictable(err)
1604 {
1605 owned.edges.remove_source(&uri);
1606 }
1607 drop_unhydratable(provenance, &nsid, &uri, result)
1608 }
1609 })
1610}
1611
/// Progress of the streamed JSON document produced by `json_stream`.
enum PagePhase {
    /// Nothing emitted yet; the opening `{"<key>":[` comes next.
    Head,
    /// Emitting array elements; `first` is true until one has been written.
    Body { first: bool },
    /// The tail has been emitted; the stream ends on the next poll.
    Done,
}
1617
1618struct PageState<S> {
1619 items: std::pin::Pin<Box<S>>,
1620 phase: PagePhase,
1621 array_key: &'static str,
1622 tail: Vec<u8>,
1623 permit: Option<HeavyPermit>,
1624}
1625
1626fn paged_tail(cursor: Option<String>) -> Vec<u8> {
1627 let encoded = serde_json::to_string(&cursor).unwrap_or_else(|_| "null".to_owned());
1628 format!("],\"cursor\":{encoded}}}").into_bytes()
1629}
1630
/// Closing bytes of a response without a cursor: `]}`.
fn unpaged_tail() -> Vec<u8> {
    vec![b']', b'}']
}
1634
1635fn json_stream<V, S>(
1636 array_key: &'static str,
1637 items: S,
1638 tail: Vec<u8>,
1639 permit: Option<HeavyPermit>,
1640) -> Response
1641where
1642 V: Serialize + Send + 'static,
1643 S: Stream<Item = Result<V, XrpcError>> + Send + 'static,
1644{
1645 let init = PageState {
1646 items: Box::pin(items),
1647 phase: PagePhase::Head,
1648 array_key,
1649 tail,
1650 permit,
1651 };
1652 let chunks = stream::unfold(init, |mut st| async move {
1653 match st.phase {
1654 PagePhase::Head => {
1655 let head = format!("{{\"{}\":[", st.array_key).into_bytes();
1656 st.phase = PagePhase::Body { first: true };
1657 Some((Ok::<Vec<u8>, Infallible>(head), st))
1658 }
1659 PagePhase::Body { first } => match st.items.next().await {
1660 Some(Ok(view)) => match serde_json::to_vec(&view) {
1661 Ok(encoded) => {
1662 let mut chunk = Vec::with_capacity(encoded.len() + 1);
1663 if !first {
1664 chunk.push(b',');
1665 }
1666 chunk.extend_from_slice(&encoded);
1667 st.phase = PagePhase::Body { first: false };
1668 Some((Ok(chunk), st))
1669 }
1670 Err(e) => {
1671 tracing::warn!(error = %e, "skipping hit, serialize failed mid-stream");
1672 Some((Ok(Vec::new()), st))
1673 }
1674 },
1675 Some(Err(e)) => {
1676 tracing::warn!(error = %e, "ending page early, hydration failed mid-stream");
1677 let tail = std::mem::take(&mut st.tail);
1678 st.phase = PagePhase::Done;
1679 Some((Ok(tail), st))
1680 }
1681 None => {
1682 let tail = std::mem::take(&mut st.tail);
1683 st.phase = PagePhase::Done;
1684 Some((Ok(tail), st))
1685 }
1686 },
1687 PagePhase::Done => {
1688 drop(st.permit.take());
1689 None
1690 }
1691 }
1692 });
1693 (
1694 [(CONTENT_TYPE, "application/json")],
1695 Body::from_stream(chunks),
1696 )
1697 .into_response()
1698}
1699
1700async fn bulk_fetch<R, V>(state: &AppState, uris: Vec<String>) -> Result<Response, XrpcError>
1701where
1702 R: XrpcResp,
1703 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1704{
1705 if uris.is_empty() {
1706 return Err(XrpcError::InvalidParams("at least one uri required".into()));
1707 }
1708 if uris.len() > BULK_LIMIT {
1709 return Err(XrpcError::InvalidParams(format!(
1710 "at most {BULK_LIMIT} uris per request"
1711 )));
1712 }
1713 let parsed: Vec<AtUri<DefaultStr>> = uris
1714 .iter()
1715 .map(|s| parse_uri(s))
1716 .collect::<Result<_, _>>()?;
1717 let nsid = nsid_static(R::NSID);
1718 if let Some(bad) = parsed
1719 .iter()
1720 .find(|uri| uri.collection().is_none_or(|c| c.as_ref() != nsid.as_ref()))
1721 {
1722 return Err(XrpcError::InvalidParams(format!(
1723 "uri collection must be {}, got {}",
1724 nsid.as_ref(),
1725 bad.as_ref()
1726 )));
1727 }
1728 let permit = state.heavy_permit()?;
1729 let items = parsed
1730 .into_iter()
1731 .map(|uri| EdgeItem {
1732 uri,
1733 sort_micros: 0,
1734 })
1735 .collect();
1736 let views = hydrate_record_stream::<V>(state, nsid, items, HitProvenance::ClientSupplied);
1737 Ok(json_stream::<RecordView<V>, _>(
1738 "items",
1739 views,
1740 unpaged_tail(),
1741 permit,
1742 ))
1743}
1744
1745fn record_edge_page<R, F>(
1746 state: &AppState,
1747 q: &TypedListQuery<F>,
1748) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError>
1749where
1750 R: XrpcResp + HasSubject,
1751 F: ListFilter,
1752{
1753 let subject = parse_subject(&q.subject, R::SHAPE)?;
1754 let cursor = parse_cursor(q.cursor.as_deref())?;
1755 let limit = parse_limit(q.limit)?;
1756 let dir = q.dir();
1757 let nsid = nsid_static(R::NSID);
1758 let page = if q.filter.is_identity() {
1759 let key = EdgeKey::new(nsid.clone(), subject);
1760 state.edges.list(&key, cursor, limit, dir)
1761 } else {
1762 let pred = q.filter.predicate(state, &subject);
1763 let key = EdgeKey::new(nsid.clone(), subject);
1764 state.edges.list_filtered(&key, cursor, limit, dir, pred)
1765 };
1766 Ok((page, nsid))
1767}
1768
1769async fn list_records<R, V, F>(
1770 state: &AppState,
1771 q: TypedListQuery<F>,
1772) -> Result<Response, XrpcError>
1773where
1774 R: XrpcResp + HasSubject,
1775 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1776 F: ListFilter,
1777{
1778 let (page, nsid) = record_edge_page::<R, F>(state, &q)?;
1779 let permit = state.heavy_permit()?;
1780 let views = hydrate_record_stream::<V>(state, nsid, page.items, HitProvenance::Indexed);
1781 Ok(json_stream::<RecordView<V>, _>(
1782 "items",
1783 views,
1784 paged_tail(page.next.map(PageToken::encode_token)),
1785 permit,
1786 ))
1787}
1788
1789fn count_for<R: XrpcResp + HasSubject>(
1790 state: &AppState,
1791 q: CountQuery,
1792) -> Result<CountResponse, XrpcError> {
1793 let subject = parse_subject(&q.subject, R::SHAPE)?;
1794 let key = EdgeKey::new(nsid_static(R::NSID), subject);
1795 Ok(CountResponse {
1796 count: state.edges.count(&key),
1797 distinct_authors: state.edges.count_distinct_authors(&key),
1798 })
1799}
1800
1801fn mirror_edge_page<M, F>(
1802 state: &AppState,
1803 q: &TypedListQuery<F>,
1804) -> Result<(EdgePage, Nsid<DefaultStr>), XrpcError>
1805where
1806 M: MirrorOf,
1807 F: ListFilter,
1808{
1809 let subject = parse_subject(&q.subject, M::SHAPE)?;
1810 let cursor = parse_cursor(q.cursor.as_deref())?;
1811 let limit = parse_limit(q.limit)?;
1812 let dir = q.dir();
1813 let edge_nsid = nsid_static(M::EDGE_KIND);
1814 let page = if q.filter.is_identity() {
1815 let key = EdgeKey::new(edge_nsid, subject);
1816 state.edges.list(&key, cursor, limit, dir)
1817 } else {
1818 let pred = q.filter.predicate(state, &subject);
1819 let key = EdgeKey::new(edge_nsid, subject);
1820 state.edges.list_filtered(&key, cursor, limit, dir, pred)
1821 };
1822 let record_nsid = nsid_static(<M::Record as XrpcResp>::NSID);
1823 Ok((page, record_nsid))
1824}
1825
1826async fn list_mirror<M, V, F>(state: &AppState, q: TypedListQuery<F>) -> Result<Response, XrpcError>
1827where
1828 M: MirrorOf,
1829 V: serde::de::DeserializeOwned + Serialize + NormalizeRepoRefs + Send + 'static,
1830 F: ListFilter,
1831{
1832 let (page, record_nsid) = mirror_edge_page::<M, F>(state, &q)?;
1833 let permit = state.heavy_permit()?;
1834 let views = hydrate_record_stream::<V>(state, record_nsid, page.items, HitProvenance::Indexed);
1835 Ok(json_stream::<RecordView<V>, _>(
1836 "items",
1837 views,
1838 paged_tail(page.next.map(PageToken::encode_token)),
1839 permit,
1840 ))
1841}
1842
1843fn count_mirror<M: MirrorOf>(state: &AppState, q: CountQuery) -> Result<CountResponse, XrpcError> {
1844 let subject = parse_subject(&q.subject, M::SHAPE)?;
1845 let key = EdgeKey::new(nsid_static(M::EDGE_KIND), subject);
1846 Ok(CountResponse {
1847 count: state.edges.count(&key),
1848 distinct_authors: state.edges.count_distinct_authors(&key),
1849 })
1850}
1851
1852async fn list_stars(
1853 State(state): State<AppState>,
1854 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1855) -> Result<Response, XrpcError> {
1856 list_records::<StarRecord, Star<DefaultStr>, _>(&state, q).await
1857}
1858
1859async fn count_stars(
1860 State(state): State<AppState>,
1861 XrpcQuery(q): XrpcQuery<CountQuery>,
1862) -> Result<Json<CountResponse>, XrpcError> {
1863 count_for::<StarRecord>(&state, q).map(Json)
1864}
1865
1866async fn list_follows(
1867 State(state): State<AppState>,
1868 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1869) -> Result<Response, XrpcError> {
1870 list_records::<FollowRecord, Follow<DefaultStr>, _>(&state, q).await
1871}
1872
1873async fn count_follows(
1874 State(state): State<AppState>,
1875 XrpcQuery(q): XrpcQuery<CountQuery>,
1876) -> Result<Json<CountResponse>, XrpcError> {
1877 count_for::<FollowRecord>(&state, q).map(Json)
1878}
1879
1880async fn list_issues(
1881 State(state): State<AppState>,
1882 XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>,
1883) -> Result<Response, XrpcError> {
1884 let (page, nsid) = record_edge_page::<IssueRecord, _>(&state, &q)?;
1885 let permit = state.heavy_permit()?;
1886 let owned = state.clone();
1887 let items = hydrate_record_stream::<Issue<DefaultStr>>(
1888 &state,
1889 nsid,
1890 page.items,
1891 HitProvenance::Indexed,
1892 )
1893 .map(move |view| view.map(|v| enrich_issue_view(&owned, v)));
1894 Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
1895 "items",
1896 items,
1897 paged_tail(page.next.map(PageToken::encode_token)),
1898 permit,
1899 ))
1900}
1901
1902async fn count_issues(
1903 State(state): State<AppState>,
1904 XrpcQuery(q): XrpcQuery<CountQuery>,
1905) -> Result<Json<CountResponse>, XrpcError> {
1906 count_for::<IssueRecord>(&state, q).map(Json)
1907}
1908
1909async fn list_pulls(
1910 State(state): State<AppState>,
1911 XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>,
1912) -> Result<Response, XrpcError> {
1913 let (page, nsid) = record_edge_page::<PullRecord, _>(&state, &q)?;
1914 let permit = state.heavy_permit()?;
1915 let owned = state.clone();
1916 let items =
1917 hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed)
1918 .map(move |view| view.map(|v| enrich_pull_view(&owned, v)));
1919 Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>(
1920 "items",
1921 items,
1922 paged_tail(page.next.map(PageToken::encode_token)),
1923 permit,
1924 ))
1925}
1926
1927async fn count_pulls(
1928 State(state): State<AppState>,
1929 XrpcQuery(q): XrpcQuery<CountQuery>,
1930) -> Result<Json<CountResponse>, XrpcError> {
1931 count_for::<PullRecord>(&state, q).map(Json)
1932}
1933
1934async fn list_feed_comments(
1935 State(state): State<AppState>,
1936 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1937) -> Result<Response, XrpcError> {
1938 list_records::<FeedCommentRecord, FeedComment<DefaultStr>, _>(&state, q).await
1939}
1940
1941async fn count_feed_comments(
1942 State(state): State<AppState>,
1943 XrpcQuery(q): XrpcQuery<CountQuery>,
1944) -> Result<Json<CountResponse>, XrpcError> {
1945 count_for::<FeedCommentRecord>(&state, q).map(Json)
1946}
1947
1948async fn list_reactions(
1949 State(state): State<AppState>,
1950 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1951) -> Result<Response, XrpcError> {
1952 list_records::<ReactionRecord, Reaction<DefaultStr>, _>(&state, q).await
1953}
1954
1955async fn count_reactions(
1956 State(state): State<AppState>,
1957 XrpcQuery(q): XrpcQuery<CountQuery>,
1958) -> Result<Json<CountResponse>, XrpcError> {
1959 count_for::<ReactionRecord>(&state, q).map(Json)
1960}
1961
1962async fn list_ref_updates(
1963 State(state): State<AppState>,
1964 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1965) -> Result<Response, XrpcError> {
1966 list_records::<RefUpdateRecord, RefUpdate<DefaultStr>, _>(&state, q).await
1967}
1968
1969async fn count_ref_updates(
1970 State(state): State<AppState>,
1971 XrpcQuery(q): XrpcQuery<CountQuery>,
1972) -> Result<Json<CountResponse>, XrpcError> {
1973 count_for::<RefUpdateRecord>(&state, q).map(Json)
1974}
1975
1976async fn list_collaborators(
1977 State(state): State<AppState>,
1978 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1979) -> Result<Response, XrpcError> {
1980 list_records::<CollaboratorRecord, Collaborator<DefaultStr>, _>(&state, q).await
1981}
1982
1983async fn count_collaborators(
1984 State(state): State<AppState>,
1985 XrpcQuery(q): XrpcQuery<CountQuery>,
1986) -> Result<Json<CountResponse>, XrpcError> {
1987 count_for::<CollaboratorRecord>(&state, q).map(Json)
1988}
1989
1990async fn list_issue_states(
1991 State(state): State<AppState>,
1992 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
1993) -> Result<Response, XrpcError> {
1994 list_records::<IssueStateRecord, IssueState<DefaultStr>, _>(&state, q).await
1995}
1996
1997async fn count_issue_states(
1998 State(state): State<AppState>,
1999 XrpcQuery(q): XrpcQuery<CountQuery>,
2000) -> Result<Json<CountResponse>, XrpcError> {
2001 count_for::<IssueStateRecord>(&state, q).map(Json)
2002}
2003
2004async fn list_pull_statuses(
2005 State(state): State<AppState>,
2006 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2007) -> Result<Response, XrpcError> {
2008 list_records::<PullStatusRecord, PullStatus<DefaultStr>, _>(&state, q).await
2009}
2010
2011async fn count_pull_statuses(
2012 State(state): State<AppState>,
2013 XrpcQuery(q): XrpcQuery<CountQuery>,
2014) -> Result<Json<CountResponse>, XrpcError> {
2015 count_for::<PullStatusRecord>(&state, q).map(Json)
2016}
2017
2018async fn list_repos(
2019 State(state): State<AppState>,
2020 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2021) -> Result<Response, XrpcError> {
2022 list_records::<RepoRecord, Repo<DefaultStr>, _>(&state, q).await
2023}
2024
2025async fn count_repos(
2026 State(state): State<AppState>,
2027 XrpcQuery(q): XrpcQuery<CountQuery>,
2028) -> Result<Json<CountResponse>, XrpcError> {
2029 count_for::<RepoRecord>(&state, q).map(Json)
2030}
2031
2032async fn list_knots(
2033 State(state): State<AppState>,
2034 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2035) -> Result<Response, XrpcError> {
2036 list_records::<KnotRecord, Knot<DefaultStr>, _>(&state, q).await
2037}
2038
2039async fn count_knots(
2040 State(state): State<AppState>,
2041 XrpcQuery(q): XrpcQuery<CountQuery>,
2042) -> Result<Json<CountResponse>, XrpcError> {
2043 count_for::<KnotRecord>(&state, q).map(Json)
2044}
2045
2046async fn list_spindles(
2047 State(state): State<AppState>,
2048 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2049) -> Result<Response, XrpcError> {
2050 list_records::<SpindleRecord, Spindle<DefaultStr>, _>(&state, q).await
2051}
2052
2053async fn count_spindles(
2054 State(state): State<AppState>,
2055 XrpcQuery(q): XrpcQuery<CountQuery>,
2056) -> Result<Json<CountResponse>, XrpcError> {
2057 count_for::<SpindleRecord>(&state, q).map(Json)
2058}
2059
2060async fn list_public_keys(
2061 State(state): State<AppState>,
2062 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2063) -> Result<Response, XrpcError> {
2064 list_records::<PublicKeyRecord, PublicKey<DefaultStr>, _>(&state, q).await
2065}
2066
2067async fn count_public_keys(
2068 State(state): State<AppState>,
2069 XrpcQuery(q): XrpcQuery<CountQuery>,
2070) -> Result<Json<CountResponse>, XrpcError> {
2071 count_for::<PublicKeyRecord>(&state, q).map(Json)
2072}
2073
2074async fn list_vouches(
2075 State(state): State<AppState>,
2076 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2077) -> Result<Response, XrpcError> {
2078 list_records::<VouchRecord, Vouch<DefaultStr>, _>(&state, q).await
2079}
2080
2081async fn count_vouches(
2082 State(state): State<AppState>,
2083 XrpcQuery(q): XrpcQuery<CountQuery>,
2084) -> Result<Json<CountResponse>, XrpcError> {
2085 count_for::<VouchRecord>(&state, q).map(Json)
2086}
2087
2088async fn list_stars_by(
2089 State(state): State<AppState>,
2090 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2091) -> Result<Response, XrpcError> {
2092 list_mirror::<StarBy, Star<DefaultStr>, _>(&state, q).await
2093}
2094async fn count_stars_by(
2095 State(state): State<AppState>,
2096 XrpcQuery(q): XrpcQuery<CountQuery>,
2097) -> Result<Json<CountResponse>, XrpcError> {
2098 count_mirror::<StarBy>(&state, q).map(Json)
2099}
2100
2101async fn list_reactions_by(
2102 State(state): State<AppState>,
2103 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2104) -> Result<Response, XrpcError> {
2105 list_mirror::<ReactionBy, Reaction<DefaultStr>, _>(&state, q).await
2106}
2107async fn count_reactions_by(
2108 State(state): State<AppState>,
2109 XrpcQuery(q): XrpcQuery<CountQuery>,
2110) -> Result<Json<CountResponse>, XrpcError> {
2111 count_mirror::<ReactionBy>(&state, q).map(Json)
2112}
2113
2114async fn list_follows_by(
2115 State(state): State<AppState>,
2116 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2117) -> Result<Response, XrpcError> {
2118 list_mirror::<FollowBy, Follow<DefaultStr>, _>(&state, q).await
2119}
2120async fn count_follows_by(
2121 State(state): State<AppState>,
2122 XrpcQuery(q): XrpcQuery<CountQuery>,
2123) -> Result<Json<CountResponse>, XrpcError> {
2124 count_mirror::<FollowBy>(&state, q).map(Json)
2125}
2126
2127async fn list_vouches_by(
2128 State(state): State<AppState>,
2129 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2130) -> Result<Response, XrpcError> {
2131 list_mirror::<VouchBy, Vouch<DefaultStr>, _>(&state, q).await
2132}
2133async fn count_vouches_by(
2134 State(state): State<AppState>,
2135 XrpcQuery(q): XrpcQuery<CountQuery>,
2136) -> Result<Json<CountResponse>, XrpcError> {
2137 count_mirror::<VouchBy>(&state, q).map(Json)
2138}
2139
2140async fn list_ref_updates_by(
2141 State(state): State<AppState>,
2142 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2143) -> Result<Response, XrpcError> {
2144 list_mirror::<RefUpdateBy, RefUpdate<DefaultStr>, _>(&state, q).await
2145}
2146async fn count_ref_updates_by(
2147 State(state): State<AppState>,
2148 XrpcQuery(q): XrpcQuery<CountQuery>,
2149) -> Result<Json<CountResponse>, XrpcError> {
2150 count_mirror::<RefUpdateBy>(&state, q).map(Json)
2151}
2152
2153async fn list_knot_members_by(
2154 State(state): State<AppState>,
2155 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2156) -> Result<Response, XrpcError> {
2157 list_mirror::<KnotMemberBy, KnotMember<DefaultStr>, _>(&state, q).await
2158}
2159async fn count_knot_members_by(
2160 State(state): State<AppState>,
2161 XrpcQuery(q): XrpcQuery<CountQuery>,
2162) -> Result<Json<CountResponse>, XrpcError> {
2163 count_mirror::<KnotMemberBy>(&state, q).map(Json)
2164}
2165
2166async fn list_label_ops_by(
2167 State(state): State<AppState>,
2168 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2169) -> Result<Response, XrpcError> {
2170 list_mirror::<LabelOpBy, LabelOp<DefaultStr>, _>(&state, q).await
2171}
2172async fn count_label_ops_by(
2173 State(state): State<AppState>,
2174 XrpcQuery(q): XrpcQuery<CountQuery>,
2175) -> Result<Json<CountResponse>, XrpcError> {
2176 count_mirror::<LabelOpBy>(&state, q).map(Json)
2177}
2178
2179async fn list_pipelines_by(
2180 State(state): State<AppState>,
2181 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2182) -> Result<Response, XrpcError> {
2183 list_mirror::<PipelineBy, Pipeline<DefaultStr>, _>(&state, q).await
2184}
2185async fn count_pipelines_by(
2186 State(state): State<AppState>,
2187 XrpcQuery(q): XrpcQuery<CountQuery>,
2188) -> Result<Json<CountResponse>, XrpcError> {
2189 count_mirror::<PipelineBy>(&state, q).map(Json)
2190}
2191
2192async fn list_pipeline_statuses_by(
2193 State(state): State<AppState>,
2194 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2195) -> Result<Response, XrpcError> {
2196 list_mirror::<PipelineStatusBy, PipelineStatus<DefaultStr>, _>(&state, q).await
2197}
2198async fn count_pipeline_statuses_by(
2199 State(state): State<AppState>,
2200 XrpcQuery(q): XrpcQuery<CountQuery>,
2201) -> Result<Json<CountResponse>, XrpcError> {
2202 count_mirror::<PipelineStatusBy>(&state, q).map(Json)
2203}
2204
2205async fn list_artifacts_by(
2206 State(state): State<AppState>,
2207 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2208) -> Result<Response, XrpcError> {
2209 list_mirror::<ArtifactBy, Artifact<DefaultStr>, _>(&state, q).await
2210}
2211async fn count_artifacts_by(
2212 State(state): State<AppState>,
2213 XrpcQuery(q): XrpcQuery<CountQuery>,
2214) -> Result<Json<CountResponse>, XrpcError> {
2215 count_mirror::<ArtifactBy>(&state, q).map(Json)
2216}
2217
2218async fn list_collaborators_by(
2219 State(state): State<AppState>,
2220 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2221) -> Result<Response, XrpcError> {
2222 list_mirror::<CollaboratorBy, Collaborator<DefaultStr>, _>(&state, q).await
2223}
2224async fn count_collaborators_by(
2225 State(state): State<AppState>,
2226 XrpcQuery(q): XrpcQuery<CountQuery>,
2227) -> Result<Json<CountResponse>, XrpcError> {
2228 count_mirror::<CollaboratorBy>(&state, q).map(Json)
2229}
2230
2231async fn list_issues_by(
2232 State(state): State<AppState>,
2233 XrpcQuery(q): XrpcQuery<TypedListQuery<IssueFilter>>,
2234) -> Result<Response, XrpcError> {
2235 let (page, nsid) = mirror_edge_page::<IssueBy, _>(&state, &q)?;
2236 let permit = state.heavy_permit()?;
2237 let owned = state.clone();
2238 let items = hydrate_record_stream::<Issue<DefaultStr>>(
2239 &state,
2240 nsid,
2241 page.items,
2242 HitProvenance::Indexed,
2243 )
2244 .map(move |view| view.map(|v| enrich_issue_view(&owned, v)));
2245 Ok(json_stream::<StatefulItem<Issue<DefaultStr>>, _>(
2246 "items",
2247 items,
2248 paged_tail(page.next.map(PageToken::encode_token)),
2249 permit,
2250 ))
2251}
2252async fn count_issues_by(
2253 State(state): State<AppState>,
2254 XrpcQuery(q): XrpcQuery<CountQuery>,
2255) -> Result<Json<CountResponse>, XrpcError> {
2256 count_mirror::<IssueBy>(&state, q).map(Json)
2257}
2258
2259async fn list_feed_comments_by(
2260 State(state): State<AppState>,
2261 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2262) -> Result<Response, XrpcError> {
2263 list_mirror::<FeedCommentBy, FeedComment<DefaultStr>, _>(&state, q).await
2264}
2265async fn count_feed_comments_by(
2266 State(state): State<AppState>,
2267 XrpcQuery(q): XrpcQuery<CountQuery>,
2268) -> Result<Json<CountResponse>, XrpcError> {
2269 count_mirror::<FeedCommentBy>(&state, q).map(Json)
2270}
2271
2272async fn list_issue_states_by(
2273 State(state): State<AppState>,
2274 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2275) -> Result<Response, XrpcError> {
2276 list_mirror::<IssueStateBy, IssueState<DefaultStr>, _>(&state, q).await
2277}
2278async fn count_issue_states_by(
2279 State(state): State<AppState>,
2280 XrpcQuery(q): XrpcQuery<CountQuery>,
2281) -> Result<Json<CountResponse>, XrpcError> {
2282 count_mirror::<IssueStateBy>(&state, q).map(Json)
2283}
2284
2285async fn list_pulls_by(
2286 State(state): State<AppState>,
2287 XrpcQuery(q): XrpcQuery<TypedListQuery<PullFilter>>,
2288) -> Result<Response, XrpcError> {
2289 let (page, nsid) = mirror_edge_page::<PullBy, _>(&state, &q)?;
2290 let permit = state.heavy_permit()?;
2291 let owned = state.clone();
2292 let items =
2293 hydrate_record_stream::<Pull<DefaultStr>>(&state, nsid, page.items, HitProvenance::Indexed)
2294 .map(move |view| view.map(|v| enrich_pull_view(&owned, v)));
2295 Ok(json_stream::<StatefulItem<Pull<DefaultStr>>, _>(
2296 "items",
2297 items,
2298 paged_tail(page.next.map(PageToken::encode_token)),
2299 permit,
2300 ))
2301}
2302async fn count_pulls_by(
2303 State(state): State<AppState>,
2304 XrpcQuery(q): XrpcQuery<CountQuery>,
2305) -> Result<Json<CountResponse>, XrpcError> {
2306 count_mirror::<PullBy>(&state, q).map(Json)
2307}
2308
2309async fn list_pull_statuses_by(
2310 State(state): State<AppState>,
2311 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2312) -> Result<Response, XrpcError> {
2313 list_mirror::<PullStatusBy, PullStatus<DefaultStr>, _>(&state, q).await
2314}
2315async fn count_pull_statuses_by(
2316 State(state): State<AppState>,
2317 XrpcQuery(q): XrpcQuery<CountQuery>,
2318) -> Result<Json<CountResponse>, XrpcError> {
2319 count_mirror::<PullStatusBy>(&state, q).map(Json)
2320}
2321
2322async fn list_spindle_members_by(
2323 State(state): State<AppState>,
2324 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2325) -> Result<Response, XrpcError> {
2326 list_mirror::<SpindleMemberBy, SpindleMember<DefaultStr>, _>(&state, q).await
2327}
2328async fn count_spindle_members_by(
2329 State(state): State<AppState>,
2330 XrpcQuery(q): XrpcQuery<CountQuery>,
2331) -> Result<Json<CountResponse>, XrpcError> {
2332 count_mirror::<SpindleMemberBy>(&state, q).map(Json)
2333}
2334
2335async fn list_label_definitions(
2336 State(state): State<AppState>,
2337 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2338) -> Result<Response, XrpcError> {
2339 list_records::<LabelDefinitionRecord, LabelDefinition<DefaultStr>, _>(&state, q).await
2340}
2341
2342async fn count_label_definitions(
2343 State(state): State<AppState>,
2344 XrpcQuery(q): XrpcQuery<CountQuery>,
2345) -> Result<Json<CountResponse>, XrpcError> {
2346 count_for::<LabelDefinitionRecord>(&state, q).map(Json)
2347}
2348
2349async fn list_label_ops(
2350 State(state): State<AppState>,
2351 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2352) -> Result<Response, XrpcError> {
2353 list_records::<LabelOpRecord, LabelOp<DefaultStr>, _>(&state, q).await
2354}
2355
2356async fn count_label_ops(
2357 State(state): State<AppState>,
2358 XrpcQuery(q): XrpcQuery<CountQuery>,
2359) -> Result<Json<CountResponse>, XrpcError> {
2360 count_for::<LabelOpRecord>(&state, q).map(Json)
2361}
2362
2363async fn list_pipelines(
2364 State(state): State<AppState>,
2365 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2366) -> Result<Response, XrpcError> {
2367 list_records::<PipelineRecord, Pipeline<DefaultStr>, _>(&state, q).await
2368}
2369
2370async fn count_pipelines(
2371 State(state): State<AppState>,
2372 XrpcQuery(q): XrpcQuery<CountQuery>,
2373) -> Result<Json<CountResponse>, XrpcError> {
2374 count_for::<PipelineRecord>(&state, q).map(Json)
2375}
2376
2377async fn list_pipeline_statuses(
2378 State(state): State<AppState>,
2379 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2380) -> Result<Response, XrpcError> {
2381 list_records::<PipelineStatusRecord, PipelineStatus<DefaultStr>, _>(&state, q).await
2382}
2383
2384async fn count_pipeline_statuses(
2385 State(state): State<AppState>,
2386 XrpcQuery(q): XrpcQuery<CountQuery>,
2387) -> Result<Json<CountResponse>, XrpcError> {
2388 count_for::<PipelineStatusRecord>(&state, q).map(Json)
2389}
2390
2391async fn list_artifacts(
2392 State(state): State<AppState>,
2393 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2394) -> Result<Response, XrpcError> {
2395 list_records::<ArtifactRecord, Artifact<DefaultStr>, _>(&state, q).await
2396}
2397
2398async fn count_artifacts(
2399 State(state): State<AppState>,
2400 XrpcQuery(q): XrpcQuery<CountQuery>,
2401) -> Result<Json<CountResponse>, XrpcError> {
2402 count_for::<ArtifactRecord>(&state, q).map(Json)
2403}
2404
2405async fn list_knot_members(
2406 State(state): State<AppState>,
2407 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2408) -> Result<Response, XrpcError> {
2409 list_records::<KnotMemberRecord, KnotMember<DefaultStr>, _>(&state, q).await
2410}
2411
2412async fn count_knot_members(
2413 State(state): State<AppState>,
2414 XrpcQuery(q): XrpcQuery<CountQuery>,
2415) -> Result<Json<CountResponse>, XrpcError> {
2416 count_for::<KnotMemberRecord>(&state, q).map(Json)
2417}
2418
2419async fn list_spindle_members(
2420 State(state): State<AppState>,
2421 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2422) -> Result<Response, XrpcError> {
2423 list_records::<SpindleMemberRecord, SpindleMember<DefaultStr>, _>(&state, q).await
2424}
2425
2426async fn count_spindle_members(
2427 State(state): State<AppState>,
2428 XrpcQuery(q): XrpcQuery<CountQuery>,
2429) -> Result<Json<CountResponse>, XrpcError> {
2430 count_for::<SpindleMemberRecord>(&state, q).map(Json)
2431}
2432
2433async fn list_strings(
2434 State(state): State<AppState>,
2435 XrpcQuery(q): XrpcQuery<TypedListQuery<NoFilter>>,
2436) -> Result<Response, XrpcError> {
2437 list_records::<TangledStringRecord, TangledString<DefaultStr>, _>(&state, q).await
2438}
2439
2440async fn count_strings(
2441 State(state): State<AppState>,
2442 XrpcQuery(q): XrpcQuery<CountQuery>,
2443) -> Result<Json<CountResponse>, XrpcError> {
2444 count_for::<TangledStringRecord>(&state, q).map(Json)
2445}
2446
/// Query parameters deserialized for the `resolve_mini_doc` handler.
#[derive(Deserialize)]
struct ResolveMiniDocParams {
    /// Identifier that the handler forwards to the slingshot client's
    /// `resolve_mini_doc` call.
    identifier: AtIdentifier<DefaultStr>,
}
2451
2452async fn resolve_mini_doc(
2453 State(state): State<AppState>,
2454 XrpcQuery(q): XrpcQuery<ResolveMiniDocParams>,
2455) -> Result<Response, XrpcError> {
2456 let body = state
2457 .slingshot
2458 .resolve_mini_doc(&q.identifier)
2459 .await
2460 .map_err(map_slingshot)?;
2461 Ok((StatusCode::OK, [(CONTENT_TYPE, "application/json")], body).into_response())
2462}
2463
2464async fn get_coverage(State(state): State<AppState>) -> Json<CoverageEnvelope> {
2465 Json(state.coverage.snapshot().into())
2466}
2467
2468async fn search_query(
2469 State(state): State<AppState>,
2470 XrpcQuery(q): XrpcQuery<SearchQueryParams>,
2471) -> Result<Response, XrpcError> {
2472 if q.q.trim().is_empty() {
2473 return Err(XrpcError::InvalidParams("q must not be empty".into()));
2474 }
2475 let cursor = SearchCursor::from_token(q.cursor.as_deref())
2476 .map_err(|e| XrpcError::InvalidParams(format!("cursor: {e}")))?;
2477 let limit = parse_limit(q.limit)?;
2478 let filters = build_search_filters(&q)?;
2479 let permit = state.heavy_permit()?;
2480 let page = state
2481 .search
2482 .search(&q.q, filters, cursor, limit.get())
2483 .await
2484 .map_err(map_search_err)?;
2485 let next = page.next.map(SearchOffset::encode_token);
2486 let owned = state.clone();
2487 let hits = hydrate_stream(page.hits, move |hit| {
2488 let owned = owned.clone();
2489 let uri = hit.uri.clone();
2490 let nsid = hit.nsid.clone();
2491 async move {
2492 let result = hydrate_search_hit(&owned, hit).await;
2493 drop_unhydratable(HitProvenance::Indexed, &nsid, &uri, result)
2494 }
2495 });
2496 Ok(json_stream::<SearchHitView, _>(
2497 "hits",
2498 hits,
2499 paged_tail(next),
2500 permit,
2501 ))
2502}
2503
2504fn build_search_filters(q: &SearchQueryParams) -> Result<SearchFilters, XrpcError> {
2505 let since = q
2506 .since
2507 .as_deref()
2508 .map(parse_rfc3339_seconds)
2509 .transpose()
2510 .map_err(|e| XrpcError::InvalidParams(format!("since: {e}")))?;
2511 let until = q
2512 .until
2513 .as_deref()
2514 .map(parse_rfc3339_seconds)
2515 .transpose()
2516 .map_err(|e| XrpcError::InvalidParams(format!("until: {e}")))?;
2517 if let (Some(s), Some(u)) = (since, until)
2518 && s > u
2519 {
2520 return Err(XrpcError::InvalidParams("since must be <= until".into()));
2521 }
2522 Ok(SearchFilters {
2523 nsid: q.nsid.clone(),
2524 author: q.author.clone(),
2525 repo: q.repo.clone(),
2526 since,
2527 until,
2528 })
2529}
2530
2531fn parse_rfc3339_seconds(raw: &str) -> Result<i64, String> {
2532 chrono::DateTime::parse_from_rfc3339(raw)
2533 .map(|dt| dt.timestamp())
2534 .map_err(|e| format!("expected RFC3339, got {raw}: {e}"))
2535}
2536
2537async fn hydrate_search_hit(
2538 state: &AppState,
2539 hit: SearchHit,
2540) -> Result<Option<SearchHitView>, XrpcError> {
2541 let SearchHit { uri, nsid, score } = hit;
2542 let body = resolve_for_view(state, &nsid, uri).await?;
2543 let record = decode_canon_or_upgrade(&nsid, &body.value, &state.resolver)
2544 .await
2545 .map_err(|err| XrpcError::InvalidRecord(err.to_string()))?;
2546 let Some(value) = SearchableRecord::try_from_record(record) else {
2547 return Ok(None);
2548 };
2549 let Some(value) = value.normalize(&state.resolver).await else {
2550 return Ok(None);
2551 };
2552 Ok(Some(SearchHitView {
2553 uri: body.uri.clone(),
2554 cid: Some(body.cid.clone()),
2555 nsid,
2556 score,
2557 value,
2558 }))
2559}
2560
2561fn map_search_err(err: SearchError) -> XrpcError {
2562 use SearchError as E;
2563 match err {
2564 E::Query(e) => XrpcError::InvalidParams(format!("query: {e}")),
2565 e @ (E::Tantivy(_)
2566 | E::InvalidUri(_)
2567 | E::InvalidNsid(_)
2568 | E::MissingField(_)
2569 | E::Cancelled(_)) => XrpcError::Internal(format!("search: {e}")),
2570 }
2571}
2572
2573fn map_proxy_error(err: KnotProxyError) -> XrpcError {
2574 match err {
2575 KnotProxyError::CircuitOpen => {
2576 XrpcError::UpstreamUnavailable("knot circuit breaker open".into())
2577 }
2578 KnotProxyError::BlockedHost { host, reason } => {
2579 XrpcError::InvalidRecord(format!("knot host {host} is {reason} address space"))
2580 }
2581 KnotProxyError::PlaintextHttp { host } => {
2582 XrpcError::InvalidRecord(format!("knot host {host} requires https"))
2583 }
2584 KnotProxyError::Connect(e) => XrpcError::UpstreamUnavailable(format!("connect: {e}")),
2585 KnotProxyError::Timeout(e) => {
2586 XrpcError::UpstreamUnavailable(format!("upstream timeout: {e}"))
2587 }
2588 KnotProxyError::Redirect(e) => XrpcError::UpstreamUnavailable(format!("redirect: {e}")),
2589 KnotProxyError::Transport(e) => XrpcError::UpstreamUnavailable(format!("transport: {e}")),
2590 KnotProxyError::Upstream(s) => XrpcError::UpstreamUnavailable(format!("status {s}")),
2591 }
2592}
2593
2594fn validate_client_supplied_knot(state: &AppState, host: &KnotHost) -> Result<(), XrpcError> {
2595 let host_str = || host.url().host_str().unwrap_or_default().to_owned();
2596 if state.knots.requires_https() && host.url().scheme() != "https" {
2597 return Err(XrpcError::InvalidParams(format!(
2598 "knot host {} must be https",
2599 host_str(),
2600 )));
2601 }
2602 if state.knots.allows_private_hosts() {
2603 return Ok(());
2604 }
2605 match host.private_literal_reason() {
2606 None => Ok(()),
2607 Some(reason) => Err(XrpcError::InvalidParams(format!(
2608 "knot host {} blocked: {} address space",
2609 host_str(),
2610 reason,
2611 ))),
2612 }
2613}
2614
2615async fn resolve_knot_target(
2616 state: &AppState,
2617 repo_uri: AtUri<DefaultStr>,
2618) -> Result<(KnotHost, RepoSlug), XrpcError> {
2619 let rkey: Option<Rkey<DefaultStr>> = repo_uri.rkey().map(|r| r.clone().into_static());
2620 let (body, did) = resolve(state, ExpectedNsid::from_static(RepoRecord::NSID), repo_uri).await?;
2621 let value: Repo<DefaultStr> = serde_json::from_slice(&body.value)
2622 .map_err(|e| XrpcError::InvalidRecord(format!("decode repo record: {e}")))?;
2623 let host = KnotHost::parse(value.knot.as_ref())
2624 .map_err(|e| XrpcError::InvalidRecord(format!("knot field: {e}")))?;
2625 let name = pick_human_slug(rkey.as_ref(), value.name.as_deref()).ok_or_else(|| {
2626 XrpcError::InvalidRecord("at-uri missing rkey and record missing name".to_string())
2627 })?;
2628 let slug = RepoSlug::new(&did, &name)
2629 .map_err(|e| XrpcError::InvalidRecord(format!("repo slug: {e}")))?;
2630 Ok((host, slug))
2631}
2632
2633fn pick_human_slug(rkey: Option<&Rkey<DefaultStr>>, name: Option<&str>) -> Option<String> {
2634 match rkey {
2635 Some(r) if jacquard_common::types::tid::Tid::new(r.as_ref()).is_ok() => {
2636 Some(name.unwrap_or(r.as_ref()).to_owned())
2637 }
2638 Some(r) => Some(r.as_ref().to_owned()),
2639 None => name.map(str::to_owned),
2640 }
2641}
2642
2643fn filter_request_headers(client: &HeaderMap) -> HeaderMap {
2644 FORWARDED_REQUEST_HEADERS
2645 .iter()
2646 .fold(HeaderMap::new(), |mut acc, name| {
2647 if let Some(value) = client.get(*name) {
2648 acc.insert((*name).clone(), value.clone());
2649 }
2650 acc
2651 })
2652}
2653
2654fn upstream_to_axum(resp: ProxyResponse) -> Response {
2655 let status = resp.status();
2656 let upstream_headers = resp.headers().clone();
2657 let body = Body::from_stream(resp.into_body_stream());
2658 let mut response = Response::builder()
2659 .status(status)
2660 .body(body)
2661 .expect("response body construction must succeed");
2662 let response_headers = response.headers_mut();
2663 PASSTHROUGH_HEADERS.iter().for_each(|name| {
2664 if let Some(value) = upstream_headers.get(*name) {
2665 response_headers.insert((*name).clone(), value.clone());
2666 }
2667 });
2668 response
2669}
2670
2671async fn dispatch_proxy(
2672 state: AppState,
2673 headers: HeaderMap,
2674 nsid: Nsid<DefaultStr>,
2675 host: KnotHost,
2676 params: ProxyParams,
2677) -> Result<Response, XrpcError> {
2678 let forward: Vec<(&str, &str)> = params
2679 .iter()
2680 .map(|(k, v)| (k.as_str(), v.as_str()))
2681 .collect();
2682 let allowed = filter_request_headers(&headers);
2683 let upstream = state
2684 .knots
2685 .forward(&host, &nsid, &forward, allowed)
2686 .await
2687 .map_err(map_proxy_error)?;
2688 Ok(upstream_to_axum(upstream))
2689}
2690
2691fn extract_param(
2692 params: ProxyParams,
2693 key: &str,
2694) -> Result<Option<(String, ProxyParams)>, XrpcError> {
2695 let (matching, rest): (ProxyParams, ProxyParams) =
2696 params.into_iter().partition(|(k, _)| k == key);
2697 match matching.as_slice() {
2698 [] => Ok(None),
2699 [_] => Ok(matching.into_iter().next().map(|(_, v)| (v, rest))),
2700 _ => Err(XrpcError::InvalidParams(format!(
2701 "{key} parameter must appear at most once, got {}",
2702 matching.len(),
2703 ))),
2704 }
2705}
2706
2707async fn proxy_repo_handler(
2708 state: AppState,
2709 headers: HeaderMap,
2710 params: ProxyParams,
2711 nsid: Nsid<DefaultStr>,
2712) -> Result<Response, XrpcError> {
2713 let (repo_raw, rest) = extract_param(params, REPO_PARAM)?
2714 .ok_or_else(|| XrpcError::InvalidParams("missing repo".into()))?;
2715 let repo_uri = parse_uri(&repo_raw)?;
2716 let (host, slug) = resolve_knot_target(&state, repo_uri).await?;
2717 let forward = rest
2718 .into_iter()
2719 .chain(std::iter::once((
2720 REPO_PARAM.to_owned(),
2721 slug.as_str().to_owned(),
2722 )))
2723 .collect();
2724 dispatch_proxy(state, headers, nsid, host, forward).await
2725}
2726
2727async fn proxy_knot_handler(
2728 state: AppState,
2729 headers: HeaderMap,
2730 params: ProxyParams,
2731 nsid: Nsid<DefaultStr>,
2732) -> Result<Response, XrpcError> {
2733 let (knot_raw, forward) = extract_param(params, KNOT_HOST_PARAM)?
2734 .ok_or_else(|| XrpcError::InvalidParams("missing knot".into()))?;
2735 let host =
2736 KnotHost::parse(&knot_raw).map_err(|e| XrpcError::InvalidParams(format!("knot: {e}")))?;
2737 validate_client_supplied_knot(&state, &host)?;
2738 dispatch_proxy(state, headers, nsid, host, forward).await
2739}