Monorepo for Tangled tangled.org
3

Configure Feed

Select the types of activity you want to include in your feed.

1use crate::command::{self, Spec}; 2use crate::nix_config::{SPINDLE_RUN_DIR, clean_store_paths, nix_executable}; 3use crate::protocol::{Message, v1}; 4use anyhow::{Context, Result}; 5use nix::unistd::{Group, chown}; 6use serde::de::DeserializeOwned; 7use serde::{Deserialize, Serialize}; 8use std::fmt::{self, Write as FmtWrite}; 9use std::fs::{self, File, OpenOptions}; 10use std::io::{self, Read}; 11use std::net::Shutdown; 12use std::os::unix::fs::OpenOptionsExt; 13use std::sync::{Arc, Mutex}; 14use std::time::Duration; 15use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 16use tokio::sync::{Semaphore, mpsc, oneshot, watch}; 17use tokio::task::{JoinError, JoinHandle, JoinSet}; 18use tokio_vsock::{VMADDR_CID_LOCAL, VsockAddr, VsockListener, VsockStream}; 19use tracing::{info, warn}; 20 21mod read_proxy; 22mod write_proxy; 23 24pub use read_proxy::ReadCacheProxy; 25pub use write_proxy::WriteCacheProxy; 26 27const UPLOAD_QUEUE_CAPACITY: usize = 128; 28const CONNECTION_WORKERS: usize = 4; 29const CACHE_ENQUEUE_IO_TIMEOUT: Duration = Duration::from_secs(2); 30const DEFAULT_CACHE_ENQUEUE_PORT: u32 = 10241; 31const SHUTTLE_CACHE_ENQUEUE_PORT_ENV: &str = "SHUTTLE_CACHE_VSOCK_PORT"; 32const NIX_BUILD_GROUP: &str = "nixbld"; 33const SPINDLE_HOOK_TOKEN: &str = "/run/spindle/hook-token"; 34 35#[derive(Clone, Debug, Default)] 36pub struct CacheStats { 37 pub pending: u32, 38 pub active: u32, 39 pub uploaded: u32, 40 pub failed: u32, 41 pub last_error: Option<String>, 42} 43 44#[derive(Debug, Default)] 45struct CacheState { 46 stats: CacheStats, 47 enqueue_active: u32, 48 stopped: bool, 49} 50 51impl CacheState { 52 fn snapshot(&self) -> CacheSnapshot { 53 CacheSnapshot { 54 stats: self.stats.clone(), 55 enqueue_active: self.enqueue_active, 56 stopped: self.stopped, 57 } 58 } 59} 60 61#[derive(Clone, Debug, Default)] 62struct CacheSnapshot { 63 stats: CacheStats, 64 enqueue_active: u32, 65 stopped: bool, 66} 67 68impl CacheSnapshot { 69 fn is_idle(&self) -> bool { 70 self.stats.pending == 0 && self.stats.active == 0 && self.enqueue_active == 0 71 } 72} 73 74#[derive(Clone)] 75pub struct CacheUploadManager { 76 inner: Arc<CacheUploadInner>, 77} 78 79struct CacheUploadInner { 80 cmd_tx: mpsc::Sender<Cmd>, 81 stats_rx: watch::Receiver<CacheSnapshot>, 82 handles: Mutex<Vec<JoinHandle<()>>>, 83} 84 85struct UploadJob { 86 paths: Vec<String>, 87 count: u32, 88} 89 90enum Cmd { 91 EnqueueStarted, 92 EnqueueFinished, 93 Enqueue { 94 paths: Vec<String>, 95 reply: oneshot::Sender<Result<usize, String>>, 96 }, 97 UploadStarted { 98 count: u32, 99 }, 100 UploadFinished { 101 count: u32, 102 error: Option<String>, 103 }, 104 UploadWorkerStopped, 105 Stop, 106} 107 108impl CacheUploadManager { 109 pub async fn start(upload_url: &str, event_tx: mpsc::Sender<Message>) -> Result<Option<Self>> { 110 if upload_url.is_empty() { 111 // nothing to upload to, so don't require the guest-local vsock 112 // listener (vsock_loopback) or the nix post-build hook 113 info!("no cache upload url configured, cache uploads disabled"); 114 return Ok(None); 115 } 116 let token = create_hook_token().context("create cache hook token")?; 117 let port = cache_enqueue_port(); 118 let listener = VsockListener::bind(VsockAddr::new(VMADDR_CID_LOCAL, port)) 119 .with_context(|| format!("listen on guest-local vsock port {port}"))?; 120 121 let (cmd_tx, cmd_rx) = mpsc::channel::<Cmd>(UPLOAD_QUEUE_CAPACITY); 122 let (upload_tx, upload_rx) = mpsc::channel::<UploadJob>(UPLOAD_QUEUE_CAPACITY); 123 let (stats_tx, stats_rx) = watch::channel(CacheSnapshot::default()); 124 125 let mut handles = Vec::with_capacity(3); 126 127 handles.push(tokio::spawn(async move { 128 cache_manager_loop(cmd_rx, upload_tx, stats_tx).await; 129 })); 130 131 let upload_cmd_tx = cmd_tx.clone(); 132 let upload_url = upload_url.to_owned(); 133 handles.push(tokio::spawn(async move { 134 upload_loop(upload_rx, upload_cmd_tx, upload_url).await; 135 })); 136 137 let accept_cmd_tx = cmd_tx.clone(); 138 handles.push(tokio::spawn(async move { 139 accept_loop(listener, token, event_tx, accept_cmd_tx).await; 140 })); 141 142 info!( 143 port, 144 workers = CONNECTION_WORKERS, 145 "cache upload queue ready" 146 ); 147 let inner = Arc::new(CacheUploadInner { 148 cmd_tx, 149 stats_rx, 150 handles: Mutex::new(handles), 151 }); 152 Ok(Some(Self { inner })) 153 } 154 155 pub async fn drain(&self, timeout: Option<Duration>) -> CacheStats { 156 let mut stats_rx = self.inner.stats_rx.clone(); 157 let wait = async { 158 loop { 159 let snapshot = stats_rx.borrow_and_update().clone(); 160 if snapshot.is_idle() || snapshot.stopped { 161 return snapshot.stats; 162 } 163 if stats_rx.changed().await.is_err() { 164 let mut stats = stats_rx.borrow().stats.clone(); 165 stats.last_error = Some("cache manager stopped".to_owned()); 166 return stats; 167 } 168 } 169 }; 170 171 match timeout { 172 Some(timeout) => match tokio::time::timeout(timeout, wait).await { 173 Ok(stats) => stats, 174 Err(_) => { 175 let mut stats = self.inner.stats_rx.borrow().stats.clone(); 176 stats.last_error = Some("cache drain timed out".to_owned()); 177 stats 178 } 179 }, 180 None => wait.await, 181 } 182 } 183} 184 185impl Drop for CacheUploadInner { 186 fn drop(&mut self) { 187 let _ = self.cmd_tx.try_send(Cmd::Stop); 188 if let Ok(mut handles) = self.handles.lock() { 189 for handle in handles.drain(..) { 190 handle.abort(); 191 } 192 } 193 let _ = fs::remove_file(SPINDLE_HOOK_TOKEN); 194 } 195} 196 197#[derive(Debug, Deserialize, Serialize)] 198struct EnqueueBuiltPathsRequest { 199 token: String, 200 paths: Vec<String>, 201} 202 203#[derive(Debug, Deserialize, Serialize)] 204struct EnqueueBuiltPathsResponse { 205 queued: usize, 206 #[serde(default, skip_serializing_if = "String::is_empty")] 207 error: String, 208} 209 210#[derive(Debug)] 211enum JsonLineError { 212 Empty, 213 TimedOut, 214 Io(io::Error), 215 Json(serde_json::Error), 216} 217 218impl JsonLineError { 219 fn enqueue_request_message(self) -> String { 220 match self { 221 Self::Empty => "empty cache enqueue request".to_owned(), 222 Self::TimedOut => "cache enqueue read timed out".to_owned(), 223 Self::Io(error) => error.to_string(), 224 Self::Json(error) => error.to_string(), 225 } 226 } 227} 228 229impl fmt::Display for JsonLineError { 230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 231 match self { 232 Self::Empty => f.write_str("empty message"), 233 Self::TimedOut => f.write_str("timed out"), 234 Self::Io(error) => error.fmt(f), 235 Self::Json(error) => error.fmt(f), 236 } 237 } 238} 239 240async fn cache_manager_loop( 241 mut cmd_rx: mpsc::Receiver<Cmd>, 242 upload_tx: mpsc::Sender<UploadJob>, 243 stats_tx: watch::Sender<CacheSnapshot>, 244) { 245 let mut state = CacheState::default(); 246 247 let publish_state = |state: &CacheState| { 248 let _ = stats_tx.send(state.snapshot()); 249 }; 250 251 while let Some(command) = cmd_rx.recv().await { 252 match command { 253 Cmd::EnqueueStarted => { 254 state.enqueue_active += 1; 255 publish_state(&state); 256 } 257 Cmd::EnqueueFinished => { 258 decrement_counter(&mut state.enqueue_active, 1, "cache enqueue active"); 259 publish_state(&state); 260 } 261 Cmd::Enqueue { paths, reply } => { 262 let result = enqueue_upload_job(&mut state, &upload_tx, paths); 263 publish_state(&state); 264 let _ = reply.send(result); 265 } 266 Cmd::UploadStarted { count } => { 267 decrement_counter(&mut state.stats.pending, count, "cache pending uploads"); 268 state.stats.active += count; 269 publish_state(&state); 270 } 271 Cmd::UploadFinished { count, error } => { 272 decrement_counter(&mut state.stats.active, count, "cache active uploads"); 273 if let Some(error) = error { 274 state.stats.failed += count; 275 state.stats.last_error = Some(error); 276 } else { 277 state.stats.uploaded += count; 278 } 279 publish_state(&state); 280 } 281 Cmd::UploadWorkerStopped => { 282 state.stopped = true; 283 publish_state(&state); 284 } 285 Cmd::Stop => { 286 state.stopped = true; 287 publish_state(&state); 288 break; 289 } 290 } 291 } 292 293 state.stopped = true; 294 publish_state(&state); 295} 296 297fn enqueue_upload_job( 298 state: &mut CacheState, 299 upload_tx: &mpsc::Sender<UploadJob>, 300 paths: Vec<String>, 301) -> Result<usize, String> { 302 if state.stopped { 303 return Err("cache manager stopped".to_owned()); 304 } 305 306 let count = paths.len() as u32; 307 if count == 0 { 308 return Ok(0); 309 } 310 311 match upload_tx.try_send(UploadJob { paths, count }) { 312 Ok(()) => { 313 state.stats.pending += count; 314 Ok(count as usize) 315 } 316 Err(mpsc::error::TrySendError::Full(_)) => Err("cache upload queue is full".to_owned()), 317 Err(mpsc::error::TrySendError::Closed(_)) => { 318 state.stopped = true; 319 Err("cache upload worker stopped".to_owned()) 320 } 321 } 322} 323 324fn decrement_counter(counter: &mut u32, count: u32, name: &'static str) { 325 match counter.checked_sub(count) { 326 Some(value) => *counter = value, 327 None => { 328 warn!(name, current = *counter, count, "cache counter underflow"); 329 *counter = 0; 330 } 331 } 332} 333 334async fn upload_loop( 335 mut upload_rx: mpsc::Receiver<UploadJob>, 336 cmd_tx: mpsc::Sender<Cmd>, 337 upload_url: String, 338) { 339 while let Some(job) = upload_rx.recv().await { 340 let cmd = Cmd::UploadStarted { count: job.count }; 341 if cmd_tx.send(cmd).await.is_err() { 342 break; 343 } 344 345 let error = upload_paths(&upload_url, &job.paths) 346 .await 347 .err() 348 .map(|error| error.to_string()); 349 350 let cmd = Cmd::UploadFinished { 351 count: job.count, 352 error, 353 }; 354 if cmd_tx.send(cmd).await.is_err() { 355 break; 356 } 357 } 358 359 let _ = cmd_tx.send(Cmd::UploadWorkerStopped).await; 360} 361 362// runs nix copy against the write cache proxy, which goes to the spindle 363// and spindle will then forward the request to the actual binary cache 364async fn upload_paths(upload_url: &str, paths: &[String]) -> Result<()> { 365 fn add_query_param(url: &str, key: &str, value: &str) -> String { 366 let separator = url.contains('?').then_some('&').unwrap_or('?'); 367 format!("{}{}{}={}", url, separator, key, value) 368 } 369 370 if paths.is_empty() || upload_url.is_empty() { 371 return Ok(()); 372 } 373 374 // we use zstd 3 because its the best usually. it is faster than no 375 // compression also because of IO savings 376 let dest_url = add_query_param(upload_url, "compression", "zstd"); 377 let dest_url = add_query_param(&dest_url, "compression-level", "3"); 378 let dest_url = add_query_param(&dest_url, "parallel-compression", "true"); 379 380 let spec = Spec::new(nix_executable()) 381 .args(["copy", "--to", &dest_url]) 382 .args(paths.iter().cloned()) 383 .timeout(Duration::from_secs(10 * 60)); 384 385 let output = command::run_capture(spec).await.context("run nix copy")?; 386 if !output.success() { 387 anyhow::bail!( 388 "nix copy failed: exit={} error={:?} output={}", 389 output.exit.exit_code, 390 output.exit.error, 391 output.combined_lossy(), 392 ); 393 } 394 395 info!(paths = paths.len(), %upload_url, "uploaded cache paths"); 396 Ok(()) 397} 398 399async fn accept_loop( 400 listener: VsockListener, 401 token: String, 402 event_tx: mpsc::Sender<Message>, 403 cmd_tx: mpsc::Sender<Cmd>, 404) { 405 let permits = Arc::new(Semaphore::new(CONNECTION_WORKERS)); 406 let mut tasks = JoinSet::new(); 407 loop { 408 tokio::select! { 409 accepted = listener.accept() => match accepted { 410 Ok((conn, _addr)) => { 411 let Ok(permit) = permits.clone().try_acquire_owned() else { 412 tasks.spawn(async move { 413 let mut conn = conn; 414 write_enqueue_response( 415 &mut conn, 416 0, 417 Some("cache enqueue workers are busy".to_owned()), 418 ) 419 .await; 420 }); 421 warn!("cache enqueue dropped because workers are busy"); 422 continue; 423 }; 424 425 let worker_cmd_tx = cmd_tx.clone(); 426 if let Err(error) = start_enqueue_request(&worker_cmd_tx) { 427 tasks.spawn(async move { 428 let mut conn = conn; 429 write_enqueue_response(&mut conn, 0, Some(error)).await; 430 }); 431 continue; 432 } 433 434 let worker_token = token.clone(); 435 let worker_event_tx = event_tx.clone(); 436 tasks.spawn(async move { 437 let _permit = permit; 438 handle_enqueue_conn( 439 conn, 440 &worker_token, 441 &worker_event_tx, 442 &worker_cmd_tx, 443 ) 444 .await; 445 let _ = worker_cmd_tx.send(Cmd::EnqueueFinished).await; 446 }); 447 } 448 Err(error) => { 449 if error.kind() != io::ErrorKind::Interrupted { 450 warn!(%error, "cache enqueue accept failed"); 451 } 452 } 453 }, 454 Some(result) = tasks.join_next(), if !tasks.is_empty() => { 455 log_enqueue_task_result(result); 456 } 457 } 458 } 459} 460 461fn log_enqueue_task_result(result: Result<(), JoinError>) { 462 if let Err(error) = result { 463 warn!(%error, "cache enqueue task failed"); 464 } 465} 466 467async fn handle_enqueue_conn( 468 mut conn: VsockStream, 469 expected_token: &str, 470 event_tx: &mpsc::Sender<Message>, 471 cmd_tx: &mpsc::Sender<Cmd>, 472) { 473 let req: EnqueueBuiltPathsRequest = match read_enqueue_request(&mut conn).await { 474 Ok(req) => req, 475 Err(error) => { 476 write_enqueue_response(&mut conn, 0, Some(error)).await; 477 return; 478 } 479 }; 480 481 if req.token != expected_token { 482 write_enqueue_response(&mut conn, 0, Some("invalid cache enqueue token".to_owned())).await; 483 return; 484 } 485 486 match enqueue_paths(cmd_tx, req.paths).await { 487 Ok(queued) => { 488 send_built_paths_event(event_tx, queued.event_paths).await; 489 write_enqueue_response(&mut conn, queued.count, None).await; 490 } 491 Err(error) => write_enqueue_response(&mut conn, 0, Some(error)).await, 492 } 493} 494 495fn start_enqueue_request(cmd_tx: &mpsc::Sender<Cmd>) -> Result<(), String> { 496 match cmd_tx.try_send(Cmd::EnqueueStarted) { 497 Ok(()) => Ok(()), 498 Err(mpsc::error::TrySendError::Full(_)) => Err("cache upload queue is full".to_owned()), 499 Err(mpsc::error::TrySendError::Closed(_)) => Err("cache upload worker stopped".to_owned()), 500 } 501} 502 503async fn read_enqueue_request(conn: &mut VsockStream) -> Result<EnqueueBuiltPathsRequest, String> { 504 read_json_line(conn) 505 .await 506 .map_err(JsonLineError::enqueue_request_message) 507} 508 509async fn read_json_line<T>(conn: &mut VsockStream) -> Result<T, JsonLineError> 510where 511 T: DeserializeOwned, 512{ 513 let mut data = Vec::new(); 514 let mut reader = BufReader::new(conn); 515 let bytes_read = tokio::time::timeout( 516 CACHE_ENQUEUE_IO_TIMEOUT, 517 reader.read_until(b'\n', &mut data), 518 ) 519 .await 520 .map_err(|_| JsonLineError::TimedOut)? 521 .map_err(JsonLineError::Io)?; 522 523 if bytes_read == 0 { 524 return Err(JsonLineError::Empty); 525 } 526 527 serde_json::from_slice(&data).map_err(JsonLineError::Json) 528} 529 530struct QueuedPaths { 531 count: usize, 532 event_paths: Vec<String>, 533} 534 535async fn enqueue_paths( 536 cmd_tx: &mpsc::Sender<Cmd>, 537 paths: Vec<String>, 538) -> Result<QueuedPaths, String> { 539 let paths = clean_store_paths(&paths); 540 let event_paths = paths.clone(); 541 if paths.is_empty() { 542 return Ok(QueuedPaths { 543 count: 0, 544 event_paths, 545 }); 546 } 547 548 let (reply, queued) = oneshot::channel(); 549 match cmd_tx.try_send(Cmd::Enqueue { paths, reply }) { 550 Ok(()) => match queued.await { 551 Ok(Ok(count)) => Ok(QueuedPaths { count, event_paths }), 552 Ok(Err(error)) => Err(error), 553 Err(_) => Err("cache upload worker stopped".to_owned()), 554 }, 555 Err(mpsc::error::TrySendError::Full(_)) => Err("cache upload queue is full".to_owned()), 556 Err(mpsc::error::TrySendError::Closed(_)) => Err("cache upload worker stopped".to_owned()), 557 } 558} 559 560async fn send_built_paths_event(event_tx: &mpsc::Sender<Message>, paths: Vec<String>) { 561 if paths.is_empty() { 562 return; 563 } 564 565 let msg = Message { 566 id: "built-paths".to_owned(), 567 built_paths: Some(v1::BuiltPaths { 568 paths, 569 reason: "post_build_hook".to_owned(), 570 }), 571 ..Default::default() 572 }; 573 let _ = event_tx.send(msg).await; 574} 575 576async fn write_enqueue_response(conn: &mut VsockStream, queued: usize, error: Option<String>) { 577 let response = EnqueueBuiltPathsResponse { 578 queued, 579 error: error.unwrap_or_default(), 580 }; 581 let _ = write_json_line(conn, &response).await; 582} 583 584async fn write_json_line<T>(conn: &mut VsockStream, value: &T) -> Result<(), JsonLineError> 585where 586 T: Serialize + ?Sized, 587{ 588 let data = serde_json::to_vec(value).map_err(JsonLineError::Json)?; 589 tokio::time::timeout(CACHE_ENQUEUE_IO_TIMEOUT, async { 590 conn.write_all(&data).await?; 591 conn.write_all(b"\n").await?; 592 VsockStream::shutdown(conn, Shutdown::Write) 593 }) 594 .await 595 .map_err(|_| JsonLineError::TimedOut)? 596 .map_err(JsonLineError::Io) 597} 598 599// we use a loopback vsock here since its better than having to do the whole http song and dance! 600pub async fn enqueue_built_paths(paths: &[String]) { 601 let paths = clean_store_paths(paths); 602 if paths.is_empty() { 603 return; 604 } 605 606 let token = match read_hook_token() { 607 Ok(token) => token, 608 Err(_) => return, 609 }; 610 611 if token.is_empty() { 612 return; 613 } 614 615 let mut conn = 616 match VsockStream::connect(VsockAddr::new(VMADDR_CID_LOCAL, cache_enqueue_port())).await { 617 Ok(conn) => conn, 618 Err(error) => { 619 warn!(paths = paths.len(), %error, "cache enqueue unavailable"); 620 return; 621 } 622 }; 623 624 let request = EnqueueBuiltPathsRequest { token, paths }; 625 match write_json_line(&mut conn, &request).await { 626 Ok(()) => {} 627 Err(JsonLineError::Json(error)) => { 628 warn!(%error, "cache enqueue encode failed"); 629 return; 630 } 631 Err(JsonLineError::TimedOut) => { 632 warn!("cache enqueue write timed out"); 633 return; 634 } 635 Err(error) => { 636 warn!(%error, "cache enqueue write failed"); 637 return; 638 } 639 } 640 641 let response: EnqueueBuiltPathsResponse = match read_json_line(&mut conn).await { 642 Ok(response) => response, 643 Err(JsonLineError::Empty) => { 644 warn!("cache enqueue ack was empty"); 645 return; 646 } 647 Err(JsonLineError::TimedOut) => { 648 warn!("cache enqueue ack timed out"); 649 return; 650 } 651 Err(error) => { 652 warn!(%error, "cache enqueue ack failed"); 653 return; 654 } 655 }; 656 657 if !response.error.is_empty() { 658 warn!(error = %response.error, "cache enqueue rejected"); 659 return; 660 } 661 662 info!(queued = response.queued, "cache paths enqueued"); 663} 664 665fn cache_enqueue_port() -> u32 { 666 std::env::var(SHUTTLE_CACHE_ENQUEUE_PORT_ENV) 667 .ok() 668 .and_then(|value| value.parse().ok()) 669 .unwrap_or(DEFAULT_CACHE_ENQUEUE_PORT) 670} 671 672fn create_hook_token() -> Result<String> { 673 use std::io::Write; 674 675 fs::create_dir_all(SPINDLE_RUN_DIR).with_context(|| format!("create {SPINDLE_RUN_DIR}"))?; 676 let token = random_token().context("generate hook token")?; 677 let mut file = OpenOptions::new() 678 .create(true) 679 .truncate(true) 680 .write(true) 681 .mode(0o640) 682 .open(SPINDLE_HOOK_TOKEN) 683 .with_context(|| format!("create {SPINDLE_HOOK_TOKEN}"))?; 684 allow_nix_build_group(SPINDLE_HOOK_TOKEN)?; 685 file.write_all(token.as_bytes()) 686 .with_context(|| format!("write {SPINDLE_HOOK_TOKEN}"))?; 687 file.write_all(b"\n") 688 .with_context(|| format!("write {SPINDLE_HOOK_TOKEN}"))?; 689 Ok(token) 690} 691 692fn allow_nix_build_group(path: &str) -> Result<()> { 693 let Some(group) = 694 Group::from_name(NIX_BUILD_GROUP).with_context(|| format!("lookup {NIX_BUILD_GROUP}"))? 695 else { 696 warn!( 697 group = NIX_BUILD_GROUP, 698 "nix build group not found; cache hook token remains root-only" 699 ); 700 return Ok(()); 701 }; 702 703 chown(path, None, Some(group.gid)).with_context(|| format!("chown {path} to {NIX_BUILD_GROUP}")) 704} 705 706fn read_hook_token() -> Result<String> { 707 fs::read_to_string(SPINDLE_HOOK_TOKEN) 708 .map(|token| token.trim().to_owned()) 709 .with_context(|| format!("read {SPINDLE_HOOK_TOKEN}")) 710} 711 712fn random_token() -> Result<String> { 713 let mut bytes = [0_u8; 32]; 714 File::open("/dev/urandom") 715 .context("open /dev/urandom")? 716 .read_exact(&mut bytes) 717 .context("read /dev/urandom")?; 718 719 let mut token = String::with_capacity(bytes.len() * 2); 720 for byte in bytes { 721 write!(&mut token, "{byte:02x}").unwrap(); 722 } 723 Ok(token) 724}