Browse Source

feat: implement interpod publish protocol (#700)

I originally looked at the websub repo and did a couple iterations of trying to use it, but overall I had a hard time trying to implement a spec-compliant client, so I went off on a tangent and made a stripped down webhook-based substitute that I called the "Inter-Pod Publish Protocol."

Basically it adds two endpoints: "/ipp/pub" and "/ipp/sub". The publish endpoint is for other pods to send Publish events (basically a POST request with a header containing the URL of the updated feed), and the subscribe endpoint is for other pods to send a Subscribe event (basically a POST request with a header containing the URL of their publish endpoint).

By working together, pods can easily exchange real-time information regarding which feeds were updated and update their caches accordingly.

In addition to the endpoints I also implemented the IPP feature flag (`ipp`), which enables the endpoints and the behavior. Currently, when a twt is posted, edited, or deleted, there is already functionality in place to alert all the subscribing pods of the change.

So the stuff implemented are:
* PUB endpoint
* SUB endpoint
* automatically alert subscribers of new/deleted/edited twts

The only thing still missing is implementing a mechanism for making a Pod subscribe to antoher, and some tests, but once those are done, I'm sure it'll work nicely.

I was originally thinking I could stick auto-subscribe features into the update cache job, however due to the way it works, the IPP methods are methods of a Server, and the jobs don't have access to the Server; so I thought I'd commit what I had built and wait for your decision.

Co-authored-by: servusdei2018 <servusdei@programmer.net>
Reviewed-on: #700
Co-authored-by: servusdei2018 <servusdei2018@noreply@mills.io>
Co-committed-by: servusdei2018 <servusdei2018@noreply@mills.io>
servusdei2018 1 week ago
committed by James Mills
parent
commit
2279fe1bc7
  1. 1
      go.mod
  2. 23
      go.sum
  3. 5
      internal/features.go
  4. 203
      internal/interpod_protocol.go
  5. 8
      internal/post_handler.go
  6. 14
      internal/server.go

1
go.mod

@ -90,4 +90,5 @@ require (
gopkg.in/mail.v2 v2.3.1 // indirect
gopkg.in/yaml.v2 v2.4.0
gorm.io/gorm v1.22.2 // indirect
meow.tf/websub v1.0.0
)

23
go.sum

@ -157,6 +157,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creasty/defaults v1.5.2 h1:/VfB6uxpyp6h0fr7SPp7n8WJBoV8jfxQXPCnkVSjyls=
github.com/creasty/defaults v1.5.2/go.mod h1:FPZ+Y0WNrbqOVw+c6av63eyHUAl6pMHZwqLPvXUZGfY=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
@ -229,11 +230,17 @@ github.com/go-mail/mail v2.3.1+incompatible/go.mod h1:VPWjmmNyRsWXQZHVHT3g0YbIIN
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
github.com/go-playground/locales v0.14.0/go.mod h1:sawfccIbzZTqEDETgFXqTho0QybSa7l++s0DH+LDiLs=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/jYrnRPArHwAcmLoJZxyho=
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE=
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-playground/validator/v10 v10.9.0 h1:NgTtmN58D0m8+UuxtYmGztBJB7VnPgjj221I1QHci2A=
github.com/go-playground/validator/v10 v10.9.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
@ -333,6 +340,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.4/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@ -423,6 +431,7 @@ github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI=
github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@ -450,15 +459,20 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo/v4 v4.1.17/go.mod h1:Tn2yRQL/UclUalpb5rPdXDevbkJ+lp/2svdyFBg6CHQ=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
@ -574,6 +588,7 @@ github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -639,6 +654,8 @@ github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfm
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM=
github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww=
@ -749,6 +766,7 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
@ -797,6 +815,7 @@ golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200414173820-0848c9571904/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa h1:idItI2DDfCokpg0N51B2VtiLdJ4vAuXC9fnCb2gACo4=
@ -977,6 +996,7 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1232,6 +1252,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
@ -1273,6 +1294,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
meow.tf/websub v1.0.0 h1:DPzQnBkXfH/4g9OEabd8HulpiHrC1QYBBtuWG/38LRc=
meow.tf/websub v1.0.0/go.mod h1:0sWbOxaqyfmpna5Nphl8v86TPo2SNT6Jxot1E4VKjXg=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

5
internal/features.go

@ -18,6 +18,7 @@ const (
FeatureFoo
FeatureMovingAverageFeedRefresh
FeatureJumpTimelineAge
FeatureIPP
)
// Interface guards
@ -36,6 +37,8 @@ func (f FeatureType) String() string {
return "moving_average_feed_refresh"
case FeatureJumpTimelineAge:
return "jump_timeline_age"
case FeatureIPP:
return "ipp"
default:
return "invalid_feature"
}
@ -70,6 +73,8 @@ func FeatureFromString(s string) (FeatureType, error) {
return FeatureMovingAverageFeedRefresh, nil
case "jump_timeline_age":
return FeatureJumpTimelineAge, nil
case "ipp":
return FeatureIPP, nil
default:
fs := fmt.Sprintf("available features: %s", strings.Join(AvailableFeatures(), " "))
return FeatureInvalid, fmt.Errorf("Error unrecognised feature: %s (%s)", s, fs)

203
internal/interpod_protocol.go

@ -0,0 +1,203 @@
package internal
import (
"net/http"
"net/url"
"sync"
"time"
"git.mills.io/yarnsocial/yarn/types"
"github.com/julienschmidt/httprouter"
)
const (
IPPPubEndpoint = "/ipp/pub"
IPPSubEndpoint = "/ipp/sub"
)
// IPPStore ...
type IPPStore struct {
sync.RWMutex
subscribers map[string]bool
}
// Init ...
func (i *IPPStore) Init() {
i.Lock()
defer i.Unlock()
i.subscribers = make(map[string]bool)
}
// Add adds a subscriber to the store.
func (i *IPPStore) Add(url string) {
i.Lock()
defer i.Unlock()
i.subscribers[url] = true
}
// Remove removes a subscriber from the store.
func (i *IPPStore) Remove(url string) {
i.Lock()
defer i.Unlock()
delete(i.subscribers, url)
}
// Get returns a list of all subscribers.
func (i *IPPStore) Get() map[string]bool {
i.RLock()
defer i.RUnlock()
return i.subscribers
}
// NewIPPStore ...
func NewIPPStore() *IPPStore {
var store IPPStore
store.Init()
return &store
}
// IPPPubHandler handles publish events received from peer pods.
//
// The parameter :url: is passed as a header value, corresponding to
// the feed url that was just updated.
//
// To prevent malicious cache insertions with unwanted feeds, we
// only fetch feeds we have previously cached, from twters we have
// previously cached.
func (s *Server) IPPPubHandler() httprouter.Handle {
var isLocalUrl = IsLocalURLFactory(s.config)
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
if !s.config.Features.IsEnabled(FeatureIPP) {
return
}
// The URI of the feed that got updated.
uri := r.Header.Get("x-ipp-uri")
// Sanity check.
if uri == "" {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(http.StatusText(http.StatusBadRequest)))
return
}
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(http.StatusText(http.StatusAccepted)))
// Ignore blacklisted feeds, as well as local feeds.
if s.cache.conf.BlacklistedFeed(uri) || isLocalUrl(uri) {
return
}
// Only refresh feeds that we have previously cached.
if !s.cache.IsCached(uri) {
return
}
// Pull the twter from the cache.
twter := s.cache.GetTwter(uri)
if twter == nil {
return
}
// Refresh the feed.
s.tasks.DispatchFunc(func() error {
sources := make(types.Feeds)
sources[types.Feed{Nick: twter.Nick, URL: twter.URI}] = true
s.cache.FetchFeeds(s.config, s.archive, sources, nil)
return nil
})
}
}
// IPPSubHandler handles subscription requests from pods that want
// to subscribe to publish events.
func (s *Server) IPPSubHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
if !s.config.Features.IsEnabled(FeatureIPP) {
return
}
// The other pod's IPP publish endpoint, where we should send
// publish events.
callback := r.Header.Get("x-ipp-callback")
// Validate URL.
_, err := url.Parse(callback)
if callback == "" || err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(http.StatusText(http.StatusBadRequest)))
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(http.StatusText(http.StatusOK)))
s.subscribers.Add(callback)
}
}
// PublishIPP publishes an IPP notification to all subscribed pods,
// notifying them of an updated feed on this pod.
//
// Publish events are sent concurrently, in order to avoid a slow pod
// causing an upstream latency issue.
func (s *Server) PublishIPP(twter types.Twter) {
s.tasks.DispatchFunc(func() error {
var resp *http.Response
client := http.Client{
Timeout: 5 * time.Second,
}
// Send a publish event to all subscribers.
for sub := range s.subscribers.Get() {
go func(sub string) {
req, _ := http.NewRequest(http.MethodPost, sub, nil)
req.Header.Set("x-ipp-uri", sub)
resp, _ = client.Do(req)
// The receiving pod has received the request but doesn't
// recognize it, therefore stop sending it publish events.
//
// This can happen:
// 1) If the other pod has IPP disabled
// 2) If we sent a bad IPP request
// 3) The receiver wasn't a pod at all
if resp.StatusCode != http.StatusAccepted {
s.subscribers.Remove(sub)
}
resp.Body.Close()
}(sub)
}
return nil
})
}
// SubscribeIPP subscribes this pod to another pod's IPP notifications.
func (s *Server) SubscribeIPP(feeds types.Feeds) {
var isLocalUrl = IsLocalURLFactory(s.config)
var resp *http.Response
client := http.Client{
Timeout: 5 * time.Second,
}
// Subscribe to each feed to start receiving publish events from
// each one.
for feed := range feeds {
// Don't subscribe to our own pod.
if isLocalUrl(feed.URL) {
continue
}
// Validate URL.
host, err := url.Parse(feed.URL)
if err != nil {
continue
}
// Make a subscription request.
req, _ := http.NewRequest(http.MethodPost, host.Host+IPPSubEndpoint, nil)
req.Header.Set("x-ipp-callback", s.config.LocalURL().Host+IPPPubEndpoint)
resp, _ = client.Do(req)
resp.Body.Close()
}
}

8
internal/post_handler.go

@ -78,6 +78,9 @@ func (s *Server) PostHandler() httprouter.Handle {
// If we are simply deleting the last twt, we have no need to proceed
// further.
if r.Method == http.MethodDelete {
if s.config.Features.IsEnabled(FeatureIPP) {
s.PublishIPP(ctx.User.Twter(s.config))
}
return
}
}
@ -156,6 +159,11 @@ func (s *Server) PostHandler() httprouter.Handle {
// Refresh user views.
s.cache.GetByUser(ctx.User, true)
// Publish Inter-Pod Protocol.
if s.config.Features.IsEnabled(FeatureIPP) {
s.PublishIPP(ctx.User.Twter(s.config))
}
// WebMentions ...
// TODO: Use a queue here instead?
// TODO: Fix Webmentions

14
internal/server.go

@ -86,8 +86,10 @@ type Server struct {
// Translator
translator *Translator
// Factory Functions
// Inter-Pod Protocol Store
subscribers *IPPStore
// Factory Functions
AppendTwt AppendTwtFunc
FilterTwts FilterTwtsFunc
}
@ -673,13 +675,16 @@ func (s *Server) initRoutes() {
s.router.POST("/delete", httproutermiddleware.Handler("delete", s.am.MustAuth(s.DeleteHandler()), mdlw))
// Support / Report Abuse handlers
s.router.GET("/support", httproutermiddleware.Handler("support", s.SupportHandler(), mdlw))
s.router.POST("/support", httproutermiddleware.Handler("support", s.SupportHandler(), mdlw))
s.router.GET("/_captcha", httproutermiddleware.Handler("captcha", s.CaptchaHandler(), mdlw))
s.router.GET("/report", httproutermiddleware.Handler("report", s.ReportHandler(), mdlw))
s.router.POST("/report", httproutermiddleware.Handler("report", s.ReportHandler(), mdlw))
// InterPod Publishing Protocol
s.router.POST("/ipp/pub", httproutermiddleware.Handler("ipp_pub", s.IPPPubHandler(), mdlw))
s.router.POST("/ipp/sub", httproutermiddleware.Handler("ipp_sub", s.IPPSubHandler(), mdlw))
}
// NewServer ...
@ -764,6 +769,8 @@ func NewServer(bind string, options ...Option) (*Server, error) {
api := NewAPI(router, config, cache, archive, db, pm, tasks)
ippstore := NewIPPStore()
var handler http.Handler
csrfHandler := nosurf.New(router)
@ -822,6 +829,9 @@ func NewServer(bind string, options ...Option) (*Server, error) {
// Translator
translator: translator,
// Inter-Pod Protocol Store
subscribers: ippstore,
}
// Factory functions that require access to the Pod Config and Store

Loading…
Cancel
Save