Compare commits

...

3 Commits

  1. 10
      internal/follow_handlers.go
  2. 227
      internal/interpod_protocol.go
  3. 5
      internal/login_handlers.go
  4. 4
      internal/post_handler.go
  5. 6
      internal/server.go

10
internal/follow_handlers.go

@ -57,6 +57,11 @@ func (s *Server) FollowHandler() httprouter.Handle {
s.cache.GetByUser(ctx.User, true)
// Update user subscriptions.
if s.config.Features.IsEnabled(FeatureIPP) {
s.UpdateIPPSubscriptions(ctx.User)
}
ctx.Error = false
ctx.Message = s.tr(ctx, "MsgFollowUserSuccess", trdata)
s.render("error", w, ctx)
@ -168,6 +173,11 @@ func (s *Server) UnfollowHandler() httprouter.Handle {
s.cache.GetByUser(ctx.User, true)
// Update user subscriptions.
if s.config.Features.IsEnabled(FeatureIPP) {
s.UpdateIPPSubscriptions(ctx.User)
}
ctx.Error = false
ctx.Message = s.tr(ctx, "MsgUnfollowSuccess", trdata)
s.render("error", w, ctx)

227
internal/interpod_protocol.go

@ -3,6 +3,7 @@ package internal
import (
"net/http"
"net/url"
"strings"
"sync"
"time"
@ -16,45 +17,137 @@ const (
IPPSubEndpoint = "/ipp/sub"
)
// Users is a list of users that subscribe to a Peer.
type Users []string
// Add ...
func (u *Users) Add(username string) {
// Avoid duplications.
for _, user := range *u {
if user == username {
return
}
}
users := append(*u, username)
u = &users
}
// Remove ...
func (u *Users) Remove(username string) {
var filtered Users
for _, user := range *u {
if user != username {
filtered.Add(user)
}
}
u = &filtered
}
// IPPStore ...
type IPPStore struct {
sync.RWMutex
subscribers map[string]bool
subscribers map[string]bool
subscriptions map[*Peer]Users
}
// Init ...
func (i *IPPStore) Init() {
i.Lock()
defer i.Unlock()
i.subscribers = make(map[string]bool)
// NewIPPStore ...
func NewIPPStore() *IPPStore {
return &IPPStore{
subscribers: make(map[string]bool),
subscriptions: make(map[*Peer]Users),
}
}
// Add adds a subscriber to the store.
func (i *IPPStore) Add(url string) {
// AddSubscriber adds a subscriber to the store.
func (i *IPPStore) AddSubscriber(url string) {
i.Lock()
defer i.Unlock()
i.subscribers[url] = true
}
// Remove removes a subscriber from the store.
func (i *IPPStore) Remove(url string) {
// RemoveSubscriber removes a subscriber from the store.
func (i *IPPStore) RemoveSubscriber(url string) {
i.Lock()
defer i.Unlock()
delete(i.subscribers, url)
}
// Get returns a list of all subscribers.
func (i *IPPStore) Get() map[string]bool {
// GetSubscribers returns a list of all subscribers.
func (i *IPPStore) GetSubscribers() map[string]bool {
subscribers := make(map[string]bool)
i.RLock()
for k, v := range i.subscribers {
subscribers[k] = v
}
i.RUnlock()
return subscribers
}
// AddSubscription adds a user to a peer subscription in the store.
func (i *IPPStore) AddSubscription(peer *Peer, username string) {
i.Lock()
defer i.Unlock()
// Add the user to the subscriber list.
list := i.subscriptions[peer]
list.Add(username)
i.subscriptions[peer] = list
}
// RemoveSubscription removes a user from a peer subscription in the store.
func (i *IPPStore) RemoveSubscription(peer *Peer, username string) {
i.Lock()
defer i.Unlock()
// Remove the user from the subscriber list.
list := i.subscriptions[peer]
list.Remove(username)
i.subscriptions[peer] = list
// If there are no longer any subscribers, unsubscribe.
if len(list) == 0 {
delete(i.subscriptions, peer)
}
}
// GetSubscriptions returns a list of all subscriptions (Peerings pods subscribed to)
func (i *IPPStore) GetSubscriptions() map[*Peer]Users {
subscriptions := make(map[*Peer]Users)
i.RLock()
for k, v := range i.subscriptions {
subscriptions[k] = v
}
i.RUnlock()
return subscriptions
}
// IsSubscribedTo returns true if this pod is subscribed to the given `pod`.
func (i *IPPStore) IsSubscribedTo(peer *Peer) bool {
i.RLock()
defer i.RUnlock()
return i.subscribers
if _, found := i.subscriptions[peer]; found {
return true
}
return false
}
// NewIPPStore ...
func NewIPPStore() *IPPStore {
var store IPPStore
store.Init()
return &store
// GetPeerSubscribers returns the amount of users subscribed to a peer.
func (i *IPPStore) GetPeerSubscribers(peer *Peer) int {
i.RLock()
defer i.RUnlock()
users, ok := i.subscriptions[peer]
if !ok {
return 0
}
return len(users)
}
// IPPPubHandler handles publish events received from peer pods.
@ -82,8 +175,19 @@ func (s *Server) IPPPubHandler() httprouter.Handle {
return
}
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(http.StatusText(http.StatusAccepted)))
// Send acceptance to valid peers, and withold acceptance
// from unsubscribed peers.
//
// Without acceptance, the pod that published this event will
// remove this pod from his list of subscribers, effectively
// unsubscribing us.
for peer := range s.ippStore.subscriptions {
if strings.HasPrefix(uri, NormalizeURL(peer.URI)) {
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(http.StatusText(http.StatusAccepted)))
break
}
}
// Ignore blacklisted feeds, as well as local feeds.
if s.cache.conf.BlacklistedFeed(uri) || isLocalUrl(uri) {
@ -149,7 +253,7 @@ func (s *Server) IPPSubHandler() httprouter.Handle {
w.WriteHeader(http.StatusOK)
w.Write([]byte(http.StatusText(http.StatusOK)))
s.subscribers.Add(callback)
s.ippStore.AddSubscriber(callback)
}
}
@ -158,18 +262,20 @@ func (s *Server) IPPSubHandler() httprouter.Handle {
//
// Publish events are sent concurrently, in order to avoid a slow pod
// causing an upstream latency issue.
func (s *Server) PublishIPP(twter types.Twter) {
func (s *Server) PublishIPP(user *User) {
s.tasks.DispatchFunc(func() error {
var resp *http.Response
client := http.Client{
Timeout: 5 * time.Second,
}
uri := URLForUser(s.config.BaseURL, user.Username)
// Send a publish event to all subscribers.
for sub := range s.subscribers.Get() {
go func(sub string) {
for sub := range s.ippStore.GetSubscribers() {
go func(sub, uri string) {
req, _ := http.NewRequest(http.MethodPost, sub, nil)
req.Header.Set("x-ipp-uri", sub)
req.Header.Set("x-ipp-uri", uri)
resp, _ = client.Do(req)
// The receiving pod has received the request but doesn't
@ -177,43 +283,70 @@ func (s *Server) PublishIPP(twter types.Twter) {
//
// This can happen:
// 1) If the other pod has IPP disabled
// 2) If we sent a bad IPP request
// 3) The receiver wasn't a pod at all
// 2) The other pod isn't subscribed to us (anymore)
// 3) If we sent a bad IPP request
// 4) The receiver wasn't a pod at all
if resp.StatusCode != http.StatusAccepted {
s.subscribers.Remove(sub)
s.ippStore.RemoveSubscriber(sub)
}
resp.Body.Close()
}(sub)
}(sub, uri)
}
return nil
})
}
// SubscribeIPP subscribes this pod to another pod's IPP notifications.
func (s *Server) SubscribeIPP(feeds types.Feeds) {
var isLocalUrl = IsLocalURLFactory(s.config)
func (s *Server) SubscribeIPP(peer *Peer) {
var resp *http.Response
client := http.Client{
Timeout: 5 * time.Second,
}
// Subscribe to each feed to start receiving publish events from
// each one.
for feed := range feeds {
// Don't subscribe to our own pod.
if isLocalUrl(feed.URL) {
continue
// Make a subscription request to the peer.
req, _ := http.NewRequest(http.MethodPost, peer.URI+IPPSubEndpoint, nil)
req.Header.Set("x-ipp-callback", s.config.BaseURL+IPPPubEndpoint)
resp, _ = client.Do(req)
resp.Body.Close()
}
// UpdateIPPSubscriptions updates the IPPStore regarding a User's
// followed feeds.
func (s *Server) UpdateIPPSubscriptions(user *User) {
var matchingPeers Peers
var otherPeers Peers
// First get a list of peering Pods
peeringPods := s.cache.GetPeers()
// Next get the User's Sources (feeds they follow)
followingFeeds := user.Sources()
// Split peered pods into pods that this user subscribes to (matchingPeers),
// and peers that this user doesn't subscribe to (otherPeers).
for followingFeed := range followingFeeds {
followingURL := NormalizeURL(followingFeed.URL)
for _, peeringPod := range peeringPods {
if !peeringPod.IsZero() && strings.HasPrefix(NormalizeURL(peeringPod.URI), followingURL) {
matchingPeers = append(matchingPeers, peeringPod)
} else {
otherPeers = append(otherPeers, peeringPod)
}
}
// Validate URL.
host, err := url.Parse(feed.URL)
if err != nil {
continue
}
// Ensure subscription to followed peers.
for _, matchingPeer := range matchingPeers {
s.ippStore.AddSubscription(matchingPeer, user.Username)
// If we haven't subscribed to this peer in the past, send a
// subscription request.
if s.ippStore.GetPeerSubscribers(matchingPeer) == 1 {
go s.SubscribeIPP(matchingPeer)
}
// Make a subscription request.
req, _ := http.NewRequest(http.MethodPost, host.Host+IPPSubEndpoint, nil)
req.Header.Set("x-ipp-callback", s.config.LocalURL().Host+IPPPubEndpoint)
resp, _ = client.Do(req)
resp.Body.Close()
}
// Ensure unsubscription from unfollowed peers.
for _, otherPeer := range matchingPeers {
s.ippStore.RemoveSubscription(otherPeer, user.Username)
}
}

5
internal/login_handlers.go

@ -91,6 +91,11 @@ func (s *Server) LoginHandler() httprouter.Handle {
_ = sess.(*session.Session).Set("persist", "1")
}
// Update user subscriptions.
if s.config.Features.IsEnabled(FeatureIPP) {
s.UpdateIPPSubscriptions(user)
}
http.Redirect(w, r, r.FormValue("referer"), http.StatusFound)
}
}

4
internal/post_handler.go

@ -79,7 +79,7 @@ func (s *Server) PostHandler() httprouter.Handle {
// further.
if r.Method == http.MethodDelete {
if s.config.Features.IsEnabled(FeatureIPP) {
s.PublishIPP(ctx.User.Twter(s.config))
s.PublishIPP(ctx.User)
}
return
}
@ -161,7 +161,7 @@ func (s *Server) PostHandler() httprouter.Handle {
// Publish Inter-Pod Protocol.
if s.config.Features.IsEnabled(FeatureIPP) {
s.PublishIPP(ctx.User.Twter(s.config))
s.PublishIPP(ctx.User)
}
// WebMentions ...

6
internal/server.go

@ -87,7 +87,7 @@ type Server struct {
translator *Translator
// Inter-Pod Protocol Store
subscribers *IPPStore
ippStore *IPPStore
// Factory Functions
AppendTwt AppendTwtFunc
@ -769,7 +769,7 @@ func NewServer(bind string, options ...Option) (*Server, error) {
api := NewAPI(router, config, cache, archive, db, pm, tasks)
ippstore := NewIPPStore()
ippStore := NewIPPStore()
var handler http.Handler
@ -831,7 +831,7 @@ func NewServer(bind string, options ...Option) (*Server, error) {
translator: translator,
// Inter-Pod Protocol Store
subscribers: ippstore,
ippStore: ippStore,
}
// Factory functions that require access to the Pod Config and Store

Loading…
Cancel
Save