Add RotateFeeds job (#504)

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: #504
Co-authored-by: James Mills <james@mills.io>
Co-committed-by: James Mills <james@mills.io>
commit 421911588a (pull/514/head)
Author: James Mills
internal/api.go                         |  18
internal/cache.go                       | 304
internal/config.go                      |   2
internal/conversation_handler.go        |   9
internal/external_handlers.go           | 329
internal/handlers.go                    | 307
internal/jobs.go                        |  55
internal/langs/active.en.toml           |   1
internal/manage_handlers.go             |  22
internal/options.go                     |  16
internal/permalink_handler.go           |   4
internal/search_handler.go              |   2
internal/server.go                      |   1
internal/theme/templates/managePod.html |   2
internal/theme/templates/settings.html  |   1
internal/twt.go                         |  11
internal/utils.go                       |  79

internal/api.go | 18

@@ -1083,8 +1083,8 @@ func (a *API) ConversationEndpoint() httprouter.Handle {
return
}
twt, ok := a.cache.Lookup(hash)
if !ok {
twt, inCache := a.cache.Lookup(hash)
if !inCache {
// If the twt is not in the cache, look for it in the archive
if a.archive.Has(hash) {
twt, err = a.archive.Get(hash)
@@ -1101,7 +1101,10 @@ func (a *API) ConversationEndpoint() httprouter.Handle {
return
}
twts := FilterTwts(loggedInUser, a.cache.GetTwtsInConversation(hash, twt))
twts := a.cache.GetByUserView(loggedInUser, fmt.Sprintf("subject:(#%s)", hash), false)
if !inCache {
twts = append(twts, twt)
}
sort.Sort(sort.Reverse(twts))
var pagedTwts types.Twts
@@ -1241,9 +1244,12 @@ func (a *API) ExternalProfileEndpoint() httprouter.Handle {
}
if !a.cache.IsCached(uri) {
sources := make(types.Feeds)
sources[types.Feed{Nick: nick, URL: uri}] = true
a.cache.FetchTwts(a.config, a.archive, sources, nil)
a.tasks.DispatchFunc(func() error {
sources := make(types.Feeds)
sources[types.Feed{Nick: nick, URL: uri}] = true
a.cache.FetchTwts(a.config, a.archive, sources, nil)
return nil
})
}
twts := FilterTwts(loggedInUser, a.cache.GetByURL(uri))
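
Note: the endpoint now queues the initial fetch of an uncached feed on the server's task dispatcher instead of fetching inline, so the request returns immediately with whatever is already cached (the new ExternalHandler in internal/external_handlers.go does the same). A minimal sketch of the fire-and-forget pattern follows; the Dispatcher type is a hypothetical stand-in, not yarn's actual tasks implementation.

package main

import (
	"fmt"
	"time"
)

// Dispatcher is a hypothetical stand-in for the server's task queue;
// it only mirrors the DispatchFunc shape used in the handler above.
type Dispatcher struct {
	jobs chan func() error
}

func NewDispatcher(workers int) *Dispatcher {
	d := &Dispatcher{jobs: make(chan func() error, 64)}
	for i := 0; i < workers; i++ {
		go func() {
			for job := range d.jobs {
				if err := job(); err != nil {
					fmt.Println("task error:", err)
				}
			}
		}()
	}
	return d
}

// DispatchFunc enqueues work and returns immediately, so a handler can
// render cached (possibly empty) twts while the fetch runs in the background.
func (d *Dispatcher) DispatchFunc(f func() error) {
	d.jobs <- f
}

func main() {
	d := NewDispatcher(2)
	d.DispatchFunc(func() error {
		time.Sleep(100 * time.Millisecond) // simulate FetchTwts on a slow feed
		fmt.Println("feed fetched; it will appear on the next request")
		return nil
	})
	fmt.Println("handler already responded")
	time.Sleep(200 * time.Millisecond) // give the worker time to finish
}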

internal/cache.go | 304

@@ -21,24 +21,21 @@ import (
const (
feedCacheFile = "cache"
feedCacheVersion = 8 // increase this if breaking changes occur to cache file.
feedCacheVersion = 11 // increase this if breaking changes occur to cache file.
localViewKey = "local"
discoverViewKey = "discover"
)
// FilterFunc...
// FilterFunc ...
type FilterFunc func(twt types.Twt) bool
// GroupFunc ...
type GroupFunc func(twt types.Twt) []string
func FilterOutFeedsAndBotsFactory(conf *Config) FilterFunc {
seen := make(map[string]bool)
isLocal := IsLocalURLFactory(conf)
return func(twt types.Twt) bool {
if seen[twt.Hash()] {
return false
}
seen[twt.Hash()] = true
twter := twt.Twter()
if strings.HasPrefix(twter.URL, "https://feeds.twtxt.net") {
return false
@@ -53,62 +50,134 @@ func FilterOutFeedsAndBotsFactory(conf *Config) FilterFunc {
}
}
func FilterByMentionFactory(u *User) FilterFunc {
return func(twt types.Twt) bool {
for _, mention := range twt.Mentions() {
if u.Is(mention.Twter().URL) {
return true
}
}
return false
}
}
func GroupBySubject(twt types.Twt) []string {
subject := strings.ToLower(twt.Subject().String())
if subject == "" {
return nil
}
return []string{subject}
}
func GroupByTag(twt types.Twt) (res []string) {
var tagsList types.TagList = twt.Tags()
seenTags := make(map[string]bool)
for _, tag := range tagsList {
tagText := strings.ToLower(tag.Text())
if _, seenTag := seenTags[tagText]; !seenTag {
res = append(res, tagText)
seenTags[tagText] = true
}
}
return
}
func FilterTwtsBy(twts types.Twts, f FilterFunc) (res types.Twts) {
for _, twt := range twts {
if f(twt) {
res = append(res, twt)
}
}
return
}
func GroupTwtsBy(twts types.Twts, g GroupFunc) (res map[string]types.Twts) {
res = make(map[string]types.Twts)
for _, twt := range twts {
for _, key := range g(twt) {
res[key] = append(res[key], twt)
}
}
return
}
func MergeTwts(old, new types.Twts, max int) types.Twts {
twts := UniqTwts(append(old, new...))
sort.Sort(twts)
var offset int
if len(twts) > max {
offset = len(twts) - max
}
return twts[offset:]
}
func UniqTwts(twts types.Twts) (res types.Twts) {
seenTwts := make(map[string]bool)
for _, twt := range twts {
if _, seenTwt := seenTwts[twt.Hash()]; !seenTwt {
res = append(res, twt)
seenTwts[twt.Hash()] = true
}
}
return
}
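
MergeTwts is what lets a feed's Cached entry grow incrementally while staying bounded: deduplicate by hash, sort by creation time, then keep only the newest max entries. A self-contained sketch under simplified assumptions (a plain struct instead of the types.Twt interface, and an ascending sort so the slice tail holds the newest twts):

package main

import (
	"fmt"
	"sort"
)

// twt is a simplified stand-in for yarn's types.Twt interface.
type twt struct {
	hash    string
	created int64
}

// mergeTwts mirrors MergeTwts above: dedupe by hash, sort ascending by
// creation time, then keep only the newest max entries from the tail.
func mergeTwts(old, incoming []twt, max int) []twt {
	seen := make(map[string]bool)
	var merged []twt
	for _, t := range append(old, incoming...) {
		if !seen[t.hash] {
			merged = append(merged, t)
			seen[t.hash] = true
		}
	}
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].created < merged[j].created
	})
	var offset int
	if len(merged) > max {
		offset = len(merged) - max // drop the oldest overflow
	}
	return merged[offset:]
}

func main() {
	old := []twt{{"a", 1}, {"b", 2}}
	incoming := []twt{{"b", 2}, {"c", 3}, {"d", 4}}
	// "a" is evicted: four unique twts, capped at the newest three.
	fmt.Println(mergeTwts(old, incoming, 3))
}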
// Cached ...
type Cached struct {
mu sync.RWMutex
cache types.TwtMap
mu sync.RWMutex
Twts types.Twts
LastModified string
}
func NewCached(twts types.Twts, lastModified string) *Cached {
return &Cached{
cache: make(types.TwtMap),
Twts: twts,
LastModified: lastModified,
}
}
// Lookup ...
func (cached *Cached) Lookup(hash string) (types.Twt, bool) {
cached.mu.RLock()
twt, ok := cached.cache[hash]
cached.mu.RUnlock()
if ok {
return twt, true
}
for _, twt := range cached.Twts {
if twt.Hash() == hash {
cached.mu.Lock()
if cached.cache == nil {
cached.cache = make(map[string]types.Twt)
}
cached.cache[hash] = twt
cached.mu.Unlock()
return twt, true
}
}
return types.NilTwt, false
}
// Update ...
func (cached *Cached) Update(url, lastmodified string, twts types.Twts, maxSize int) {
cached.mu.Lock()
defer cached.mu.Unlock()
if len(twts) == 0 {
return
}
if len(twts) >= maxSize {
cached.Twts = twts[:]
cached.LastModified = lastmodified
return
}
cached.Twts = MergeTwts(cached.Twts, twts, maxSize)
cached.LastModified = lastmodified
}
// Cache ...
type Cache struct {
mu sync.RWMutex
mu sync.RWMutex
conf *Config
Version int
All *Cached
Twts map[string]*Cached
List *Cached
Map map[string]types.Twt
Feeds map[string]*Cached
Views map[string]*Cached
}
func NewCache(conf *Config) *Cache {
return &Cache{
conf: conf,
Twts: make(map[string]*Cached),
Map: make(map[string]types.Twt),
Feeds: make(map[string]*Cached),
Views: make(map[string]*Cached),
}
}
@@ -169,7 +238,7 @@ func LoadCache(conf *Config) (*Cache, error) {
// Remove invalid cache file.
os.Remove(fn)
cache.Version = feedCacheVersion
cache.Twts = make(map[string]*Cached)
cache.Feeds = make(map[string]*Cached)
return cache, nil
}
}
@@ -179,7 +248,7 @@ func LoadCache(conf *Config) (*Cache, error) {
log.Errorf("Cache version mismatch. Expect = %d, Got = %d. Removing old cache.", feedCacheVersion, cache.Version)
os.Remove(fn)
cache.Version = feedCacheVersion
cache.Twts = make(map[string]*Cached)
cache.Feeds = make(map[string]*Cached)
}
return cache, nil
@@ -209,12 +278,12 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
metrics.Gauge("cache", "sources").Set(float64(len(feeds)))
seen := make(map[string]bool)
seenFeeds := make(map[string]bool)
for feed := range feeds {
// Skip feeds we've already fetched by URI
// (but possibly referenced by different alias)
// Also skip feeds that are blacklisted.
if _, ok := seen[feed.URL]; ok {
if _, seenFeed := seenFeeds[feed.URL]; seenFeed {
continue
}
if cache.conf.BlacklistedFeed(feed.URL) {
@@ -223,7 +292,7 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
}
wg.Add(1)
seen[feed.URL] = true
seenFeeds[feed.URL] = true
fetchers <- struct{}{}
// anon func takes needed variables as arg, avoiding capture of iterator variables
@@ -302,9 +371,7 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
archiveTwts(old)
archiveTwts(twts)
cache.mu.Lock()
cache.Twts[feed.URL] = NewCached(twts, "")
cache.mu.Unlock()
cache.UpdateFeed(feed.URL, "", twts)
twtsch <- twts
return
@@ -338,7 +405,7 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
}
cache.mu.RLock()
if cached, ok := cache.Twts[feed.URL]; ok {
if cached, ok := cache.Feeds[feed.URL]; ok {
if cached.LastModified != "" {
headers.Set("If-Modified-Since", cached.LastModified)
}
@@ -357,8 +424,8 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
if actualurl != feed.URL {
log.WithError(err).Warnf("feed for %s changed from %s to %s", feed.Nick, feed.URL, actualurl)
cache.mu.Lock()
if cached, ok := cache.Twts[feed.URL]; ok {
cache.Twts[actualurl] = cached
if cached, ok := cache.Feeds[feed.URL]; ok {
cache.Feeds[actualurl] = cached
}
cache.mu.Unlock()
feed.URL = actualurl
@@ -434,13 +501,11 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
archiveTwts(twts)
lastmodified := res.Header.Get("Last-Modified")
cache.mu.Lock()
cache.Twts[feed.URL] = NewCached(twts, lastmodified)
cache.mu.Unlock()
cache.UpdateFeed(feed.URL, lastmodified, twts)
case http.StatusNotModified: // 304
cache.mu.RLock()
if _, ok := cache.Twts[feed.URL]; ok {
twts = cache.Twts[feed.URL].Twts
if _, ok := cache.Feeds[feed.URL]; ok {
twts = cache.Feeds[feed.URL].Twts
}
cache.mu.RUnlock()
}
@@ -460,8 +525,8 @@ func (cache *Cache) FetchTwts(conf *Config, archive Archiver, feeds types.Feeds,
// Bust and repopulate twts for GetAll()
cache.Refresh()
metrics.Gauge("cache", "feeds").Set(float64(cache.Feeds()))
metrics.Gauge("cache", "twts").Set(float64(cache.Count()))
metrics.Gauge("cache", "feeds").Set(float64(cache.FeedCount()))
metrics.Gauge("cache", "twts").Set(float64(cache.TwtCount()))
}
// Lookup ...
@@ -469,26 +534,24 @@ func (cache *Cache) Lookup(hash string) (types.Twt, bool) {
cache.mu.RLock()
defer cache.mu.RUnlock()
for _, cached := range cache.Twts {
twt, ok := cached.Lookup(hash)
if ok {
return twt, true
}
}
twt, ok := cache.Map[hash]
if ok {
return twt, true
}
return types.NilTwt, false
}
func (cache *Cache) Feeds() int {
func (cache *Cache) FeedCount() int {
cache.mu.RLock()
defer cache.mu.RUnlock()
return len(cache.Twts)
return len(cache.Feeds)
}
func (cache *Cache) Count() int {
func (cache *Cache) TwtCount() int {
cache.mu.RLock()
defer cache.mu.RUnlock()
return len(cache.All.Twts)
return len(cache.List.Twts)
}
// Refresh ...
@@ -496,11 +559,12 @@ func (cache *Cache) Refresh() {
var allTwts types.Twts
cache.mu.RLock()
for _, cached := range cache.Twts {
for _, cached := range cache.Feeds {
allTwts = append(allTwts, cached.Twts...)
}
cache.mu.RUnlock()
allTwts = UniqTwts(allTwts)
sort.Sort(allTwts)
//
@@ -512,30 +576,60 @@ func (cache *Cache) Refresh() {
discoverTwts types.Twts
)
hash := make(map[string]types.Twt)
isLocalURL := IsLocalURLFactory(cache.conf)
filterOutFeedsAndBots := FilterOutFeedsAndBotsFactory(cache.conf)
for _, twt := range allTwts {
hash[twt.Hash()] = twt
if isLocalURL(twt.Twter().URL) {
localTwts = append(localTwts, twt)
}
if filterOutFeedsAndBots(twt) {
discoverTwts = append(discoverTwts, twt)
}
}
tags := GroupTwtsBy(allTwts, GroupByTag)
subjects := GroupTwtsBy(allTwts, GroupBySubject)
cache.mu.Lock()
cache.List = NewCached(allTwts, "")
cache.Map = hash
cache.Views = map[string]*Cached{
localViewKey: NewCached(localTwts, ""),
discoverViewKey: NewCached(discoverTwts, ""),
}
cache.All = NewCached(allTwts, "")
for k, v := range tags {
cache.Views["tag:"+k] = NewCached(v, "")
}
for k, v := range subjects {
cache.Views["subject:"+k] = NewCached(v, "")
}
cache.mu.Unlock()
}
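
Refresh now precomputes grouped views keyed "tag:<tag>" and "subject:<subject>", so conversation and tag lookups become single map hits instead of scans over all twts; this is what the new GetByUserView(user, "subject:(#hash)", false) calls in the conversation handlers rely on. A minimal sketch of the key scheme, with plain string slices standing in for types.Twts:

package main

import "fmt"

// buildViews sketches Refresh's precomputation: grouped twts become
// views keyed "tag:<tag>" and "subject:<subject>" ready for O(1) lookup.
func buildViews(byTag, bySubject map[string][]string) map[string][]string {
	views := make(map[string][]string)
	for k, v := range byTag {
		views["tag:"+k] = v
	}
	for k, v := range bySubject {
		views["subject:"+k] = v
	}
	return views
}

func main() {
	views := buildViews(
		map[string][]string{"golang": {"twtA", "twtC"}},
		map[string][]string{"(#abcdefg)": {"twtB"}},
	)
	// A conversation lookup is now a single map access, which is what
	// a "subject:(#abcdefg)" view query resolves to.
	fmt.Println(views["subject:(#abcdefg)"])
	fmt.Println(views["tag:golang"])
}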
// UpdateFeed ...
func (cache *Cache) UpdateFeed(url, lastmodified string, twts types.Twts) {
cache.mu.RLock()
cached, ok := cache.Feeds[url]
cache.mu.RUnlock()
if !ok {
cache.mu.Lock()
cache.Feeds[url] = NewCached(twts, lastmodified)
cache.mu.Unlock()
} else {
cached.Update(url, lastmodified, twts, cache.conf.MaxCacheItems)
}
}
// GetAll ...
func (cache *Cache) GetAll(refresh bool) types.Twts {
cache.mu.RLock()
cached := cache.All
cached := cache.List
cache.mu.RUnlock()
if cached != nil && !refresh {
@@ -543,21 +637,15 @@ func (cache *Cache) GetAll(refresh bool) types.Twts {
}
cache.Refresh()
return cache.All.Twts
return cache.List.Twts
}
// FilterBy ...
func (cache *Cache) FilterBy(f FilterFunc) types.Twts {
var filteredtwts types.Twts
allTwts := cache.GetAll(false)
for _, twt := range allTwts {
if f(twt) {
filteredtwts = append(filteredtwts, twt)
}
}
return filteredtwts
return FilterTwtsBy(cache.GetAll(false), f)
}
func (cache *Cache) GroupBy(g GroupFunc) (res map[string]types.Twts) {
return GroupTwtsBy(cache.GetAll(false), g)
}
// GetMentions ...
@@ -572,23 +660,7 @@ func (cache *Cache) GetMentions(u *User, refresh bool) types.Twts {
return cached.Twts
}
var twts types.Twts
seen := make(map[string]bool)
allTwts := cache.GetAll(false)
// Search for @mentions in the cache against all Twts (local, followed and even external if any)
for _, twt := range allTwts {
for _, mention := range twt.Mentions() {
if u.Is(mention.Twter().URL) && !seen[twt.Hash()] {
twts = append(twts, twt)
seen[twt.Hash()] = true
}
}
}
}
sort.Sort(twts)
twts := cache.FilterBy(FilterByMentionFactory(u))
cache.mu.Lock()
cache.Views[key] = NewCached(twts, "")
@@ -602,7 +674,7 @@ func (cache *Cache) IsCached(url string) bool {
cache.mu.RLock()
defer cache.mu.RUnlock()
_, ok := cache.Twts[url]
_, ok := cache.Feeds[url]
return ok
}
@@ -651,7 +723,7 @@ func (cache *Cache) GetByUserView(u *User, view string, refresh bool) types.Twts
return cache.GetByView(view)
}
key := fmt.Sprintf("%s:%s", view, u.Username)
key := fmt.Sprintf("%s:%s", u.Username, view)
cache.mu.RLock()
cached, ok := cache.Views[key]
@@ -676,54 +748,12 @@ func (cache *Cache) GetByURL(url string) types.Twts {
cache.mu.RLock()
defer cache.mu.RUnlock()
if cached, ok := cache.Twts[url]; ok {
if cached, ok := cache.Feeds[url]; ok {
return cached.Twts
}
return types.Twts{}
}
// GetTwtsInConversation ...
func (cache *Cache) GetTwtsInConversation(hash string, replyTo types.Twt) types.Twts {
subject := fmt.Sprintf("(#%s)", hash)
return cache.GetBySubject(subject, replyTo)
}
// GetBySubject ...
func (cache *Cache) GetBySubject(subject string, replyTo types.Twt) types.Twts {
var result types.Twts
allTwts := cache.GetAll(false)
seen := make(map[string]bool)
for _, twt := range allTwts {
if twt.Subject().String() == subject && !seen[twt.Hash()] {
result = append(result, twt)
seen[twt.Hash()] = true
}
}
if !seen[replyTo.Hash()] {
result = append(result, replyTo)
}
return result
}
// GetByTag ...
func (cache *Cache) GetByTag(tag string) types.Twts {
var result types.Twts
allTwts := cache.GetAll(false)
seen := make(map[string]bool)
for _, twt := range allTwts {
var tags types.TagList = twt.Tags()
if HasString(UniqStrings(tags.Tags()), tag) && !seen[twt.Hash()] {
result = append(result, twt)
seen[twt.Hash()] = true
}
}
return result
}
// DeleteUserViews ...
func (cache *Cache) DeleteUserViews(u *User) {
cache.mu.Lock()
@@ -737,7 +767,7 @@ func (cache *Cache) DeleteUserViews(u *User) {
func (cache *Cache) DeleteFeeds(feeds types.Feeds) {
cache.mu.Lock()
for feed := range feeds {
delete(cache.Twts, feed.URL)
delete(cache.Feeds, feed.URL)
}
cache.mu.Unlock()
cache.Refresh()

internal/config.go | 2

@@ -91,7 +91,7 @@ type Config struct {
MaxUploadSize int64
MaxTwtLength int
MaxCacheTTL time.Duration
FetchInterval time.Duration
FetchInterval string
MaxCacheItems int
OpenProfiles bool
OpenRegistrations bool

internal/conversation_handler.go | 9

@@ -32,8 +32,8 @@ func (s *Server) ConversationHandler() httprouter.Handle {
var err error
twt, ok := s.cache.Lookup(hash)
if !ok {
twt, inCache := s.cache.Lookup(hash)
if !inCache {
// If the twt is not in the cache, look for it in the archive
if s.archive.Has(hash) {
twt, err = s.archive.Get(hash)
@@ -93,7 +93,10 @@ func (s *Server) ConversationHandler() httprouter.Handle {
)
}
twts := s.cache.GetTwtsInConversation(hash, twt)
twts := s.cache.GetByUserView(ctx.User, fmt.Sprintf("subject:(#%s)", hash), false)
if !inCache {
twts = append(twts, twt)
}
sort.Sort(sort.Reverse(twts))
var pagedTwts types.Twts

internal/external_handlers.go | 329

@@ -0,0 +1,329 @@
package internal
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"git.mills.io/yarnsocial/yarn/types"
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
"github.com/vcraescu/go-paginator"
"github.com/vcraescu/go-paginator/adapter"
)
// ExternalHandler ...
func (s *Server) ExternalHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
ctx := NewContext(s, r)
ctx.Translate(s.translator)
uri := r.URL.Query().Get("uri")
nick := r.URL.Query().Get("nick")
if uri == "" {
ctx.Error = true
ctx.Message = s.tr(ctx, "ErrorNoExternalFeed")
s.render("error", w, ctx)
return
}
if nick == "" {
log.Warn("no nick given to external profile request")
}
if !s.cache.IsCached(uri) {
s.tasks.DispatchFunc(func() error {
sources := make(types.Feeds)
sources[types.Feed{Nick: nick, URL: uri}] = true
s.cache.FetchTwts(s.config, s.archive, sources, nil)
return nil
})
}
twts := FilterTwts(ctx.User, s.cache.GetByURL(uri))
var pagedTwts types.Twts
page := SafeParseInt(r.FormValue("p"), 1)
pager := paginator.New(adapter.NewSliceAdapter(twts), s.config.TwtsPerPage)
pager.SetPage(page)
if err := pager.Results(&pagedTwts); err != nil {
log.WithError(err).Error("error sorting and paging twts")
ctx.Error = true
ctx.Message = s.tr(ctx, "ErrorLoadingTimeline")
s.render("error", w, ctx)
return
}
ctx.Twts = pagedTwts
ctx.Pager = &pager
if len(ctx.Twts) > 0 {
ctx.Twter = ctx.Twts[0].Twter()
} else {
ctx.Twter = types.Twter{Nick: nick, URL: uri}
}
if ctx.Twter.Avatar == "" {
avatar := GetExternalAvatar(s.config, ctx.Twter)
if avatar != "" {
ctx.Twter.Avatar = URLForExternalAvatar(s.config, uri)
}
}
// If no &nick= was provided, try to guess a suitable nick
// from the feed itself or via heuristics on the feed's URI
// (borrowed from Yarns)
if nick == "" {
if ctx.Twter.Nick != "" {
nick = ctx.Twter.Nick
} else {
// TODO: Move this logic into types/lextwt and types/retwt
if u, err := url.Parse(uri); err == nil {
if strings.HasSuffix(u.Path, "/twtxt.txt") {
if rest := strings.TrimSuffix(u.Path, "/twtxt.txt"); rest != "" {
nick = strings.Trim(rest, "/")
} else {
nick = u.Hostname()
}
} else if strings.HasSuffix(u.Path, ".txt") {
base := filepath.Base(u.Path)
if name := strings.TrimSuffix(base, filepath.Ext(base)); name != "" {
nick = name
} else {
nick = u.Hostname()
}
} else {
nick = Slugify(uri)
}
}
}
}
following := make(map[string]string)
for followingNick, followingTwter := range ctx.Twter.Follow {
following[followingNick] = followingTwter.URL
}
ctx.Profile = types.Profile{
Type: "External",
Username: nick,
Tagline: ctx.Twter.Tagline,
Avatar: URLForExternalAvatar(s.config, uri),
URL: uri,
Following: following,
NFollowing: ctx.Twter.Following,
NFollowers: ctx.Twter.Followers,
ShowFollowing: true,
ShowFollowers: true,
Follows: ctx.User.Follows(uri),
FollowedBy: ctx.User.FollowedBy(uri),
Muted: ctx.User.HasMuted(uri),
}
if len(twts) > 0 {
ctx.Profile.LastPostedAt = twts[0].Created()
}
trdata := map[string]interface{}{}
trdata["Nick"] = nick
trdata["URL"] = uri
ctx.Title = s.tr(ctx, "PageExternalProfileTitle", trdata)
s.render("externalProfile", w, ctx)
}
}
// ExternalFollowingHandler ...
func (s *Server) ExternalFollowingHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
ctx := NewContext(s, r)
ctx.Translate(s.translator)
uri := r.URL.Query().Get("uri")
nick := r.URL.Query().Get("nick")
if uri == "" {
ctx.Error = true
ctx.Message = s.tr(ctx, "ErrorNoExternalFeed")
s.render("error", w, ctx)
return
}
if nick == "" {
log.Warn("no nick given to external profile request")
}
if !s.cache.IsCached(uri) {
sources := make(types.Feeds)
sources[types.Feed{Nick: nick, URL: uri}] = true
s.cache.FetchTwts(s.config, s.archive, sources, nil)
}
twts := s.cache.GetByURL(uri)
if len(twts) > 0 {
ctx.Twter = twts[0].Twter()
} else {
ctx.Twter = types.Twter{Nick: nick, URL: uri}
}
if ctx.Twter.Avatar == "" {
avatar := GetExternalAvatar(s.config, ctx.Twter)
if avatar != "" {
ctx.Twter.Avatar = URLForExternalAvatar(s.config, uri)
}
}
// If no &nick= was provided, try to guess a suitable nick
// from the feed itself or via heuristics on the feed's URI
// (borrowed from Yarns)
if nick == "" {
if ctx.Twter.Nick != "" {
nick = ctx.Twter.Nick
} else {
// TODO: Move this logic into types/lextwt and types/retwt
if u, err := url.Parse(uri); err == nil {
if strings.HasSuffix(u.Path, "/twtxt.txt") {
if rest := strings.TrimSuffix(u.Path, "/twtxt.txt"); rest != "" {
nick = strings.Trim(rest, "/")
} else {
nick = u.Hostname()
}
} else if strings.HasSuffix(u.Path, ".txt") {
base := filepath.Base(u.Path)
if name := strings.TrimSuffix(base, filepath.Ext(base)); name != "" {
nick = name
} else {
nick = u.Hostname()
}
} else {
nick = Slugify(uri)
}
}
}
}
following := make(map[string]string)
for followingNick, followingTwter := range ctx.Twter.Follow {
following[followingNick] = followingTwter.URL
}
ctx.Profile = types.Profile{
Type: "External",
Username: nick,
Tagline: ctx.Twter.Tagline,
Avatar: URLForExternalAvatar(s.config, uri),
URL: uri,
Following: following,
NFollowing: ctx.Twter.Following,
NFollowers: ctx.Twter.Followers,
ShowFollowing: true,
ShowFollowers: true,
Follows: ctx.User.Follows(uri),
FollowedBy: ctx.User.FollowedBy(uri),
Muted: ctx.User.HasMuted(uri),
}
if len(twts) > 0 {
ctx.Profile.LastPostedAt = twts[0].Created()
}
if r.Header.Get("Accept") == "application/json" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(ctx.Profile.Following); err != nil {
log.WithError(err).Error("error encoding user for display")
http.Error(w, "Bad Request", http.StatusBadRequest)
}
return
}
trdata := map[string]interface{}{}
trdata["Nick"] = nick
trdata["URL"] = uri
ctx.Title = s.tr(ctx, "PageExternalFollowingTitle", trdata)
s.render("externalFollowing", w, ctx)
}
}
// ExternalAvatarHandler ...
func (s *Server) ExternalAvatarHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
w.Header().Set("Cache-Control", "public, no-cache, must-revalidate")
uri := r.URL.Query().Get("uri")
if uri == "" {
log.Warn("no uri provided for external avatar")
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
slug := Slugify(uri)
fn := filepath.Join(s.config.Data, externalDir, fmt.Sprintf("%s.png", slug))
w.Header().Set("Content-Type", "image/png")
if !FileExists(fn) {
log.Warnf("no external avatar found for %s", slug)
http.Error(w, "External avatar not found", http.StatusNotFound)
return
}
fileInfo, err := os.Stat(fn)
if err != nil {
log.WithError(err).Error("os.Stat() error")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
etag := fmt.Sprintf("W/\"%s-%s\"", r.RequestURI, fileInfo.ModTime().Format(time.RFC3339))
if match := r.Header.Get("If-None-Match"); match != "" {
if strings.Contains(match, etag) {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Header().Set("Etag", etag)
if r.Method == http.MethodHead {
return
}
f, err := os.Open(fn)
if err != nil {
log.WithError(err).Error("error opening avatar file")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
defer f.Close()
if _, err := io.Copy(w, f); err != nil {
log.WithError(err).Error("error writing avatar response")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
}
}
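
The avatar handler serves a weak ETag derived from the request URI and the file's mtime, and honors If-None-Match. A client-side sketch of the resulting conditional GET round trip (the URL is a placeholder, not the pod's actual route):

package main

import (
	"fmt"
	"net/http"
)

// fetchAvatar performs the conditional GET round trip the handler supports:
// replay the previously seen ETag and expect 304 if the file is unchanged.
func fetchAvatar(url, etag string) (string, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	if etag != "" {
		req.Header.Set("If-None-Match", etag)
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer res.Body.Close()
	switch res.StatusCode {
	case http.StatusNotModified:
		fmt.Println("avatar unchanged; keep the local copy")
		return etag, nil
	case http.StatusOK:
		newTag := res.Header.Get("Etag")
		fmt.Println("new avatar; remember ETag:", newTag)
		return newTag, nil
	default:
		return "", fmt.Errorf("unexpected status %s", res.Status)
	}
}

func main() {
	// Placeholder URL; a real pod serves this under its external avatar route.
	etag, err := fetchAvatar("https://example.com/externalAvatar?uri=...", "")
	if err == nil {
		_, _ = fetchAvatar("https://example.com/externalAvatar?uri=...", etag)
	}
}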

internal/handlers.go | 307

@@ -1228,313 +1228,6 @@ func (s *Server) FollowingHandler() httprouter.Handle {
}
}
// ExternalHandler ...
func (s *Server) ExternalHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
ctx := NewContext(s, r)
ctx.Translate(s.translator)
uri := r.URL.Query().Get("uri")
nick := r.URL.Query().Get("nick")
if uri == "" {
ctx.Error = true
ctx.Message = s.tr(ctx, "ErrorNoExternalFeed")
s.render("error", w, ctx)
return
}
if nick == "" {
log.Warn("no nick given to external profile request")
}
if !s.cache.IsCached(uri) {
sources := make(types.Feeds)
sources[types.Feed{Nick: nick, URL: uri}] = true
s.cache.FetchTwts(s.config, s.archive, sources, nil)
}
twts := FilterTwts(ctx.User, s.cache.GetByURL(uri))
var pagedTwts types.Twts
page := SafeParseInt(r.FormValue("p"), 1)
pager := paginator.New(adapter.NewSliceAdapter(twts), s.config.TwtsPerPage)
pager.SetPage(page)
if err := pager.Results(&pagedTwts); err != nil {
log.WithError(err).Error("error sorting and paging twts")
ctx.Error = true
ctx.Message = s.tr(ctx, "ErrorLoadingTimeline")
s.render("error", w, ctx)
return
}
ctx.Twts = pagedTwts
ctx.Pager = &pager
if len(ctx.Twts) > 0 {
ctx.Twter = ctx.Twts[0].Twter()
} else {
ctx.Twter = types.Twter{Nick: nick, URL: uri}
}
if ctx.Twter.Avatar == "" {
avatar := GetExternalAvatar(s.config, ctx.Twter)
if avatar != "" {
ctx.Twter.Avatar = URLForExternalAvatar(s.config, uri)
}
}
// If no &nick= provided try to guess a suitable nick
// from the feed or some heuristics from the feed's URI
// (borrowed from Yarns)
if nick == "" {
if ctx.Twter.Nick != "" {
nick = ctx.Twter.Nick
} else {
// TODO: Move this logic into types/lextwt and types/retwt
if u, err := url.Parse(uri); err == nil {
if strings.HasSuffix(u.Path, "/twtxt.txt") {
if rest := strings.TrimSuffix(u.Path, "/twtxt.txt"); rest != "" {
nick = strings.Trim(rest, "/")
} else {
nick = u.Hostname()
}
} else if strings.HasSuffix(u.Path, ".txt") {
base := filepath.Base(u.Path)
if name := strings.TrimSuffix(base, filepath.Ext(base)); name != "" {
nick = name
} else {
nick = u.Hostname()
}
} else {
nick = Slugify(uri)
}
}
}
}
following := make(map[string]string)
for followingNick, followingTwter := range ctx.Twter.Follow {
following[followingNick] = followingTwter.URL
}
ctx.Profile = types.Profile{
Type: "External",
Username: nick,
Tagline: ctx.Twter.Tagline,
Avatar: URLForExternalAvatar(s.config, uri),
URL: uri,
Following: following,
NFollowing: ctx.Twter.Following,
NFollowers: ctx.Twter.Followers,
ShowFollowing: true,
ShowFollowers: true,
Follows: ctx.User.Follows(uri),
FollowedBy: ctx.User.FollowedBy(uri),
Muted: ctx.User.HasMuted(uri),
}
if len(twts) > 0 {
ctx.Profile.LastPostedAt = twts[0].Created()
}
trdata := map[string]interface{}{}
trdata["Nick"] = nick
trdata["URL"] = uri
ctx.Title = s.tr(ctx, "PageExternalProfileTitle", trdata)
s.render("externalProfile", w, ctx)
}
}
// ExternalFollowingHandler ...
func (s *Server) ExternalFollowingHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
ctx := NewContext(s, r)
ctx.Translate(s.translator)
uri := r.URL.Query().Get("uri")
nick := r.URL.Query().Get("nick")
if uri == "" {
ctx.Error = true
ctx.Message = s.tr(ctx, "ErrorNoExternalFeed")
s.render("error", w, ctx)
return
}
if nick == "" {
log.Warn("no nick given to external profile request")
}
if !s.cache.IsCached(uri) {
sources := make(types.Feeds)
sources[types.Feed{Nick: nick, URL: uri}] = true
s.cache.FetchTwts(s.config, s.archive, sources, nil)
}
twts := s.cache.GetByURL(uri)
if len(twts) > 0 {
ctx.Twter = twts[0].Twter()
} else {
ctx.Twter = types.Twter{Nick: nick, URL: uri}
}
if ctx.Twter.Avatar == "" {
avatar := GetExternalAvatar(s.config, ctx.Twter)
if avatar != "" {
ctx.Twter.Avatar = URLForExternalAvatar(s.config, uri)
}
}
// If no &nick= provided try to guess a suitable nick
// from the feed or some heuristics from the feed's URI
// (borrowed from Yarns)
if nick == "" {
if ctx.Twter.Nick != "" {
nick = ctx.Twter.Nick
} else {
// TODO: Move this logic into types/lextwt and types/retwt
if u, err := url.Parse(uri); err == nil {
if strings.HasSuffix(u.Path, "/twtxt.txt") {
if rest := strings.TrimSuffix(u.Path, "/twtxt.txt"); rest != "" {
nick = strings.Trim(rest, "/")
} else {
nick = u.Hostname()
}
} else if strings.HasSuffix(u.Path, ".txt") {
base := filepath.Base(u.Path)
if name := strings.TrimSuffix(base, filepath.Ext(base)); name != "" {
nick = name
} else {
nick = u.Hostname()
}
} else {
nick = Slugify(uri)
}
}
}
}
following := make(map[string]string)
for followingNick, followingTwter := range ctx.Twter.Follow {
following[followingNick] = followingTwter.URL
}
ctx.Profile = types.Profile{
Type: "External",
Username: nick,
Tagline: ctx.Twter.Tagline,
Avatar: URLForExternalAvatar(s.config, uri),
URL: uri,
Following: following,
NFollowing: ctx.Twter.Following,
NFollowers: ctx.Twter.Followers,
ShowFollowing: true,
ShowFollowers: true,
Follows: ctx.User.Follows(uri),
FollowedBy: ctx.User.FollowedBy(uri),
Muted: ctx.User.HasMuted(uri),
}
if len(twts) > 0 {
ctx.Profile.LastPostedAt = twts[0].Created()
}
if r.Header.Get("Accept") == "application/json" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(ctx.Profile.Following); err != nil {
log.WithError(err).Error("error encoding user for display")
http.Error(w, "Bad Request", http.StatusBadRequest)
}
return
}
trdata := map[string]interface{}{}
trdata["Nick"] = nick
trdata["URL"] = uri
ctx.Title = s.tr(ctx, "PageExternalFollowingTitle", trdata)
s.render("externalFollowing", w, ctx)
}
}
// ExternalAvatarHandler ...
func (s *Server) ExternalAvatarHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
w.Header().Set("Cache-Control", "public, no-cache, must-revalidate")
uri := r.URL.Query().Get("uri")
if uri == "" {
log.Warn("no uri provided for external avatar")
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
slug := Slugify(uri)
fn := filepath.Join(s.config.Data, externalDir, fmt.Sprintf("%s.png", slug))
w.Header().Set("Content-Type", "image/png")
if !FileExists(fn) {
log.Warnf("no external avatar found for %s", slug)
http.Error(w, "External avatar not found", http.StatusNotFound)
return
}
fileInfo, err := os.Stat(fn)
if err != nil {
log.WithError(err).Error("os.Stat() error")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
etag := fmt.Sprintf("W/\"%s-%s\"", r.RequestURI, fileInfo.ModTime().Format(time.RFC3339))
if match := r.Header.Get("If-None-Match"); match != "" {
if strings.Contains(match, etag) {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Header().Set("Etag", etag)
if r.Method == http.MethodHead {
return
}
if r.Method == http.MethodHead {
return
}
f, err := os.Open(fn)
if err != nil {
log.WithError(err).Error("error opening avatar file")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
defer f.Close()
if _, err := io.Copy(w, f); err != nil {
log.WithError(err).Error("error writing avatar response")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
}
}
// ResetPasswordHandler ...
func (s *Server) ResetPasswordHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {

internal/jobs.go | 55

@@ -2,8 +2,11 @@ package internal
import (
"fmt"
"os"
"path/filepath"
"git.mills.io/yarnsocial/yarn/types"
"github.com/dustin/go-humanize"
"github.com/robfig/cron"
log "github.com/sirupsen/logrus"
)
@@ -26,18 +29,20 @@
func InitJobs(conf *Config) {
Jobs = map[string]JobSpec{
"SyncStore": NewJobSpec("@every 1m", NewSyncStoreJob),
"UpdateFeeds": NewJobSpec(fmt.Sprintf("@every %s", conf.FetchInterval), NewUpdateFeedsJob),
"UpdateFeeds": NewJobSpec(conf.FetchInterval, NewUpdateFeedsJob),
"UpdateFeedSources": NewJobSpec("@every 15m", NewUpdateFeedSourcesJob),
"DeleteOldSessions": NewJobSpec("@hourly", NewDeleteOldSessionsJob),
"Stats": NewJobSpec("@daily", NewStatsJob),
"Stats": NewJobSpec("@daily", NewStatsJob),
"RotateFeeds": NewJobSpec("0 0 2 * * 0", NewRotateFeedsJob),
"CreateBots": NewJobSpec("", NewCreateBotsJob),
"CreateAdminFeeds": NewJobSpec("", NewCreateAdminFeedsJob),
}
StartupJobs = map[string]JobSpec{
"RotateFeeds": Jobs["RotateFeeds"],
"UpdateFeeds": Jobs["UpdateFeeds"],
"UpdateFeedSources": Jobs["UpdateFeedSources"],
"CreateBots": Jobs["CreateBots"],
@@ -134,7 +139,7 @@ func (job *StatsJob) Run() {
text := fmt.Sprintf(
"🧮 USERS:%d FEEDS:%d TWTS:%d ARCHIVED:%d CACHE:%d FOLLOWERS:%d FOLLOWING:%d",
len(users), len(feeds), twts, archiveSize, job.cache.Count(), len(followers), len(following),
len(users), len(feeds), twts, archiveSize, job.cache.TwtCount(), len(followers), len(following),
)
if _, err := AppendSpecial(job.conf, job.db, "stats", text); err != nil {
@@ -198,7 +203,7 @@ func (job *UpdateFeedsJob) Run() {
log.Infof("updating %d sources", len(sources))
job.cache.FetchTwts(job.conf, job.archive, sources, publicFollowers)
log.Info("syncing feed cache ", len(job.cache.Twts))
log.Info("syncing feed cache")
if err := job.cache.Store(job.conf); err != nil {
log.WithError(err).Warn("error saving feed cache")
return
@@ -325,3 +330,45 @@ func (job *DeleteOldSessionsJob) Run() {
}
}
}
type RotateFeedsJob struct {
conf *Config
cache *Cache
archive Archiver
db Store
}
func NewRotateFeedsJob(conf *Config, cache *Cache, archive Archiver, db Store) cron.Job {
return &RotateFeedsJob{conf: conf, cache: cache, archive: archive, db: db}
}
func (job *RotateFeedsJob) Run() {
feeds, err := GetAllFeeds(job.conf)
if err != nil {
log.WithError(err).Warn("unable to get all local feeds")
return
}
for _, feed := range feeds {
fn := filepath.Join(job.conf.Data, feedsDir, feed)
stat, err := os.Stat(fn)
if err != nil {
log.WithError(err).Error("error getting feed size")
continue
}
if stat.Size() > job.conf.MaxFetchLimit {
log.Infof(
"rotating %s with size %s > %s",
feed, humanize.Bytes(uint64(stat.Size())),
humanize.Bytes(uint64(job.conf.MaxFetchLimit)),
)
if err := RotateFeed(job.conf, feed); err != nil {
log.WithError(err).Error("error rotating feed")
} else {
log.Infof("rotated feed %s", feed)
}
}
}
}
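
The RotateFeeds spec "0 0 2 * * 0" is a six-field robfig/cron v1 expression (seconds, minutes, hours, day-of-month, month, day-of-week), so the job fires at 02:00 every Sunday; it also runs once at startup via StartupJobs. A small sketch confirming how the spec parses:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// robfig/cron v1 specs have six fields: sec min hour dom month dow,
	// so "0 0 2 * * 0" fires at 02:00:00 every Sunday.
	sched, err := cron.Parse("0 0 2 * * 0")
	if err != nil {
		panic(err)
	}
	now := time.Now()
	next := sched.Next(now)
	fmt.Printf("RotateFeeds next fires %s (%s from now)\n",
		next.Format(time.RFC1123), next.Sub(now).Round(time.Minute))
}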

internal/langs/active.en.toml | 1

@@ -144,6 +144,7 @@ ManageFeedSummary = "Manage <b>{{ .Username}}</b> details"
ManageFeedTitle = "Manage feed"
ManagePodLinkTitle = "Manage Pod"
ManageUsersLinkTitle = "Manage Users"
ManageRefreshCacheTitle = "Refresh Cache"
MeLinkTitle = "me"
MenuAbout = "About"
MenuAbuse = "Abuse"

internal/manage_handlers.go | 22

@@ -471,3 +471,25 @@ func (s *Server) RstUserHandler() httprouter.Handle {
s.render("error", w, ctx)
}
}
// RefreshCacheHandler ...
func (s *Server) RefreshCacheHandler() httprouter.Handle {
isAdminUser := IsAdminUserFactory(s.config)
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
ctx := NewContext(s, r)
if !isAdminUser(ctx.User) {
ctx.Error = true
ctx.Message = "You are not a Pod Owner!"
s.render("403", w, ctx)
return
}
s.cache.Refresh()
ctx.Error = false
ctx.Message = "Successfully refreshed cache"
s.render("error", w, ctx)
}
}
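
RefreshCacheHandler gives pod owners a way to force a cache refresh from the UI (the new ManageRefreshCacheTitle string and the managePod.html change wire it up). The route itself is presumably registered in internal/server.go, whose one-line change is not shown here; the sketch below therefore uses a placeholder path, method, and cookie name:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder path and cookie: the actual route and session cookie
	// name come from internal/server.go, which this diff does not show.
	req, err := http.NewRequest(http.MethodPost, "https://example.com/manage/refreshcache", nil)
	if err != nil {
		panic(err)
	}
	req.AddCookie(&http.Cookie{Name: "session", Value: "pod-owner-session-id"})
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	// Non-admin sessions get the 403 page; the pod owner gets the
	// "Successfully refreshed cache" message.
	fmt.Println(res.Status)
}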

internal/options.go | 16

@@ -5,8 +5,6 @@ import (
"regexp"
"runtime"
"time"
log "github.com/sirupsen/logrus"
)
const (
@@ -86,9 +84,8 @@ const (
DefaultMaxCacheTTL = time.Hour * 24 * 10 // 10 days
// DefaultFetchInterval is the default interval used by the global feed cache
// to control when to actually fetch and update feeds. This accepts `time.Duration`
// as parsed by `time.ParseDuration()`.
DefaultFetchInterval = "5m"
// to control when to actually fetch and update feeds.
DefaultFetchInterval = "@every 5m"
// DefaultMaxCacheItems is the default maximum cache items (per feed source)
// of twts in memory
@@ -124,7 +121,7 @@ const (
DefaultSMTPFrom = InvalidConfigValue
// DefaultMaxFetchLimit is the maximum fetch fetch limit in bytes
DefaultMaxFetchLimit = 1 << 21 // ~2MB (or more than enough for a year)
DefaultMaxFetchLimit = 1 << 20 // ~1MB (or more than enough for months)
// DefaultAPISessionTime is the server's default session time for API tokens
DefaultAPISessionTime = 240 * time.Hour // 10 days
@@ -376,12 +373,7 @@ func WithMaxCacheTTL(maxCacheTTL time.Duration) Option {
// Accepts a string as parsed by `time.ParseDuration`
func WithFetchInterval(fetchInterval string) Option {
return func(cfg *Config) error {
d, err := time.ParseDuration(fetchInterval)
if err != nil {
log.WithError(err).Errorf("error parsing fetch interval %s", fetchInterval)
return err
}
cfg.FetchInterval = d
cfg.FetchInterval = fetchInterval
return nil
}
}
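
WithFetchInterval no longer validates its argument, since a cron spec like "@every 5m" is not a time.Duration and the old time.ParseDuration check would reject it. If validation is still wanted, one option (an assumption, not part of this commit) is to parse the spec the same way the scheduler will:

package main

import (
	"fmt"

	"github.com/robfig/cron"
)

// validateFetchInterval is a hypothetical check, not part of this commit:
// it parses the spec the way the cron scheduler will later interpret it.
// robfig/cron's Parse accepts "@every 5m" as well as six-field specs.
func validateFetchInterval(spec string) error {
	if _, err := cron.Parse(spec); err != nil {
		return fmt.Errorf("invalid fetch interval %q: %w", spec, err)
	}
	return nil
}

func main() {
	fmt.Println(validateFetchInterval("@every 5m")) // <nil>
	fmt.Println(validateFetchInterval("5m"))        // error: bare durations are not cron specs
}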

internal/permalink_handler.go | 4

@@ -35,8 +35,8 @@ func (s *Server) PermalinkHandler() httprouter.Handle {
var err error
twt, ok := s.cache.Lookup(hash)
if !ok {
twt, inCache := s.cache.Lookup(hash)
if !inCache {
// If the twt is not in the cache, look for it in the archive
if s.archive.Has(hash) {
twt, err = s.archive.Get(hash)