Browse Source

Refactor IPP

James Mills 1 week ago
parent
commit
5d1d8dfc97
Signed by: prologic GPG Key ID: AC4C014F1440EBD6
  1. 110
      internal/interpod_protocol.go
  2. 6
      internal/server.go

110
internal/interpod_protocol.go

@ -3,6 +3,7 @@ package internal
import (
"net/http"
"net/url"
"strings"
"sync"
"time"
@ -19,42 +20,82 @@ const (
// IPPStore ...
type IPPStore struct {
sync.RWMutex
subscribers map[string]bool
subscribers map[string]bool
subscriptions map[*Peer]bool
}
// Init ...
func (i *IPPStore) Init() {
i.Lock()
defer i.Unlock()
i.subscribers = make(map[string]bool)
// NewIPPStore ...
func NewIPPStore() *IPPStore {
return &IPPStore{
subscribers: make(map[string]bool),
subscriptions: make(map[*Peer]bool),
}
}
// Add adds a subscriber to the store.
func (i *IPPStore) Add(url string) {
// AddSubscriber adds a subscriber to the store.
func (i *IPPStore) AddSubscriber(url string) {
i.Lock()
defer i.Unlock()
i.subscribers[url] = true
}
// Remove removes a subscriber from the store.
func (i *IPPStore) Remove(url string) {
// RemoveSubscriber removes a subscriber from the store.
func (i *IPPStore) RemoveSubscriber(url string) {
i.Lock()
defer i.Unlock()
delete(i.subscribers, url)
}
// Get returns a list of all subscribers.
func (i *IPPStore) Get() map[string]bool {
// GetSubscribers returns a list of all subscribers.
func (i *IPPStore) GetSubscribers() map[string]bool {
subscribers := make(map[string]bool)
i.RLock()
defer i.RUnlock()
return i.subscribers
for k, v := range i.subscribers {
subscribers[k] = v
}
i.RUnlock()
return subscribers
}
// NewIPPStore ...
func NewIPPStore() *IPPStore {
var store IPPStore
store.Init()
return &store
// AddSubscription adds a subscription (Pod subscribed to) to the store.
func (i *IPPStore) AddSubscription(peer *Peer) {
i.Lock()
defer i.Unlock()
i.subscriptions[peer] = true
}
// RemoveSubscription removes a subscription (Pod subscribed to) from the store.
func (i *IPPStore) RemoveSubscription(peer *Peer) {
i.Lock()
defer i.Unlock()
delete(i.subscriptions, peer)
}
// GetSubscriptions returns a list of all subscriptions (Peerings pods subscribed to)
func (i *IPPStore) GetSubscriptions() map[*Peer]bool {
subscriptions := make(map[*Peer]bool)
i.RLock()
for k, v := range i.subscriptions {
subscriptions[k] = v
}
i.RUnlock()
return subscriptions
}
// IsSubscribedTo returns true if this pod is subscribed to the given `pod`.
func (i *IPPStore) IsSubscribedTo(peer *Peer) bool {
i.RLock()
defer i.RUnlock()
if _, found := i.subscriptions[peer]; found {
return true
}
return false
}
// IPPPubHandler handles publish events received from peer pods.
@ -149,7 +190,7 @@ func (s *Server) IPPSubHandler() httprouter.Handle {
w.WriteHeader(http.StatusOK)
w.Write([]byte(http.StatusText(http.StatusOK)))
s.subscribers.Add(callback)
s.ippStore.AddSubscriber(callback)
}
}
@ -166,7 +207,7 @@ func (s *Server) PublishIPP(twter types.Twter) {
}
// Send a publish event to all subscribers.
for sub := range s.subscribers.Get() {
for sub := range s.ippStore.GetSubscribers() {
go func(sub string) {
req, _ := http.NewRequest(http.MethodPost, sub, nil)
req.Header.Set("x-ipp-uri", sub)
@ -180,7 +221,7 @@ func (s *Server) PublishIPP(twter types.Twter) {
// 2) If we sent a bad IPP request
// 3) The receiver wasn't a pod at all
if resp.StatusCode != http.StatusAccepted {
s.subscribers.Remove(sub)
s.ippStore.RemoveSubscriber(sub)
}
resp.Body.Close()
}(sub)
@ -217,3 +258,28 @@ func (s *Server) SubscribeIPP(feeds types.Feeds) {
resp.Body.Close()
}
}
func (s *Server) UpdateIPPSubscritpions(user *User) {
var matchingPeers Peers
// First get a list of peering Pods
peeringPods := s.cache.GetPeers()
// Next get the User's Sources (feeds they follow)
followingFeeds := user.Sources()
for followingFeed := range followingFeeds {
followingURL := NormalizeURL(followingFeed.URL)
for _, peeringPod := range peeringPods {
if !peeringPod.IsZero() && strings.HasPrefix(NormalizeURL(peeringPod.URI), followingURL) {
matchingPeers = append(matchingPeers, peeringPod)
}
}
}
for _, matchingPeer := range matchingPeers {
if !s.ippStore.IsSubscribedTo(matchingPeer) {
s.ippStore.AddSubscription(matchingPeer)
}
}
}

6
internal/server.go

@ -87,7 +87,7 @@ type Server struct {
translator *Translator
// Inter-Pod Protocol Store
subscribers *IPPStore
ippStore *IPPStore
// Factory Functions
AppendTwt AppendTwtFunc
@ -769,7 +769,7 @@ func NewServer(bind string, options ...Option) (*Server, error) {
api := NewAPI(router, config, cache, archive, db, pm, tasks)
ippstore := NewIPPStore()
ippStore := NewIPPStore()
var handler http.Handler
@ -831,7 +831,7 @@ func NewServer(bind string, options ...Option) (*Server, error) {
translator: translator,
// Inter-Pod Protocol Store
subscribers: ippstore,
ippStore: ippStore,
}
// Factory functions that require access to the Pod Config and Store

Loading…
Cancel
Save