···
48
48
tokio::select! {
49
49
v = serving => eprintln!("serving ended: {v:?}"),
50
50
v = storage.receive(batches) => eprintln!("storage consumer ended: {v:?}"),
51
51
+
v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"),
51
52
};
52
53
53
54
println!("bye!");
···
2
2
use crate::store_types::{
3
3
ByCollectionKey, ByCollectionValue, ByCursorSeenKey, ByCursorSeenValue, ByIdKey, ByIdValue,
4
4
JetstreamCursorKey, JetstreamCursorValue, JetstreamEndpointKey, JetstreamEndpointValue,
5
5
-
ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemValue,
5
5
+
ModCursorKey, ModCursorValue, ModQueueItemKey, ModQueueItemPrefix, ModQueueItemStringValue,
6
6
+
ModQueueItemValue,
6
7
};
7
8
use crate::{CollectionSamples, CreateRecord, DeleteAccount, EventBatch, ModifyRecord, Nsid};
8
9
use fjall::{
···
86
87
Ok((me, js_cursor))
87
88
}
88
89
90
90
+
/// Jetstream event batch receiver: writes without any reads
91
91
+
///
92
92
+
/// Events that require reads like record updates or delets are written to a queue
89
93
pub async fn receive(&self, mut receiver: Receiver<EventBatch>) -> anyhow::Result<()> {
94
94
+
// TODO: see rw_loop: enforce single-thread.
90
95
loop {
91
96
let t_sleep = Instant::now();
92
97
sleep(Duration::from_secs_f64(0.3)).await; // TODO: minimize during replay
···
116
121
} else {
117
122
anyhow::bail!("receive channel closed");
118
123
}
124
124
+
}
125
125
+
}
126
126
+
127
127
+
/// Read-write loop reads from the queue for record-modifying events and does rollups
128
128
+
pub async fn rw_loop(&self) -> anyhow::Result<()> {
129
129
+
// TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time.
130
130
+
loop {
131
131
+
sleep(Duration::from_secs_f64(1.)).await;
132
132
+
let _keyspace = self.partition.clone();
133
133
+
let partition = self.partition.clone();
134
134
+
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
135
135
+
let prefix = ModQueueItemPrefix::default().to_db_bytes()?;
136
136
+
let Some(pair) = partition.prefix(prefix).next() else {
137
137
+
eprintln!("mod queue empty.");
138
138
+
return Ok(())
139
139
+
};
140
140
+
let (key_bytes, val_bytes) = pair?;
141
141
+
let _mod_cursor: Cursor = db_complete::<ModQueueItemKey>(&key_bytes)?.into();
142
142
+
let mod_value: ModQueueItemValue = db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
143
143
+
match mod_value {
144
144
+
ModQueueItemValue::DeleteAccount(did) => {
145
145
+
eprintln!("delete account: {did:?} (not yet implemented)");
146
146
+
},
147
147
+
ModQueueItemValue::DeleteRecord(did, collection, rkey) => {
148
148
+
eprintln!("delete record: {did:?} {collection:?} {rkey:?} (not yet implemented)");
149
149
+
},
150
150
+
ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => {
151
151
+
eprintln!("update record: {did:?} {collection:?} {rkey:?} {record:?} (not yet implemented)");
152
152
+
},
153
153
+
}
154
154
+
Ok(())
155
155
+
}).await??;
119
156
}
120
157
}
121
158
···
187
187
"mod_queue"
188
188
}
189
189
}
190
190
-
type ModQueueItemPrefix = DbStaticStr<_ModQueueItemStaticStr>;
190
190
+
pub type ModQueueItemPrefix = DbStaticStr<_ModQueueItemStaticStr>;
191
191
/// key format: ["mod_queue"|js_cursor]
192
192
pub type ModQueueItemKey = DbConcat<ModQueueItemPrefix, Cursor>;
193
193
impl ModQueueItemKey {