Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/hex"
7 "errors"
8 "fmt"
9 "log/slog"
10 "net"
11 "net/http"
12 "strings"
13 "sync/atomic"
14 "time"
15
16 "github.com/bluesky-social/indigo/service/tap"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/spindle/config"
19)
20
// randomAdminPassword generates a fresh tap admin password.
//
// It reads 32 bytes from crypto/rand and returns them hex-encoded, i.e. as a
// 64-character string. If the random source fails, the error is returned
// wrapped and the string result is empty.
func randomAdminPassword() (string, error) {
	secret := make([]byte, 32)
	_, err := rand.Read(secret)
	if err != nil {
		return "", fmt.Errorf("generate tap admin password: %w", err)
	}
	return hex.EncodeToString(secret), nil
}
28
// assertLoopbackBind checks that bind is a "host:port" address whose host
// part refers to the loopback interface.
//
// Accepted hosts are the name "localhost" (compared case-insensitively, not
// resolved) and any IP literal for which net.IP.IsLoopback reports true, such
// as 127.0.0.1 or ::1. An address that cannot be split into host and port, an
// empty host, any other hostname, and any non-loopback IP all yield an error.
func assertLoopbackBind(bind string) error {
	host, _, splitErr := net.SplitHostPort(bind)
	switch {
	case splitErr != nil:
		return fmt.Errorf("parse tap bind %q: %w", bind, splitErr)
	case host == "":
		// An empty host means "all interfaces", which is exactly what
		// this check exists to reject.
		return fmt.Errorf("embedded mode requires loopback host in tap bind %q", bind)
	case strings.EqualFold(host, "localhost"):
		return nil
	}

	if addr := net.ParseIP(host); addr != nil && addr.IsLoopback() {
		return nil
	}
	return fmt.Errorf("embedded tap bind %q must be loopback like 127.0.0.1 or ::1", bind)
}
46
47type embeddedTap struct {
48 tap *tap.Tap
49 logger *slog.Logger
50 closed atomic.Bool
51}
52
53func startEmbeddedTap(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*embeddedTap, error) {
54 if err := assertLoopbackBind(cfg.Server.Tap.Bind); err != nil {
55 return nil, err
56 }
57
58 tcfg := tap.Config{
59 DatabaseURL: "sqlite://" + cfg.Server.Tap.DBPath,
60 DBMaxConns: 32,
61 PLCURL: cfg.Server.PlcUrl,
62 RelayUrl: cfg.Server.Tap.RelayUrl,
63 FirehoseParallelism: 4,
64 ResyncParallelism: 2,
65 OutboxParallelism: 1,
66 FirehoseCursorSaveInterval: time.Second,
67 RepoFetchTimeout: 5 * time.Minute,
68 IdentityCacheSize: 50_000,
69 EventCacheSize: 10_000,
70 SignalCollection: tangled.RepoNSID,
71 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID},
72 AdminPassword: cfg.Server.Tap.AdminPassword,
73 RetryTimeout: 60 * time.Second,
74 }
75
76 t, err := tap.New(tcfg)
77 if err != nil {
78 return nil, fmt.Errorf("tap.New: %w", err)
79 }
80
81 go func() {
82 if err := t.Firehose.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
83 logger.Error("firehose terminated", "err", err)
84 }
85 }()
86 t.Run(ctx)
87 go func() {
88 logger.Info("tap http server listening", "bind", cfg.Server.Tap.Bind)
89 if err := t.Server.Start(cfg.Server.Tap.Bind); err != nil && !errors.Is(err, http.ErrServerClosed) {
90 logger.Error("tap http server terminated", "err", err)
91 }
92 }()
93
94 if err := waitForListener(ctx, cfg.Server.Tap.Bind, time.Now().Add(10*time.Second)); err != nil {
95 logger.Warn("tap http server unreachable before timeout", "bind", cfg.Server.Tap.Bind, "err", err)
96 }
97
98 return &embeddedTap{tap: t, logger: logger}, nil
99}
100
// waitForListener blocks until a TCP connection to addr succeeds, ctx is
// done, or deadline has passed, whichever happens first.
//
// It returns nil as soon as one connection attempt succeeds (the probe
// connection is closed immediately). It returns ctx.Err() if ctx is cancelled
// or expires, and a "timed out waiting for <addr>" error once deadline has
// passed without a successful connection. A deadline that is already in the
// past yields the timeout error without dialing.
//
// Each attempt is limited to 100ms and never runs past deadline; failed
// attempts are retried after a 50ms pause. Both the dial and the pause
// observe ctx, so cancellation is noticed promptly rather than after the
// current attempt finishes.
func waitForListener(ctx context.Context, addr string, deadline time.Time) error {
	const (
		dialTimeout   = 100 * time.Millisecond
		retryInterval = 50 * time.Millisecond
	)

	// Deadline caps every attempt at the caller's deadline; Timeout caps it
	// at dialTimeout. Whichever is earlier wins.
	dialer := net.Dialer{Timeout: dialTimeout, Deadline: deadline}

	// Iterate rather than recurse: Go does not eliminate tail calls, so a
	// recursive retry would grow the stack for as long as the wait lasts.
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for %s", addr)
		}

		conn, err := dialer.DialContext(ctx, "tcp", addr)
		if err == nil {
			// The connection only proves reachability; a close error
			// carries no useful information for the caller.
			_ = conn.Close()
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryInterval):
		}
	}
}
116
117func (e *embeddedTap) Shutdown() {
118 if e == nil || e.tap == nil {
119 return
120 }
121 if e.closed.Swap(true) {
122 return
123 }
124 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
125 defer cancel()
126 if err := e.tap.Server.Shutdown(shutdownCtx); err != nil {
127 e.logger.Error("tap server shutdown failed", "err", err)
128 }
129 if err := e.tap.CloseDb(shutdownCtx); err != nil {
130 e.logger.Error("tap db close failed", "err", err)
131 }
132}