Browse Source

finalize interpod protocol

servusdei2018 1 week ago
committed by James Mills
parent
commit
bcbecf8947
Signed by: prologic GPG Key ID: AC4C014F1440EBD6
  1. 5
      internal/follow_handlers.go
  2. 149
      internal/interpod_protocol.go
  3. 4
      internal/post_handler.go

5
internal/follow_handlers.go

@ -173,6 +173,11 @@ func (s *Server) UnfollowHandler() httprouter.Handle {
s.cache.GetByUser(ctx.User, true)
// Update user subscriptions.
if s.config.Features.IsEnabled(FeatureIPP) {
s.UpdateIPPSubscriptions(ctx.User)
}
ctx.Error = false
ctx.Message = s.tr(ctx, "MsgUnfollowSuccess", trdata)
s.render("error", w, ctx)

149
internal/interpod_protocol.go

@ -17,18 +17,44 @@ const (
IPPSubEndpoint = "/ipp/sub"
)
// Users is a list of users that subscribe to a Peer.
type Users []string
// Add ...
func (u *Users) Add(username string) {
// Avoid duplications.
for _, user := range *u {
if user == username {
return
}
}
users := append(*u, username)
u = &users
}
// Remove ...
func (u *Users) Remove(username string) {
var filtered Users
for _, user := range *u {
if user != username {
filtered.Add(user)
}
}
u = &filtered
}
// IPPStore ...
type IPPStore struct {
sync.RWMutex
subscribers map[string]bool
subscriptions map[*Peer]bool
subscriptions map[*Peer]Users
}
// NewIPPStore ...
func NewIPPStore() *IPPStore {
return &IPPStore{
subscribers: make(map[string]bool),
subscriptions: make(map[*Peer]bool),
subscriptions: make(map[*Peer]Users),
}
}
@ -59,23 +85,36 @@ func (i *IPPStore) GetSubscribers() map[string]bool {
return subscribers
}
// AddSubscription adds a subscription (Pod subscribed to) to the store.
func (i *IPPStore) AddSubscription(peer *Peer) {
// AddSubscription adds a user to a peer subscription in the store.
func (i *IPPStore) AddSubscription(peer *Peer, username string) {
i.Lock()
defer i.Unlock()
i.subscriptions[peer] = true
// Add the user to the subscriber list.
list := i.subscriptions[peer]
list.Add(username)
i.subscriptions[peer] = list
}
// RemoveSubscription removes a subscription (Pod subscribed to) from the store.
func (i *IPPStore) RemoveSubscription(peer *Peer) {
// RemoveSubscription removes a user from a peer subscription in the store.
func (i *IPPStore) RemoveSubscription(peer *Peer, username string) {
i.Lock()
defer i.Unlock()
delete(i.subscriptions, peer)
// Remove the user from the subscriber list.
list := i.subscriptions[peer]
list.Remove(username)
i.subscriptions[peer] = list
// If there are no longer any subscribers, unsubscribe.
if len(list) == 0 {
delete(i.subscriptions, peer)
}
}
// GetSubscriptions returns a list of all subscriptions (Peerings pods subscribed to)
func (i *IPPStore) GetSubscriptions() map[*Peer]bool {
subscriptions := make(map[*Peer]bool)
func (i *IPPStore) GetSubscriptions() map[*Peer]Users {
subscriptions := make(map[*Peer]Users)
i.RLock()
for k, v := range i.subscriptions {
@ -98,6 +137,19 @@ func (i *IPPStore) IsSubscribedTo(peer *Peer) bool {
return false
}
// GetPeerSubscribers returns the amount of users subscribed to a peer.
func (i *IPPStore) GetPeerSubscribers(peer *Peer) int {
i.RLock()
defer i.RUnlock()
users, ok := i.subscriptions[peer]
if !ok {
return 0
}
return len(users)
}
// IPPPubHandler handles publish events received from peer pods.
//
// The parameter :url: is passed as a header value, corresponding to
@ -123,8 +175,19 @@ func (s *Server) IPPPubHandler() httprouter.Handle {
return
}
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(http.StatusText(http.StatusAccepted)))
// Send acceptance to valid peers, and withold acceptance
// from unsubscribed peers.
//
// Without acceptance, the pod that published this event will
// remove this pod from his list of subscribers, effectively
// unsubscribing us.
for peer := range s.ippStore.subscriptions {
if strings.HasPrefix(uri, NormalizeURL(peer.URI)) {
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(http.StatusText(http.StatusAccepted)))
break
}
}
// Ignore blacklisted feeds, as well as local feeds.
if s.cache.conf.BlacklistedFeed(uri) || isLocalUrl(uri) {
@ -199,18 +262,20 @@ func (s *Server) IPPSubHandler() httprouter.Handle {
//
// Publish events are sent concurrently, in order to avoid a slow pod
// causing an upstream latency issue.
func (s *Server) PublishIPP(twter types.Twter) {
func (s *Server) PublishIPP(user *User) {
s.tasks.DispatchFunc(func() error {
var resp *http.Response
client := http.Client{
Timeout: 5 * time.Second,
}
uri := URLForUser(s.config.BaseURL, user.Username)
// Send a publish event to all subscribers.
for sub := range s.ippStore.GetSubscribers() {
go func(sub string) {
go func(sub, uri string) {
req, _ := http.NewRequest(http.MethodPost, sub, nil)
req.Header.Set("x-ipp-uri", sub)
req.Header.Set("x-ipp-uri", uri)
resp, _ = client.Do(req)
// The receiving pod has received the request but doesn't
@ -218,49 +283,38 @@ func (s *Server) PublishIPP(twter types.Twter) {
//
// This can happen:
// 1) If the other pod has IPP disabled
// 2) If we sent a bad IPP request
// 3) The receiver wasn't a pod at all
// 2) The other pod isn't subscribed to us (anymore)
// 3) If we sent a bad IPP request
// 4) The receiver wasn't a pod at all
if resp.StatusCode != http.StatusAccepted {
s.ippStore.RemoveSubscriber(sub)
}
resp.Body.Close()
}(sub)
}(sub, uri)
}
return nil
})
}
// SubscribeIPP subscribes this pod to another pod's IPP notifications.
func (s *Server) SubscribeIPP(feeds types.Feeds) {
var isLocalUrl = IsLocalURLFactory(s.config)
func (s *Server) SubscribeIPP(peer *Peer) {
var resp *http.Response
client := http.Client{
Timeout: 5 * time.Second,
}
// Subscribe to each feed to start receiving publish events from
// each one.
for feed := range feeds {
// Don't subscribe to our own pod.
if isLocalUrl(feed.URL) {
continue
}
// Validate URL.
host, err := url.Parse(feed.URL)
if err != nil {
continue
}
// Make a subscription request.
req, _ := http.NewRequest(http.MethodPost, host.Host+IPPSubEndpoint, nil)
req.Header.Set("x-ipp-callback", s.config.LocalURL().Host+IPPPubEndpoint)
resp, _ = client.Do(req)
resp.Body.Close()
}
// Make a subscription request to the peer.
req, _ := http.NewRequest(http.MethodPost, peer.URI+IPPSubEndpoint, nil)
req.Header.Set("x-ipp-callback", s.config.BaseURL+IPPPubEndpoint)
resp, _ = client.Do(req)
resp.Body.Close()
}
// UpdateIPPSubscriptions updates the IPPStore regarding a User's
// followed feeds.
func (s *Server) UpdateIPPSubscriptions(user *User) {
var matchingPeers Peers
var otherPeers Peers
// First get a list of peering Pods
peeringPods := s.cache.GetPeers()
@ -268,18 +322,31 @@ func (s *Server) UpdateIPPSubscriptions(user *User) {
// Next get the User's Sources (feeds they follow)
followingFeeds := user.Sources()
// Split peered pods into pods that this user subscribes to (matchingPeers),
// and peers that this user doesn't subscribe to (otherPeers).
for followingFeed := range followingFeeds {
followingURL := NormalizeURL(followingFeed.URL)
for _, peeringPod := range peeringPods {
if !peeringPod.IsZero() && strings.HasPrefix(NormalizeURL(peeringPod.URI), followingURL) {
matchingPeers = append(matchingPeers, peeringPod)
} else {
otherPeers = append(otherPeers, peeringPod)
}
}
}
// Ensure subscription to followed peers.
for _, matchingPeer := range matchingPeers {
if !s.ippStore.IsSubscribedTo(matchingPeer) {
s.ippStore.AddSubscription(matchingPeer)
s.ippStore.AddSubscription(matchingPeer, user.Username)
// If we haven't subscribed to this peer in the past, send a
// subscription request.
if s.ippStore.GetPeerSubscribers(matchingPeer) == 1 {
go s.SubscribeIPP(matchingPeer)
}
}
// Ensure unsubscription from unfollowed peers.
for _, otherPeer := range matchingPeers {
s.ippStore.RemoveSubscription(otherPeer, user.Username)
}
}

4
internal/post_handler.go

@ -79,7 +79,7 @@ func (s *Server) PostHandler() httprouter.Handle {
// further.
if r.Method == http.MethodDelete {
if s.config.Features.IsEnabled(FeatureIPP) {
s.PublishIPP(ctx.User.Twter(s.config))
s.PublishIPP(ctx.User)
}
return
}
@ -161,7 +161,7 @@ func (s *Server) PostHandler() httprouter.Handle {
// Publish Inter-Pod Protocol.
if s.config.Features.IsEnabled(FeatureIPP) {
s.PublishIPP(ctx.User.Twter(s.config))
s.PublishIPP(ctx.User)
}
// WebMentions ...

Loading…
Cancel
Save