.github
constellation
src
bin
consumer
server
storage
templates
links
···
28
28
- name: get nightly toolchain for jetstream fmt
29
29
run: rustup toolchain install nightly --allow-downgrade -c rustfmt
30
30
- name: fmt
31
31
-
run: cargo fmt --package links --package constellation --package ufos --package spacedust --package who-am-i --package slingshot --package pocket -- --check
31
31
+
run: >-
32
32
+
cargo fmt
33
33
+
--package constellation
34
34
+
--package links
35
35
+
--package pocket
36
36
+
--package quasar
37
37
+
--package slingshot
38
38
+
--package spacedust
39
39
+
--package ufos
32
40
- name: fmt jetstream (nightly)
33
41
run: cargo +nightly fmt --package jetstream -- --check
34
42
- name: clippy
···
185
185
"nom",
186
186
"num-traits",
187
187
"rusticata-macros",
188
188
-
"thiserror 2.0.16",
188
188
+
"thiserror 2.0.18",
189
189
"time",
190
190
]
191
191
···
968
968
"serde_json",
969
969
"serde_with",
970
970
"tempfile",
971
971
+
"thiserror 2.0.18",
971
972
"tinyjson",
972
973
"tokio",
973
974
"tokio-util",
···
1345
1346
"slog-bunyan",
1346
1347
"slog-json",
1347
1348
"slog-term",
1348
1348
-
"thiserror 2.0.16",
1349
1349
+
"thiserror 2.0.18",
1349
1350
"tokio",
1350
1351
"tokio-rustls 0.25.0",
1351
1352
"toml",
···
2116
2117
"once_cell",
2117
2118
"rand 0.9.3",
2118
2119
"ring",
2119
2119
-
"thiserror 2.0.16",
2120
2120
+
"thiserror 2.0.18",
2120
2121
"tinyvec",
2121
2122
"tokio",
2122
2123
"tracing",
···
2139
2140
"rand 0.9.3",
2140
2141
"resolv-conf",
2141
2142
"smallvec",
2142
2142
-
"thiserror 2.0.16",
2143
2143
+
"thiserror 2.0.18",
2143
2144
"tokio",
2144
2145
"tracing",
2145
2146
]
···
2625
2626
"metrics",
2626
2627
"serde",
2627
2628
"serde_json",
2628
2628
-
"thiserror 2.0.16",
2629
2629
+
"thiserror 2.0.18",
2629
2630
"tokio",
2630
2631
"tokio-tungstenite 0.26.2",
2631
2632
"url",
···
2857
2858
name = "links"
2858
2859
version = "0.1.0"
2859
2860
dependencies = [
2860
2860
-
"anyhow",
2861
2861
"fluent-uri",
2862
2862
-
"nom",
2863
2863
-
"thiserror 2.0.16",
2862
2862
+
"thiserror 2.0.18",
2864
2863
"tinyjson",
2865
2864
]
2866
2865
···
3081
3080
"metrics",
3082
3081
"metrics-util 0.20.0",
3083
3082
"quanta",
3084
3084
-
"thiserror 2.0.16",
3083
3083
+
"thiserror 2.0.18",
3085
3084
"tokio",
3086
3085
"tracing",
3087
3086
]
···
3595
3594
"rusqlite",
3596
3595
"serde",
3597
3596
"serde_json",
3598
3598
-
"thiserror 2.0.16",
3597
3597
+
"thiserror 2.0.18",
3599
3598
"tokio",
3600
3599
"tracing-subscriber",
3601
3600
]
···
3638
3637
"smallvec",
3639
3638
"sync_wrapper",
3640
3639
"tempfile",
3641
3641
-
"thiserror 2.0.16",
3640
3640
+
"thiserror 2.0.18",
3642
3641
"tokio",
3643
3642
"tokio-rustls 0.26.2",
3644
3643
"tokio-stream",
···
3682
3681
"serde_json",
3683
3682
"serde_urlencoded",
3684
3683
"serde_yaml",
3685
3685
-
"thiserror 2.0.16",
3684
3684
+
"thiserror 2.0.18",
3686
3685
"tokio",
3687
3686
]
3688
3687
···
3701
3700
"quote",
3702
3701
"regex",
3703
3702
"syn",
3704
3704
-
"thiserror 2.0.16",
3703
3703
+
"thiserror 2.0.18",
3705
3704
]
3706
3705
3707
3706
[[package]]
···
3850
3849
"rustc-hash 2.1.1",
3851
3850
"rustls 0.23.31",
3852
3851
"socket2 0.5.9",
3853
3853
-
"thiserror 2.0.16",
3852
3852
+
"thiserror 2.0.18",
3854
3853
"tokio",
3855
3854
"tracing",
3856
3855
"web-time",
···
3871
3870
"rustls 0.23.31",
3872
3871
"rustls-pki-types",
3873
3872
"slab",
3874
3874
-
"thiserror 2.0.16",
3873
3873
+
"thiserror 2.0.18",
3875
3874
"tinyvec",
3876
3875
"tracing",
3877
3876
"web-time",
···
4586
4585
"percent-encoding",
4587
4586
"ryu",
4588
4587
"serde",
4589
4589
-
"thiserror 2.0.16",
4588
4588
+
"thiserror 2.0.18",
4590
4589
]
4591
4590
4592
4591
[[package]]
···
4756
4755
"rustls 0.23.31",
4757
4756
"serde",
4758
4757
"serde_json",
4759
4759
-
"thiserror 2.0.16",
4758
4758
+
"thiserror 2.0.18",
4760
4759
"time",
4761
4760
"tokio",
4762
4761
"tokio-util",
···
4873
4872
"serde",
4874
4873
"serde_json",
4875
4874
"serde_qs",
4876
4876
-
"thiserror 2.0.16",
4875
4875
+
"thiserror 2.0.18",
4877
4876
"tinyjson",
4878
4877
"tokio",
4879
4878
"tokio-tungstenite 0.27.0",
···
5022
5021
5023
5022
[[package]]
5024
5023
name = "thiserror"
5025
5025
-
version = "2.0.16"
5024
5024
+
version = "2.0.18"
5026
5025
source = "registry+https://github.com/rust-lang/crates.io-index"
5027
5027
-
checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0"
5026
5026
+
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
5028
5027
dependencies = [
5029
5029
-
"thiserror-impl 2.0.16",
5028
5028
+
"thiserror-impl 2.0.18",
5030
5029
]
5031
5030
5032
5031
[[package]]
···
5042
5041
5043
5042
[[package]]
5044
5043
name = "thiserror-impl"
5045
5045
-
version = "2.0.16"
5044
5044
+
version = "2.0.18"
5046
5045
source = "registry+https://github.com/rust-lang/crates.io-index"
5047
5047
-
checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960"
5046
5046
+
checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
5048
5047
dependencies = [
5049
5048
"proc-macro2",
5050
5049
"quote",
···
5452
5451
"native-tls",
5453
5452
"rand 0.9.3",
5454
5453
"sha1",
5455
5455
-
"thiserror 2.0.16",
5454
5454
+
"thiserror 2.0.18",
5456
5455
"url",
5457
5456
"utf-8",
5458
5457
]
···
5470
5469
"log",
5471
5470
"rand 0.9.3",
5472
5471
"sha1",
5473
5473
-
"thiserror 2.0.16",
5472
5472
+
"thiserror 2.0.18",
5474
5473
"utf-8",
5475
5474
]
5476
5475
···
5517
5516
"serde_qs",
5518
5517
"sha2",
5519
5518
"tempfile",
5520
5520
-
"thiserror 2.0.16",
5519
5519
+
"thiserror 2.0.18",
5521
5520
"tikv-jemallocator",
5522
5521
"tokio",
5523
5522
"tokio-util",
···
6159
6158
"nom",
6160
6159
"oid-registry",
6161
6160
"rusticata-macros",
6162
6162
-
"thiserror 2.0.16",
6161
6161
+
"thiserror 2.0.18",
6163
6162
"time",
6164
6163
]
6165
6164
···
14
14
15
15
[workspace.dependencies]
16
16
clap = { version = "4.5.56", features = ["derive", "env"] }
17
17
+
thiserror = "2.0.18"
···
27
27
serde = { version = "1.0.215", features = ["derive"] }
28
28
serde_json = "1.0.139"
29
29
serde_with = { version = "3.12.0", features = ["hex"] }
30
30
+
thiserror = { workspace = true }
30
31
tinyjson = "2.5.1"
31
32
tokio-util = "0.7.13"
32
33
tower-http = { version = "0.6.2", features = ["cors"] }
···
60
60
/// Saved jsonl from jetstream to use instead of a live subscription
61
61
#[arg(short, long)]
62
62
fixture: Option<PathBuf>,
63
63
+
/// Don't change the database jetstream cursor when using a fixture
64
64
+
#[arg(long, requires("fixture"))]
65
65
+
fixture_preserve_cursor: bool,
63
66
/// run a scan across the target id table and write all key -> ids to id -> keys
64
67
#[arg(long, action)]
65
68
repair_target_ids: bool,
···
88
91
println!("starting with storage backend: {:?}...", args.backend);
89
92
90
93
let fixture = args.fixture;
94
94
+
let fixture_preserve_cursor = args.fixture_preserve_cursor;
91
95
if let Some(ref p) = fixture {
92
92
-
println!("using fixture at {p:?}...");
96
96
+
println!("using fixture at {p:?}, preserving cursor? {fixture_preserve_cursor:?}...");
93
97
}
94
98
95
99
let stream = jetstream_url(&args.jetstream);
···
105
109
StorageBackend::Memory => run(
106
110
MemStorage::new(),
107
111
fixture,
112
112
+
fixture_preserve_cursor,
108
113
None,
109
114
args.did_web_domain,
110
115
stream,
···
141
146
let r = run(
142
147
rocks,
143
148
fixture,
149
149
+
fixture_preserve_cursor,
144
150
args.data,
145
151
args.did_web_domain,
146
152
stream,
···
163
169
fn run(
164
170
mut storage: impl LinkStorage,
165
171
fixture: Option<PathBuf>,
172
172
+
fixture_preserve_cursor: bool,
166
173
data_dir: Option<PathBuf>,
167
174
did_web_domain: Option<String>,
168
175
stream: String,
···
194
201
let stay_alive = stay_alive.clone();
195
202
let staying_alive = stay_alive.clone();
196
203
move || {
197
197
-
if let Err(e) = consume(storage, qsize, fixture, stream, staying_alive) {
204
204
+
if let Err(e) = consume(
205
205
+
storage,
206
206
+
qsize,
207
207
+
fixture,
208
208
+
fixture_preserve_cursor,
209
209
+
stream,
210
210
+
staying_alive,
211
211
+
) {
198
212
eprintln!("jetstream finished with error: {e}");
199
213
}
200
214
stay_alive.drop_guard();
···
6
6
use anyhow::Result;
7
7
use jetstream::consume_jetstream;
8
8
use jsonl_file::consume_jsonl_file;
9
9
-
use links::collect_links;
9
9
+
use links::{parse_any_link, record::walk_record, CollectedLink};
10
10
use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
11
11
use std::path::PathBuf;
12
12
use std::sync::atomic::{AtomicU32, Ordering};
···
19
19
mut store: impl LinkStorage,
20
20
qsize: Arc<AtomicU32>,
21
21
fixture: Option<PathBuf>,
22
22
+
fixture_preserve_cursor: bool,
22
23
stream: String,
23
24
staying_alive: CancellationToken,
24
25
) -> Result<()> {
···
43
44
"number of links per message"
44
45
);
45
46
47
47
+
let mut fixture_cursor = None;
46
48
let (receiver, consumer_handle) = if let Some(f) = fixture {
47
49
let (sender, receiver) = flume::bounded(21);
50
50
+
if fixture_preserve_cursor {
51
51
+
fixture_cursor = store.get_cursor()?;
52
52
+
if fixture_cursor.is_none() {
53
53
+
anyhow::bail!(
54
54
+
"--fixture-preserve-cursor was set but the database has no \
55
55
+
existing cursor to preserve. either drop the flag (cursor \
56
56
+
will be set to the last event in the fixture, current default \
57
57
+
behavior) or run a live jetstream session first."
58
58
+
)
59
59
+
}
60
60
+
}
48
61
(
49
62
receiver,
50
63
thread::spawn(move || consume_jsonl_file(f, sender)),
51
64
)
52
65
} else {
53
53
-
let (sender, receiver) = flume::bounded(32_768); // eek
66
66
+
let (sender, receiver) = flume::bounded(1024);
54
67
let cursor = store.get_cursor().unwrap();
55
68
(
56
69
receiver,
···
61
74
for update in receiver.iter() {
62
75
if let Some((action, ts)) = get_actionable(&update) {
63
76
{
64
64
-
store.push(&action, ts).unwrap();
77
77
+
store.push(&action, fixture_cursor.unwrap_or(ts)).unwrap();
65
78
qsize.store(receiver.len().try_into().unwrap(), Ordering::Relaxed);
66
79
}
67
80
} else {
···
99
112
};
100
113
match commit.get("operation")? {
101
114
JsonValue::String(op) if op == "create" => {
102
102
-
let links = collect_links(commit.get("record")?);
115
115
+
let mut links = vec![];
116
116
+
// 1. extract links (dids probably) from rkey, if there
117
117
+
if let Some(target) = parse_any_link(rkey) {
118
118
+
links.push(CollectedLink {
119
119
+
path: ".".into(),
120
120
+
target,
121
121
+
});
122
122
+
}
123
123
+
// 2. and from the record body
124
124
+
walk_record("", commit.get("record")?, &mut links);
125
125
+
103
126
counter!("consumer_events_actionable", "action_type" => "create_links", "collection" => collection.clone()).increment(1);
104
127
histogram!("consumer_events_actionable_links", "action_type" => "create_links", "collection" => collection.clone()).record(links.len() as f64);
105
128
for link in &links {
···
128
151
}
129
152
}
130
153
JsonValue::String(op) if op == "update" => {
131
131
-
let links = collect_links(commit.get("record")?);
154
154
+
let mut links = vec![];
155
155
+
// 1. extract links (dids probably) from rkey, if there
156
156
+
if let Some(target) = parse_any_link(rkey) {
157
157
+
links.push(CollectedLink {
158
158
+
path: ".".into(),
159
159
+
target,
160
160
+
});
161
161
+
}
162
162
+
// 2. and from the record body
163
163
+
walk_record("", commit.get("record")?, &mut links);
164
164
+
132
165
counter!("consumer_events_actionable", "action_type" => "update_links", "collection" => collection.clone()).increment(1);
133
166
histogram!("consumer_events_actionable_links", "action_type" => "update_links", "collection" => collection.clone()).record(links.len() as f64);
134
167
for link in &links {
···
334
367
Some((
335
368
ActionableEvent::DeactivateAccount("did:plc:l4jb3hkq7lrblferbywxkiol".into()),
336
369
1736451745611273
370
370
+
))
371
371
+
)
372
372
+
}
373
373
+
374
374
+
#[test]
375
375
+
fn test_create_vouch_indexes_did_rkey() {
376
376
+
let rec = r#"{
377
377
+
"did":"did:plc:voucher",
378
378
+
"time_us":1746460800000000,
379
379
+
"kind":"commit",
380
380
+
"commit":{"rev":"3lqrvouchcreate","operation":"create","collection":"sh.tangled.graph.vouch","rkey":"did:plc:vouchedfor","record":{
381
381
+
"$type":"sh.tangled.graph.vouch",
382
382
+
"createdAt":"2026-05-05T12:00:00.000Z"
383
383
+
}}
384
384
+
}"#.parse().unwrap();
385
385
+
let action = get_actionable(&rec);
386
386
+
assert_eq!(
387
387
+
action,
388
388
+
Some((
389
389
+
ActionableEvent::CreateLinks {
390
390
+
record_id: RecordId {
391
391
+
did: "did:plc:voucher".into(),
392
392
+
collection: "sh.tangled.graph.vouch".into(),
393
393
+
rkey: "did:plc:vouchedfor".into(),
394
394
+
},
395
395
+
links: vec![CollectedLink {
396
396
+
path: ".".into(),
397
397
+
target: Link::Did("did:plc:vouchedfor".into()),
398
398
+
}],
399
399
+
},
400
400
+
1746460800000000
401
401
+
))
402
402
+
)
403
403
+
}
404
404
+
405
405
+
#[test]
406
406
+
fn test_update_vouch_indexes_did_rkey() {
407
407
+
let rec = r#"{
408
408
+
"did":"did:plc:voucher",
409
409
+
"time_us":1746460800000001,
410
410
+
"kind":"commit",
411
411
+
"commit":{"rev":"3lqrvouchupdate","operation":"update","collection":"sh.tangled.graph.vouch","rkey":"did:plc:vouchedfor","record":{
412
412
+
"$type":"sh.tangled.graph.vouch",
413
413
+
"createdAt":"2026-05-05T12:00:00.000Z",
414
414
+
"reason":"https://atproto.com"
415
415
+
}}
416
416
+
}"#.parse().unwrap();
417
417
+
let action = get_actionable(&rec);
418
418
+
assert_eq!(
419
419
+
action,
420
420
+
Some((
421
421
+
ActionableEvent::UpdateLinks {
422
422
+
record_id: RecordId {
423
423
+
did: "did:plc:voucher".into(),
424
424
+
collection: "sh.tangled.graph.vouch".into(),
425
425
+
rkey: "did:plc:vouchedfor".into(),
426
426
+
},
427
427
+
new_links: vec![
428
428
+
CollectedLink {
429
429
+
path: ".".into(),
430
430
+
target: Link::Did("did:plc:vouchedfor".into()),
431
431
+
},
432
432
+
CollectedLink {
433
433
+
path: ".reason".into(),
434
434
+
target: Link::Uri("https://atproto.com".into()),
435
435
+
},
436
436
+
],
437
437
+
},
438
438
+
1746460800000001
337
439
))
338
440
)
339
441
}
···
1
1
+
#[derive(Debug, thiserror::Error, PartialEq)]
2
2
+
pub enum LinkSourceError {
3
3
+
#[error("Collection-path separator `:` is required")]
4
4
+
MissingSeparator,
5
5
+
#[error("Collection before `:` is required")]
6
6
+
MissingCollection,
7
7
+
#[error("Record path or `.` after `:'`is required")]
8
8
+
MissingPath,
9
9
+
#[error("Leading dot in path is not allowed")]
10
10
+
LeadingPathDot,
11
11
+
}
12
12
+
13
13
+
/// parse a record path (or rkey sentinel)
14
14
+
pub fn parse_path(input: &str) -> Result<String, LinkSourceError> {
15
15
+
match input {
16
16
+
"" => Err(LinkSourceError::MissingPath),
17
17
+
"." => Ok(".".to_string()),
18
18
+
p if p.starts_with('.') => Err(LinkSourceError::LeadingPathDot),
19
19
+
p => Ok(format!(".{p}")),
20
20
+
}
21
21
+
}
22
22
+
23
23
+
/// hacky version that will eventually be from a nicer library
24
24
+
///
25
25
+
/// syntax: `<NSID>:<RecordPath OR '.'>`
26
26
+
///
27
27
+
/// right now `NSID` is not validated, but it will be soon.
28
28
+
///
29
29
+
/// right now `RecordPath` is just a string, but it will be replaced by
30
30
+
/// tangled.org/microcosm.blue/RecordPath soon.
31
31
+
///
32
32
+
/// the special `.` path refers to the rkey instead of record contents
33
33
+
///
34
34
+
/// returns: (collection NISD, path), where `path` is always .-prefixed
35
35
+
pub fn parse_link_source(input: &str) -> Result<(String, String), LinkSourceError> {
36
36
+
let (collection, path) = input
37
37
+
.split_once(':')
38
38
+
.ok_or(LinkSourceError::MissingSeparator)?;
39
39
+
40
40
+
if collection.is_empty() {
41
41
+
return Err(LinkSourceError::MissingCollection);
42
42
+
}
43
43
+
44
44
+
let path = parse_path(path)?;
45
45
+
46
46
+
Ok((collection.to_string(), path))
47
47
+
}
48
48
+
49
49
+
#[cfg(test)]
50
50
+
mod tests {
51
51
+
use super::*;
52
52
+
53
53
+
#[test]
54
54
+
fn test_parse_link_source() {
55
55
+
for (case, expected) in [
56
56
+
("", Err(LinkSourceError::MissingSeparator)),
57
57
+
("a", Err(LinkSourceError::MissingSeparator)),
58
58
+
(":", Err(LinkSourceError::MissingCollection)),
59
59
+
("a:", Err(LinkSourceError::MissingPath)),
60
60
+
(":a", Err(LinkSourceError::MissingCollection)),
61
61
+
("a:b", Ok(("a".to_string(), ".b".to_string()))),
62
62
+
(
63
63
+
"app.bsky.feed.like:subject",
64
64
+
Ok(("app.bsky.feed.like".to_string(), ".subject".to_string())),
65
65
+
),
66
66
+
("a:.", Ok(("a".to_string(), ".".to_string()))),
67
67
+
(
68
68
+
"app.bsky.feed.like:subject.uri",
69
69
+
Ok(("app.bsky.feed.like".to_string(), ".subject.uri".to_string())),
70
70
+
),
71
71
+
(
72
72
+
"a:items[].uri",
73
73
+
Ok(("a".to_string(), ".items[].uri".to_string())),
74
74
+
),
75
75
+
("a:b:c", Ok(("a".to_string(), ".b:c".to_string()))),
76
76
+
("a:.foo", Err(LinkSourceError::LeadingPathDot)),
77
77
+
] {
78
78
+
let res = parse_link_source(case);
79
79
+
assert_eq!(res, expected);
80
80
+
}
81
81
+
}
82
82
+
83
83
+
#[test]
84
84
+
fn test_parse_path() {
85
85
+
for (case, expected) in [
86
86
+
("", Err(LinkSourceError::MissingPath)),
87
87
+
(".", Ok(".".to_string())),
88
88
+
("foo", Ok(".foo".to_string())),
89
89
+
("foo.bar", Ok(".foo.bar".to_string())),
90
90
+
("items[].uri", Ok(".items[].uri".to_string())),
91
91
+
(".foo", Err(LinkSourceError::LeadingPathDot)),
92
92
+
("..", Err(LinkSourceError::LeadingPathDot)),
93
93
+
] {
94
94
+
let res = parse_path(case);
95
95
+
assert_eq!(res, expected);
96
96
+
}
97
97
+
}
98
98
+
}
···
22
22
23
23
mod acceptable;
24
24
mod filters;
25
25
+
mod link_source;
25
26
26
27
use acceptable::{acceptable, ExtractAccept};
28
28
+
use link_source::{parse_link_source, parse_path};
27
29
28
30
const DEFAULT_CURSOR_LIMIT: u64 = 100;
29
31
const DEFAULT_CURSOR_LIMIT_MAX: u64 = 1000;
···
353
355
.filter(|s| !s.is_empty()),
354
356
);
355
357
356
356
-
let Some((collection, path)) = query.source.split_once(':') else {
357
357
-
return Err(http::StatusCode::BAD_REQUEST);
358
358
-
};
359
359
-
let path = format!(".{path}");
358
358
+
let (collection, path) =
359
359
+
parse_link_source(&query.source).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
360
360
361
361
-
let path_to_other = format!(".{}", query.path_to_other);
361
361
+
let path_to_other =
362
362
+
parse_path(&query.path_to_other).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
362
363
363
364
let paged = store
364
365
.get_many_to_many_counts(
365
366
&query.subject,
366
366
-
collection,
367
367
+
&collection,
367
368
&path,
368
369
&path_to_other,
369
370
limit,
···
442
443
query: axum_extra::extract::Query<GetItemsCountQuery>,
443
444
store: impl LinkReader,
444
445
) -> Result<impl IntoResponse, http::StatusCode> {
445
445
-
let Some((collection, path)) = query.source.split_once(':') else {
446
446
-
return Err(http::StatusCode::BAD_REQUEST);
447
447
-
};
448
448
-
let path = format!(".{path}");
446
446
+
let (collection, path) =
447
447
+
parse_link_source(&query.source).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
449
448
let total = store
450
450
-
.get_count(&query.subject, collection, &path)
449
449
+
.get_count(&query.subject, &collection, &path)
451
450
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
452
451
453
452
Ok(acceptable(
···
551
550
.map(|d| Did(d.to_string())),
552
551
);
553
552
554
554
-
let Some((collection, path)) = query.source.split_once(':') else {
555
555
-
return Err(http::StatusCode::BAD_REQUEST);
556
556
-
};
557
557
-
let path = format!(".{path}");
553
553
+
let (collection, path) =
554
554
+
parse_link_source(&query.source).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
558
555
559
556
let order = if query.reverse {
560
557
Order::OldestToNewest
···
565
562
let paged = store
566
563
.get_links(
567
564
&query.subject,
568
568
-
collection,
565
565
+
&collection,
569
566
&path,
570
567
order,
571
568
limit,
···
755
752
.filter(|s| !s.is_empty()),
756
753
);
757
754
758
758
-
let Some((collection, path)) = query.source.split_once(':') else {
759
759
-
return Err(http::StatusCode::BAD_REQUEST);
760
760
-
};
761
761
-
let path = format!(".{path}");
755
755
+
let (collection, path) =
756
756
+
parse_link_source(&query.source).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
762
757
763
763
-
let path_to_other = format!(".{}", query.path_to_other);
758
758
+
let path_to_other =
759
759
+
parse_path(&query.path_to_other).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
764
760
765
761
let paged = store
766
762
.get_many_to_many(
767
763
&query.subject,
768
768
-
collection,
764
764
+
&collection,
769
765
&path,
770
766
&path_to_other,
771
767
limit,
···
824
820
return Err(http::StatusCode::BAD_REQUEST);
825
821
}
826
822
827
827
-
let Some((collection, path)) = query.source.split_once(':') else {
828
828
-
return Err(http::StatusCode::BAD_REQUEST);
829
829
-
};
830
830
-
let path = format!(".{path}");
823
823
+
let (collection, path) =
824
824
+
parse_link_source(&query.source).map_err(|_| http::StatusCode::BAD_REQUEST)?; // TODO: better response errors!
831
825
832
826
let paged = store
833
833
-
.get_distinct_dids(&query.subject, collection, &path, limit, until)
827
827
+
.get_distinct_dids(&query.subject, &collection, &path, limit, until)
834
828
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
835
829
836
830
let cursor = paged.next.map(|next| {
···
1499
1499
assert_stats(storage.get_stats()?, 1..=1, 2..=2, 1..=1);
1500
1500
});
1501
1501
1502
1502
+
//////// rkey-indexed (path = ".") /////////
1503
1503
+
1504
1504
+
test_each_storage!(rkey_indexed_basic, |storage| {
1505
1505
+
storage.push(
1506
1506
+
&ActionableEvent::CreateLinks {
1507
1507
+
record_id: RecordId {
1508
1508
+
did: "did:plc:voucher".into(),
1509
1509
+
collection: "sh.tangled.graph.vouch".into(),
1510
1510
+
rkey: "did:plc:vouchedfor".into(),
1511
1511
+
},
1512
1512
+
links: vec![CollectedLink {
1513
1513
+
target: Link::Did("did:plc:vouchedfor".into()),
1514
1514
+
path: ".".into(),
1515
1515
+
}],
1516
1516
+
},
1517
1517
+
0,
1518
1518
+
)?;
1519
1519
+
1520
1520
+
assert_eq!(
1521
1521
+
storage.get_count("did:plc:vouchedfor", "sh.tangled.graph.vouch", ".")?,
1522
1522
+
1
1523
1523
+
);
1524
1524
+
assert_eq!(
1525
1525
+
storage.get_distinct_did_count("did:plc:vouchedfor", "sh.tangled.graph.vouch", ".")?,
1526
1526
+
1
1527
1527
+
);
1528
1528
+
assert_eq!(
1529
1529
+
storage.get_links(
1530
1530
+
"did:plc:vouchedfor",
1531
1531
+
"sh.tangled.graph.vouch",
1532
1532
+
".",
1533
1533
+
Order::NewestToOldest,
1534
1534
+
100,
1535
1535
+
None,
1536
1536
+
&HashSet::default(),
1537
1537
+
)?,
1538
1538
+
PagedAppendingCollection {
1539
1539
+
version: (1, 0),
1540
1540
+
items: vec![RecordId {
1541
1541
+
did: "did:plc:voucher".into(),
1542
1542
+
collection: "sh.tangled.graph.vouch".into(),
1543
1543
+
rkey: "did:plc:vouchedfor".into(),
1544
1544
+
}],
1545
1545
+
next: None,
1546
1546
+
total: 1,
1547
1547
+
}
1548
1548
+
);
1549
1549
+
assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1);
1550
1550
+
1551
1551
+
storage.push(
1552
1552
+
&ActionableEvent::DeleteRecord(RecordId {
1553
1553
+
did: "did:plc:voucher".into(),
1554
1554
+
collection: "sh.tangled.graph.vouch".into(),
1555
1555
+
rkey: "did:plc:vouchedfor".into(),
1556
1556
+
}),
1557
1557
+
0,
1558
1558
+
)?;
1559
1559
+
assert_eq!(
1560
1560
+
storage.get_count("did:plc:vouchedfor", "sh.tangled.graph.vouch", ".")?,
1561
1561
+
0
1562
1562
+
);
1563
1563
+
});
1564
1564
+
1565
1565
+
test_each_storage!(rkey_link_and_record_link_coexist, |storage| {
1566
1566
+
storage.push(
1567
1567
+
&ActionableEvent::CreateLinks {
1568
1568
+
record_id: RecordId {
1569
1569
+
did: "did:plc:voucher".into(),
1570
1570
+
collection: "sh.tangled.graph.vouch".into(),
1571
1571
+
rkey: "did:plc:vouchedfor".into(),
1572
1572
+
},
1573
1573
+
links: vec![
1574
1574
+
CollectedLink {
1575
1575
+
target: Link::Did("did:plc:vouchedfor".into()),
1576
1576
+
path: ".".into(),
1577
1577
+
},
1578
1578
+
CollectedLink {
1579
1579
+
target: Link::Uri("https://atproto.com".into()),
1580
1580
+
path: ".reason".into(),
1581
1581
+
},
1582
1582
+
],
1583
1583
+
},
1584
1584
+
0,
1585
1585
+
)?;
1586
1586
+
1587
1587
+
assert_eq!(
1588
1588
+
storage.get_count("did:plc:vouchedfor", "sh.tangled.graph.vouch", ".")?,
1589
1589
+
1
1590
1590
+
);
1591
1591
+
assert_eq!(
1592
1592
+
storage.get_count("https://atproto.com", "sh.tangled.graph.vouch", ".reason")?,
1593
1593
+
1
1594
1594
+
);
1595
1595
+
1596
1596
+
assert_eq!(storage.get_all_record_counts("did:plc:vouchedfor")?, {
1597
1597
+
let mut counts = HashMap::new();
1598
1598
+
let mut by_path = HashMap::new();
1599
1599
+
by_path.insert(".".into(), 1);
1600
1600
+
counts.insert("sh.tangled.graph.vouch".into(), by_path);
1601
1601
+
counts
1602
1602
+
});
1603
1603
+
assert_eq!(storage.get_all_record_counts("https://atproto.com")?, {
1604
1604
+
let mut counts = HashMap::new();
1605
1605
+
let mut by_path = HashMap::new();
1606
1606
+
by_path.insert(".reason".into(), 1);
1607
1607
+
counts.insert("sh.tangled.graph.vouch".into(), by_path);
1608
1608
+
counts
1609
1609
+
});
1610
1610
+
1611
1611
+
storage.push(
1612
1612
+
&ActionableEvent::DeleteRecord(RecordId {
1613
1613
+
did: "did:plc:voucher".into(),
1614
1614
+
collection: "sh.tangled.graph.vouch".into(),
1615
1615
+
rkey: "did:plc:vouchedfor".into(),
1616
1616
+
}),
1617
1617
+
0,
1618
1618
+
)?;
1619
1619
+
assert_eq!(
1620
1620
+
storage.get_count("did:plc:vouchedfor", "sh.tangled.graph.vouch", ".")?,
1621
1621
+
0
1622
1622
+
);
1623
1623
+
assert_eq!(
1624
1624
+
storage.get_count("https://atproto.com", "sh.tangled.graph.vouch", ".reason")?,
1625
1625
+
0
1626
1626
+
);
1627
1627
+
});
1628
1628
+
1502
1629
//////// many-to-many /////////
1503
1630
1504
1631
test_each_storage!(get_m2m_counts_empty, |storage| {
···
243
243
244
244
<p>A DID like <code>did:plc:hdhoaan3xa3jiuq4fg4mefid</code>, or an AT-URI like <code>at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3lgu4lg6j2k2v</code>, or a URI like <code>https://example.com</code>.</p>
245
245
246
246
+
<h3>Source</h3>
247
247
+
248
248
+
<p>A <em>link source</em>, made of a collection and path, like <code>app.bsky.feed.like:subject<code>. The <code>:</code> separates them.
249
249
+
246
250
<h3>Collection</h3>
247
251
248
252
<p>A record NSID like <code>app.bsky.feed.like</code>.</p>
249
253
250
254
<h3>Path</h3>
251
255
252
252
-
<p>A (currently-very-very-hacky) json-path-ish representation of the source of a link in a record. Records may contain multiple links with different meanings, so this specifies which specific link is of interest. Like <code>.subject.uri</code>.</p>
256
256
+
<p>A (currently-hacky) json-path-ish representation of the source of a link in a record. Records may contain multiple links with different meanings, so this specifies which specific link is of interest. Like <code>.subject.uri</code>.</p>
257
257
+
258
258
+
<p>A special path, <code>.</code>, represents <em>the record's <code>rkey</code></em>. Tangled's vouch system puts the vouch subject in the <code>rkey</code> instead of inside the actual record. Its link source looks like this: <code>sh.tangled.graph.vouch:.</code></p>
253
259
254
260
<h3>Cursor</h3>
255
261
···
4
4
edition = "2021"
5
5
6
6
[dependencies]
7
7
-
anyhow = "1.0.95"
8
7
fluent-uri = "0.3.2"
9
9
-
nom = "7.1.3"
10
8
thiserror = "2.0.9"
11
9
tinyjson = "2.5.1"
···
7
7
// for now, just working through the rules laid out in the docs in order,
8
8
// without much regard for efficiency for now.
9
9
10
10
+
// newer specs say max 2048 chars
11
11
+
if s.len() > 2048 {
12
12
+
return None;
13
13
+
}
14
14
+
10
15
// The entire URI is made up of a subset of ASCII, containing letters (A-Z, a-z),
11
16
// digits (0-9), period, underscore, colon, percent sign, or hyphen (._:%-)
12
17
if !s
···
68
73
#[cfg(test)]
69
74
mod tests {
70
75
use super::*;
76
76
+
77
77
+
#[test]
78
78
+
fn test_did_too_long() {
79
79
+
let long = concat!(
80
80
+
"did:long:zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
81
81
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
82
82
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
83
83
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
84
84
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
85
85
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
86
86
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
87
87
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
88
88
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
89
89
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
90
90
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
91
91
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
92
92
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
93
93
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
94
94
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
95
95
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
96
96
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
97
97
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
98
98
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
99
99
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
100
100
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
101
101
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
102
102
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
103
103
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
104
104
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
105
105
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
106
106
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
107
107
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
108
108
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
109
109
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
110
110
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
111
111
+
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
112
112
+
);
113
113
+
assert_eq!(parse_did(long), None);
114
114
+
}
71
115
72
116
#[test]
73
117
fn test_did_parse() {
···
120
120
assert_eq!(
121
121
parse_any_link("did:plc:44ybard66vv44zksje25o7dz"),
122
122
Some(Link::Did("did:plc:44ybard66vv44zksje25o7dz".into()))
123
123
-
)
123
123
+
);
124
124
+
125
125
+
assert_eq!(
126
126
+
parse_any_link("tel:5551234567"),
127
127
+
Some(Link::Uri("tel:5551234567".into())),
128
128
+
);
129
129
+
130
130
+
assert_eq!(parse_any_link("3jwdwj2ctlk26"), None);
131
131
+
assert_eq!(parse_any_link("self"), None);
132
132
+
assert_eq!(parse_any_link(""), None);
124
133
}
125
134
126
135
#[test]