msgbus
A real-time message bus server and library written in Go.
Features
- Simple HTTP API
- Simple command-line client
- In-memory queues
- WebSockets for real-time messages
- Pull and push models
Install
$ go install git.mills.io/prologic/msgbus/cmd/...
Use Cases
- As a simple generic webhook
You can use msgbus as a simple generic webhook. For example, in my dockerfiles repo I have hooked up Prometheus's Alertmanager to send alert notifications to an IRC channel using some simple shell scripts.
See: alert
- As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.
Usage (library)
Install the package into your project:
$ go get git.mills.io/prologic/msgbus
Use the MessageBus type directly:
package main

import (
	"log"

	"git.mills.io/prologic/msgbus"
)

func main() {
	m := msgbus.New()

	// Publish one message to the "foo" topic.
	m.Put("foo", m.NewMessage([]byte("Hello World!")))

	// Pull it back; ok is false when the queue is empty.
	msg, ok := m.Get("foo")
	if !ok {
		log.Printf("No more messages in queue: foo")
	} else {
		// msg.ID is a uint64, so it is formatted with %d rather than %s.
		log.Printf(
			"Received message: id=%d topic=%s payload=%s",
			msg.ID, msg.Topic, msg.Payload,
		)
	}
}
Running this example should yield something like this:
$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=0 topic=foo payload=Hello World!
See the godoc for further documentation and other examples.
Usage (tool)
Run the message bus daemon/server:
$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
Subscribe to a topic using the message bus client:
$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye
Send a few messages with the message bus client:
$ msgbus pub foo hi
$ msgbus pub foo bye
You can also manually pull messages using the client:
$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye
Pulling differs slightly from a listening subscriber: a subscriber receives each message over a WebSocket as soon as it is published, whereas pull fetches queued messages on demand.
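The difference between the two models can be sketched with a toy in-memory bus. This is only an illustration: toyBus and its methods are hypothetical names made up for this sketch, and it is not how msgbus itself is implemented. It assumes, as the logs above suggest, that a published message is both queued for later pulls and pushed to current subscribers.

```go
package main

import (
	"fmt"
	"sync"
)

// toyBus is a hypothetical stand-in used only for illustration;
// it is not the msgbus implementation.
type toyBus struct {
	mu     sync.Mutex
	queues map[string][]string      // queued messages per topic (pull model)
	subs   map[string][]chan string // subscriber channels per topic (push model)
}

func newToyBus() *toyBus {
	return &toyBus{
		queues: make(map[string][]string),
		subs:   make(map[string][]chan string),
	}
}

// Subscribe registers a buffered channel that receives every message
// published to topic from now on.
func (b *toyBus) Subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 16)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Put queues the message for later pulls and pushes it to current subscribers.
func (b *toyBus) Put(topic, payload string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queues[topic] = append(b.queues[topic], payload)
	for _, ch := range b.subs[topic] {
		select {
		case ch <- payload:
		default: // drop rather than block if a subscriber's buffer is full
		}
	}
}

// Get pulls the oldest queued message, reporting false when the queue is empty.
func (b *toyBus) Get(topic string) (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	q := b.queues[topic]
	if len(q) == 0 {
		return "", false
	}
	b.queues[topic] = q[1:]
	return q[0], true
}

func main() {
	bus := newToyBus()
	sub := bus.Subscribe("foo")
	bus.Put("foo", "hi")
	bus.Put("foo", "bye")

	fmt.Println("pushed:", <-sub) // delivered at publish time
	msg, ok := bus.Get("foo")     // fetched on demand
	fmt.Println("pulled:", msg, ok)
}
```

The subscriber never asks for a message, while the puller decides when to fetch one and has to handle an empty queue.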
Usage (HTTP)
Run the message bus daemon/server:
$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000
Send a message using curl:
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
Pull the messages off the "hello" queue using curl:
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
Decode the payload:
$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}
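The same decoding can be done from Go. The sketch below is a minimal example that assumes the response has exactly the JSON shape shown above. The topic and message types are local definitions written for this example and are not the types exported by the msgbus package. Because encoding/json treats []byte fields as base64 strings, the payload is decoded without a separate step.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// topic and message mirror the JSON shown above. They are assumptions made
// for this sketch, not the types exported by msgbus.
type topic struct {
	Name    string        `json:"name"`
	TTL     time.Duration `json:"ttl"` // nanoseconds in the JSON
	Seq     uint64        `json:"seq"`
	Created time.Time     `json:"created"`
}

type message struct {
	ID      uint64    `json:"id"`
	Topic   topic     `json:"topic"`
	Payload []byte    `json:"payload"` // encoding/json base64-decodes []byte fields
	Created time.Time `json:"created"`
}

// decodeMessage parses one message as returned by GET /<topic>.
func decodeMessage(data []byte) (message, error) {
	var m message
	err := json.Unmarshal(data, &m)
	return m, err
}

// sample is the response body from the curl example above.
const sample = `{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}`

func main() {
	m, err := decodeMessage([]byte(sample))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	// Prints: topic=hello ttl=1m0s payload={"message": "hello"}
	fmt.Printf("topic=%s ttl=%s payload=%s\n", m.Topic.Name, m.Topic.TTL, m.Payload)
}
```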
API
GET /
List all known topics/queues.
Example:
$ curl -q -o - http://localhost:8000/ | jq '.'
{
"hello": {
"name": "hello",
"ttl": 60000000000,
"seq": 1,
"created": "2018-05-07T23:44:25.681392205-07:00"
}
}
POST|PUT /topic
Post a new message to the queue named by <topic>.
NB: Either the POST or the PUT method can be used here.
Example:
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1
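The curl call above can be reproduced from Go with only the standard library. This is a minimal sketch: newPublishRequest is a hypothetical helper written for this example, and it assumes a daemon listening on localhost:8000 as in the examples above and a base URL without a trailing slash.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// newPublishRequest builds a PUT request for baseURL/<topic> with payload as
// the request body. It is a hypothetical helper, not part of msgbus.
func newPublishRequest(baseURL, topic string, payload []byte) (*http.Request, error) {
	u := baseURL + "/" + url.PathEscape(topic)
	return http.NewRequest(http.MethodPut, u, bytes.NewReader(payload))
}

func main() {
	req, err := newPublishRequest("http://localhost:8000", "hello", []byte(`{"message": "hello"}`))
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}

	// This fails with a connection error unless msgbusd is running locally.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("reading response failed:", err)
		return
	}
	fmt.Printf("%s: %s\n", resp.Status, body)
}
```

Since either method is accepted, swapping http.MethodPut for http.MethodPost should behave the same way.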
GET /topic
Get the next message of the queue named by <topic>.
- If the topic is not found, returns 404 Not Found.
- If the WebSocket Upgrade header is found, upgrades to a WebSocket connection and subscribes to the topic <topic>. Each new message published to the topic <topic> is instantly sent to all subscribers.
Example:
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
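A Go client that pulls messages needs to tell the 404 case apart from a successful response. The sketch below shows one way to do that. The pull and newStandIn functions are hypothetical helpers written for this example, and the stand-in server only imitates the two behaviours documented above so that the code runs without msgbusd. Any other status is treated as an error, which is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// pull fetches the next message from baseURL/<topic>. found is false when the
// server answers 404 Not Found, meaning the topic does not exist.
func pull(client *http.Client, baseURL, topic string) (body []byte, found bool, err error) {
	resp, err := client.Get(baseURL + "/" + topic)
	if err != nil {
		return nil, false, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		body, err = io.ReadAll(resp.Body)
		return body, err == nil, err
	case http.StatusNotFound:
		return nil, false, nil
	default:
		return nil, false, fmt.Errorf("unexpected status %s", resp.Status)
	}
}

// newStandIn returns a fake server that imitates only the documented
// behaviour: a JSON body for /hello and 404 for every other path.
func newStandIn() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/hello" {
			http.NotFound(w, r)
			return
		}
		fmt.Fprint(w, `{"id":0,"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0="}`)
	}))
}

func main() {
	srv := newStandIn()
	defer srv.Close()

	for _, topic := range []string{"hello", "missing"} {
		body, found, err := pull(srv.Client(), srv.URL, topic)
		fmt.Printf("topic=%s found=%t err=%v body=%s\n", topic, found, err, body)
	}
}
```

To try it against a real daemon, pass http.DefaultClient and http://localhost:8000 to pull instead of the stand-in's client and URL.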
DELETE /topic
Delete the queue named by <topic>.
Not implemented.
Related Projects
- je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.
License
msgbus is licensed under the MIT License.