Monorepo for Tangled tangled.org
6

Configure Feed

Select the types of activity you want to include in your feed.

1package spindle 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/hex" 7 "errors" 8 "fmt" 9 "log/slog" 10 "net" 11 "net/http" 12 "strings" 13 "time" 14 15 "github.com/bluesky-social/indigo/service/tap" 16 "tangled.org/core/api/tangled" 17 "tangled.org/core/spindle/config" 18) 19 20func randomAdminPassword() (string, error) { 21 var b [32]byte 22 if _, err := rand.Read(b[:]); err != nil { 23 return "", fmt.Errorf("generate tap admin password: %w", err) 24 } 25 return hex.EncodeToString(b[:]), nil 26} 27 28func assertLoopbackBind(bind string) error { 29 host, _, err := net.SplitHostPort(bind) 30 if err != nil { 31 return fmt.Errorf("parse tap bind %q: %w", bind, err) 32 } 33 if host == "" { 34 return fmt.Errorf("embedded mode requires loopback host in tap bind %q", bind) 35 } 36 if strings.EqualFold(host, "localhost") { 37 return nil 38 } 39 ip := net.ParseIP(host) 40 if ip == nil || !ip.IsLoopback() { 41 return fmt.Errorf("embedded tap bind %q must be loopback like 127.0.0.1 or ::1", bind) 42 } 43 return nil 44} 45 46type embeddedTap struct { 47 tap *tap.Tap 48 logger *slog.Logger 49} 50 51func startEmbeddedTap(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*embeddedTap, error) { 52 if err := assertLoopbackBind(cfg.Server.Tap.Bind); err != nil { 53 return nil, err 54 } 55 56 tcfg := tap.Config{ 57 DatabaseURL: "sqlite://" + cfg.Server.Tap.DBPath, 58 DBMaxConns: 32, 59 PLCURL: cfg.Server.PlcUrl, 60 RelayUrl: cfg.Server.Tap.RelayUrl, 61 FirehoseParallelism: 4, 62 ResyncParallelism: 2, 63 OutboxParallelism: 1, 64 FirehoseCursorSaveInterval: time.Second, 65 RepoFetchTimeout: 5 * time.Minute, 66 IdentityCacheSize: 50_000, 67 EventCacheSize: 10_000, 68 SignalCollection: tangled.RepoNSID, 69 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID}, 70 AdminPassword: cfg.Server.Tap.AdminPassword, 71 RetryTimeout: 60 * time.Second, 72 } 73 74 t, err := tap.New(tcfg) 75 if err != nil { 76 return nil, fmt.Errorf("tap.New: %w", err) 77 } 78 79 go func() { 80 if err := t.Firehose.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { 81 logger.Error("firehose terminated", "err", err) 82 } 83 }() 84 t.Run(ctx) 85 go func() { 86 logger.Info("tap http server listening", "bind", cfg.Server.Tap.Bind) 87 if err := t.Server.Start(cfg.Server.Tap.Bind); err != nil && !errors.Is(err, http.ErrServerClosed) { 88 logger.Error("tap http server terminated", "err", err) 89 } 90 }() 91 92 if err := waitForListener(ctx, cfg.Server.Tap.Bind, time.Now().Add(10*time.Second)); err != nil { 93 logger.Warn("tap http server unreachable before timeout", "bind", cfg.Server.Tap.Bind, "err", err) 94 } 95 96 return &embeddedTap{tap: t, logger: logger}, nil 97} 98 99func waitForListener(ctx context.Context, addr string, deadline time.Time) error { 100 if ctx.Err() != nil { 101 return ctx.Err() 102 } 103 if time.Now().After(deadline) { 104 return fmt.Errorf("timed out waiting for %s", addr) 105 } 106 c, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) 107 if err == nil { 108 c.Close() 109 return nil 110 } 111 time.Sleep(50 * time.Millisecond) 112 return waitForListener(ctx, addr, deadline) 113} 114 115func (e *embeddedTap) Shutdown() { 116 if e == nil || e.tap == nil { 117 return 118 } 119 shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 120 defer cancel() 121 if err := e.tap.Server.Shutdown(shutdownCtx); err != nil { 122 e.logger.Error("tap server shutdown failed", "err", err) 123 } 124 if err := e.tap.CloseDb(shutdownCtx); err != nil { 125 e.logger.Error("tap db close failed", "err", err) 126 } 127}