A real-time message bus server and library written in Go with strong consistency and reliability guarantees.

msgbus

A real-time message bus server and library written in Go.

Features

  • Simple HTTP API
  • Simple command-line client
  • In-memory queues
  • WebSockets for real-time messages
  • Pull and push delivery models

Install

$ go install git.mills.io/prologic/msgbus/cmd/...

Use Cases

  • As a simple generic webhook

You can use msgbus as a simple generic webhook. For example, in my dockerfiles repo I have hooked up Prometheus's Alertmanager to send alert notifications to an IRC channel using some simple shell scripts.

See: alert

  • As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

Usage (library)

Install the package into your project:

$ go get git.mills.io/prologic/msgbus

Use the MessageBus type directly:

package main

import (
    "log"

    "git.mills.io/prologic/msgbus"
)

func main() {
    m := msgbus.New()
    m.Put("foo", m.NewMessage([]byte("Hello World!")))

    msg, ok := m.Get("foo")
    if !ok {
        log.Printf("No more messages in queue: foo")
    } else {
        // msg.ID is an unsigned integer, so format it with %d rather than %s.
        log.Printf(
            "Received message: id=%d topic=%s payload=%s",
            msg.ID, msg.Topic, msg.Payload,
        )
    }
}

Running this example should yield something like this:

$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=0 topic=foo payload=Hello World!

See the godoc for further documentation and other examples.

Usage (tool)

Run the message bus daemon/server:

$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo

Subscribe to a topic using the message bus client:

$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye

Send a few messages with the message bus client:

$ msgbus pub foo hi
$ msgbus pub foo bye

You can also manually pull messages using the client:

$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye

This differs slightly from a listening subscriber (using WebSockets): pull fetches the messages queued for the topic on demand, whereas a subscriber has each message pushed to it as it is published.
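
To make the difference between the two delivery models concrete, here is a toy in-memory sketch in Go. It is not the msgbus implementation: the bus type and its put, get and subscribe methods are hypothetical names used only for illustration. Each published message is both appended to a queue (for pullers) and sent to every live subscriber channel (push).

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a toy illustration of the pull and push models. It is NOT the
// msgbus implementation; all names here are hypothetical.
type bus struct {
	mu     sync.Mutex
	queues map[string][]string      // pending messages per topic (pull)
	subs   map[string][]chan string // live subscribers per topic (push)
}

func newBus() *bus {
	return &bus{queues: map[string][]string{}, subs: map[string][]chan string{}}
}

// subscribe registers a buffered channel that receives every later message.
func (b *bus) subscribe(topic string) <-chan string {
	ch := make(chan string, 16)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// put queues the message for pullers and pushes it to current subscribers.
func (b *bus) put(topic, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queues[topic] = append(b.queues[topic], msg)
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
		default: // drop for a slow subscriber rather than block the publisher
		}
	}
}

// get pops the oldest queued message and reports false if the queue is empty.
func (b *bus) get(topic string) (string, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	q := b.queues[topic]
	if len(q) == 0 {
		return "", false
	}
	b.queues[topic] = q[1:]
	return q[0], true
}

func main() {
	b := newBus()
	sub := b.subscribe("foo")
	b.put("foo", "hi")
	b.put("foo", "bye")

	// Push: the subscriber already holds both messages.
	fmt.Println("pushed:", <-sub, <-sub)

	// Pull: drain the queue on demand.
	for msg, ok := b.get("foo"); ok; msg, ok = b.get("foo") {
		fmt.Println("pulled:", msg)
	}
}
```

As in the client sessions above, the subscriber and the puller each see both messages; only the moment of delivery differs.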

Usage (HTTP)

Run the message bus daemon/server:

$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000

Send a message using curl:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello

Pull the messages off the "hello" queue using curl:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

Decode the payload:

$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}

API

GET /

List all known topics/queues.

Example:

$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}

POST|PUT /topic

Post a new message to the queue named by <topic>.

NB: Either POST or PUT methods can be used here.

Example:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1

GET /topic

Get the next message from the queue named by <topic>.

  • If the topic is not found, returns 404 Not Found.
  • If the WebSocket Upgrade header is found, upgrades to a WebSocket channel and subscribes to the topic <topic>. Each new message published to the topic <topic> is instantly pushed to all subscribers.

Example:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

DELETE /topic

Delete the queue named by <topic>.

Not implemented.

Related Projects

  • je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.

License

msgbus is licensed under the MIT License.