Browse Source

Persist the WebSub state to disk periodically and reload on startup

pull/867/head
James Mills 2 months ago
parent
commit
74d8b81c37
Signed by: prologic
GPG Key ID: AC4C014F1440EBD6
  1. 1
      .gitignore
  2. 91
      internal/indieweb/websub.go
  3. 23
      internal/server.go

1
.gitignore vendored

@ -30,6 +30,7 @@
/data/msgs
/data/cache
/data/logo.*
/data/*.json
/data/blogscache
/data/msgscache
/data/feedsources

91
internal/indieweb/websub.go

@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
@ -25,6 +26,15 @@ const (
defaultWebSubQueueSize = 100
)
func fileExists(fn string) bool {
if _, err := os.Stat(fn); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
func generateRandomChallengeString() string {
b := make([]byte, 16)
_, _ = rand.Read(b)
@ -126,6 +136,7 @@ type WebSubStats struct {
type WebSub struct {
sync.RWMutex
fn string
endpoint string
// WebSub Subscribers to this Hub
@ -146,6 +157,9 @@ type WebSub struct {
verify chan *verification
verifyTicker *time.Ticker
cleanupTicker *time.Ticker
stateTicker *time.Ticker
// Notify is the callback called when processing inbound notifications requests
// from a hub we're subscribed to as a client for a given topic
Notify func(topic string) error
@ -156,8 +170,9 @@ type WebSub struct {
ValidateTopic func(topic string) bool
}
func NewWebSub(endpoint string) *WebSub {
func NewWebSub(fn, endpoint string) *WebSub {
ws := &WebSub{
fn: fn,
endpoint: endpoint,
subscriptions: make(map[string]*Subscription),
subscribers: make(map[string]Subscribers),
@ -190,16 +205,74 @@ func NewWebSub(endpoint string) *WebSub {
}
}()
ws.cleanupTicker = time.NewTicker(5 * time.Minute)
go func() {
c := time.Tick(5 * time.Minute)
for range c {
for range ws.cleanupTicker.C {
ws.cleanup()
}
}()
ws.stateTicker = time.NewTicker(5 * time.Minute)
go func() {
for range ws.stateTicker.C {
ws.Save()
}
}()
return ws
}
func (ws *WebSub) Load() error {
if !fileExists(ws.fn) {
return nil
}
ws.Lock()
defer ws.Unlock()
state := struct {
Subscribers map[string]Subscribers
Subscriptions map[string]*Subscription
}{}
data, err := os.ReadFile(ws.fn)
if err != nil {
os.Remove(ws.fn)
return fmt.Errorf("error loading state: %w", err)
}
if err := json.Unmarshal(data, &state); err != nil {
os.Remove(ws.fn)
return fmt.Errorf("error deserializing state: %w", err)
}
ws.subscribers = state.Subscribers
ws.subscriptions = state.Subscriptions
return nil
}
func (ws *WebSub) Save() error {
state := struct {
Subscribers map[string]Subscribers
Subscriptions map[string]*Subscription
}{
Subscribers: ws.subscribers,
Subscriptions: ws.subscriptions,
}
data, err := json.Marshal(state)
if err != nil {
return fmt.Errorf("error serializing state: %s", err)
}
if err := os.WriteFile(ws.fn, data, 0644); err != nil {
return fmt.Errorf("error saving state: %w", err)
}
return nil
}
func (ws *WebSub) cleanup() {
ws.Lock()
defer ws.Unlock()
@ -526,10 +599,14 @@ func (ws *WebSub) DebugEndpoint(w http.ResponseWriter, r *http.Request) {
ws.RLock()
defer ws.RUnlock()
doc := map[string]interface{}{
"endpoint": ws.endpoint,
"subscribers": ws.subscribers,
"subscriptions": ws.subscriptions,
doc := struct {
Endpoint string
Subscribers map[string]Subscribers
Subscriptions map[string]*Subscription
}{
Endpoint: ws.endpoint,
Subscribers: ws.subscribers,
Subscriptions: ws.subscriptions,
}
data, err := json.Marshal(doc)

23
internal/server.go

@ -151,6 +151,7 @@ func (s *Server) AddShutdownHook(f func()) {
// Shutdown ...
func (s *Server) Shutdown(ctx context.Context) error {
websub.Save()
s.cron.Stop()
s.tasks.Stop()
@ -615,8 +616,15 @@ func (s *Server) processNotification(topic string) error {
return nil
}
func (s *Server) setupWebSub() {
websub = indieweb.NewWebSub(fmt.Sprintf("%s/websub", s.config.BaseURL))
func (s *Server) setupWebSub() error {
fn := filepath.Join(s.config.Data, "websub.json")
endpoint := fmt.Sprintf("%s/websub", s.config.BaseURL)
websub = indieweb.NewWebSub(fn, endpoint)
if err := websub.Load(); err != nil {
log.WithError(err).Warnf("error loading websub state")
}
websub.Notify = s.processNotification
websub.ValidateTopic = func(topic string) bool {
u, err := url.Parse(topic)
@ -646,6 +654,8 @@ func (s *Server) setupWebSub() {
return false
}
return nil
}
func (s *Server) setupJobs() error {
@ -1077,6 +1087,12 @@ func NewServer(bind string, options ...Option) (*Server, error) {
server.AppendTwt = AppendTwtFactory(config, cache, db)
server.FilterTwts = FilterTwtsFactory(config)
if err := server.setupWebSub(); err != nil {
log.WithError(err).Error("error setting up websub")
return nil, err
}
log.Infof("started websub processor")
if err := server.setupJobs(); err != nil {
log.WithError(err).Error("error setting up background jobs")
return nil, err
@ -1090,9 +1106,6 @@ func NewServer(bind string, options ...Option) (*Server, error) {
server.setupWebMentions()
log.Infof("started webmentions processor")
server.setupWebSub()
log.Infof("started websub processor")
server.setupMetrics()
log.Infof("serving metrics endpoint at %s/metrics", server.config.BaseURL)

Loading…
Cancel
Save