|
|
|
@ -1,9 +1,11 @@
|
|
|
|
|
// Copyright 2020-present Yarn.social
|
|
|
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
|
|
|
|
|
|
// Package main archives twts into a bluge index
|
|
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"net/url"
|
|
|
|
|
"os"
|
|
|
|
@ -108,51 +110,104 @@ func main() {
|
|
|
|
|
log.WithError(err).Error("error opening index writer")
|
|
|
|
|
os.Exit(2)
|
|
|
|
|
}
|
|
|
|
|
defer writer.Close()
|
|
|
|
|
|
|
|
|
|
worker := func(wg *sync.WaitGroup, twts <-chan types.Twt, bar *progressbar.ProgressBar, writer *bluge.Writer) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
count := 0
|
|
|
|
|
batch := bluge.NewBatch()
|
|
|
|
|
|
|
|
|
|
for twt := range twts {
|
|
|
|
|
doc := bluge.NewDocument(twt.Hash())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("author", twt.Twter().DomainNick()).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("nick", twt.Twter().Nick).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("feed", twt.Twter().URI).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewTextField("text", twt.FormatText(types.LiteralFmt, nil)).HighlightMatches().StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("conv", twt.Subject().Text()).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewDateTimeField("created", twt.Created()).Sortable().StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewTextField("subject", twt.Subject().String()).HighlightMatches().StoreValue())
|
|
|
|
|
for _, tag := range twt.Tags() {
|
|
|
|
|
tf := bluge.NewTextField("tags", tag.Text())
|
|
|
|
|
reader, err := writer.Reader()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Error("error opening index reader")
|
|
|
|
|
os.Exit(2)
|
|
|
|
|
}
|
|
|
|
|
defer reader.Close()
|
|
|
|
|
|
|
|
|
|
indexTwt := func(twt types.Twt, writer *bluge.Writer) *bluge.Document {
|
|
|
|
|
doc := bluge.NewDocument(twt.Hash())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("author", twt.Twter().DomainNick()).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("nick", twt.Twter().Nick).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("feed", twt.Twter().URI).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewTextField("text", twt.FormatText(types.LiteralFmt, nil)).HighlightMatches().StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("conv", twt.Subject().Text()).StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewDateTimeField("created", twt.Created()).Sortable().StoreValue())
|
|
|
|
|
doc.AddField(bluge.NewTextField("subject", twt.Subject().String()).HighlightMatches().StoreValue())
|
|
|
|
|
for _, tag := range twt.Tags() {
|
|
|
|
|
tf := bluge.NewTextField("tags", tag.Text())
|
|
|
|
|
tf.WithAnalyzer(analyzer.NewWebAnalyzer())
|
|
|
|
|
tf.SearchTermPositions()
|
|
|
|
|
tf.StoreValue()
|
|
|
|
|
doc.AddField(tf)
|
|
|
|
|
}
|
|
|
|
|
for _, link := range twt.Links() {
|
|
|
|
|
if _, err := url.Parse(link.Target()); err == nil {
|
|
|
|
|
log.Debugf("Indexing link: %s", link.Target())
|
|
|
|
|
tf := bluge.NewTextField("links", link.Target())
|
|
|
|
|
tf.WithAnalyzer(analyzer.NewWebAnalyzer())
|
|
|
|
|
tf.SearchTermPositions()
|
|
|
|
|
tf.StoreValue()
|
|
|
|
|
doc.AddField(tf)
|
|
|
|
|
}
|
|
|
|
|
for _, link := range twt.Links() {
|
|
|
|
|
if _, err := url.Parse(link.Target()); err == nil {
|
|
|
|
|
log.Debugf("Indexing link: %s", link.Target())
|
|
|
|
|
tf := bluge.NewTextField("links", link.Target())
|
|
|
|
|
tf.WithAnalyzer(analyzer.NewWebAnalyzer())
|
|
|
|
|
tf.SearchTermPositions()
|
|
|
|
|
tf.StoreValue()
|
|
|
|
|
doc.AddField(tf)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for _, mention := range twt.Mentions() {
|
|
|
|
|
if mention.Twter().Domain() != "" {
|
|
|
|
|
log.Debugf("Indexing mention: %s", mention.Twter().DomainNick())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("mentions", mention.Twter().DomainNick()).SearchTermPositions().StoreValue())
|
|
|
|
|
}
|
|
|
|
|
for _, mention := range twt.Mentions() {
|
|
|
|
|
if mention.Twter().Domain() != "" {
|
|
|
|
|
log.Debugf("Indexing mention: %s", mention.Twter().DomainNick())
|
|
|
|
|
doc.AddField(bluge.NewKeywordField("mentions", mention.Twter().DomainNick()).SearchTermPositions().StoreValue())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doc.AddField(bluge.NewCompositeFieldExcluding("_all", []string{"_id"}))
|
|
|
|
|
|
|
|
|
|
return doc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
isTwtIndexed := func(twt types.Twt, reader *bluge.Reader) (bool, error) {
|
|
|
|
|
q := bluge.NewTermQuery(twt.Hash()).SetField("_id")
|
|
|
|
|
req := bluge.NewTopNSearch(1, q).WithStandardAggregations()
|
|
|
|
|
|
|
|
|
|
iterator, err := reader.Search(context.Background(), req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, fmt.Errorf("error executing search: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match, err := iterator.Next()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, fmt.Errorf("error iterating matches: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if match == nil {
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var docID string
|
|
|
|
|
|
|
|
|
|
if err := match.VisitStoredFields(func(field string, value []byte) bool {
|
|
|
|
|
if field == "_id_" {
|
|
|
|
|
docID = string(value)
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return false, fmt.Errorf("error visiting stored fields: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return twt.Hash() == docID, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
worker := func(wg *sync.WaitGroup, twts <-chan types.Twt, bar *progressbar.ProgressBar, writer *bluge.Writer) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
count := 0
|
|
|
|
|
batch := bluge.NewBatch()
|
|
|
|
|
|
|
|
|
|
doc.AddField(bluge.NewCompositeFieldExcluding("_all", []string{"_id"}))
|
|
|
|
|
for twt := range twts {
|
|
|
|
|
ok, err := isTwtIndexed(twt, reader)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Errorf("error reading index")
|
|
|
|
|
os.Exit(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
batch.Update(doc.ID(), doc)
|
|
|
|
|
if !ok {
|
|
|
|
|
doc := indexTwt(twt, writer)
|
|
|
|
|
batch.Update(doc.ID(), doc)
|
|
|
|
|
count++
|
|
|
|
|
}
|
|
|
|
|
bar.Add64(1)
|
|
|
|
|
count++
|
|
|
|
|
|
|
|
|
|
if count > batchSize {
|
|
|
|
|
if err := writer.Batch(batch); err != nil {
|
|
|
|
@ -164,7 +219,7 @@ func main() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err := writer.Batch(batch); err != nil {
|
|
|
|
|
log.WithError(err).Error("error executing batch")
|
|
|
|
|
log.WithError(err).Error("error executing last batch")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -193,11 +248,12 @@ func main() {
|
|
|
|
|
os.Exit(2)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
reader, err := bluge.OpenReader(config)
|
|
|
|
|
reader, err = bluge.OpenReader(config)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Error("error opening index reader")
|
|
|
|
|
os.Exit(2)
|
|
|
|
|
}
|
|
|
|
|
defer reader.Close()
|
|
|
|
|
|
|
|
|
|
count, err := reader.Count()
|
|
|
|
|
if err != nil {
|
|
|
|
|