Browse Source

Initial working agent

pull/10/head
James Mills 4 years ago
parent
commit
6e27c57781
Signed by: prologic GPG Key ID: AC4C014F1440EBD6
  1. 198
      agent/agent.go
  2. 34
      agent/swarm.go
  3. 92
      agent/utils.go
  4. 32
      agent/utils_test.go
  5. 1
      cmd/autodock-agent/.gitignore
  6. 100
      cmd/autodock-agent/main.go
  7. 1
      config/config.go
  8. 2
      metrics/metrics.go
  9. 18
      server/server.go

198
agent/agent.go

@ -0,0 +1,198 @@
package agent
import (
"context"
"encoding/json"
"net/http"
"strings"
"time"
etypes "github.com/docker/docker/api/types/events"
dockerclient "github.com/docker/docker/client"
msgbusclient "github.com/prologic/msgbus/client"
log "github.com/sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prologic/autodock/config"
"github.com/prologic/autodock/events"
"github.com/prologic/autodock/metrics"
)
// Agent ...
type Agent struct {
cfg *config.Config
client *dockerclient.Client
msgbus *msgbusclient.Client
metrics *metrics.Metrics
}
var (
errChan chan (error)
eventChan chan *events.Message
eventErrChan chan (error)
restartChan chan (bool)
recoverChan chan (bool)
)
// NewAgent ...
func NewAgent(cfg *config.Config) (*Agent, error) {
s := &Agent{
cfg: cfg,
msgbus: msgbusclient.NewClient(cfg.MsgBusURL, nil),
metrics: metrics.NewMetrics(),
}
client, err := s.getDockerClient()
if err != nil {
return nil, err
}
s.client = client
// channel setup
errChan = make(chan error)
eventErrChan = make(chan error)
restartChan = make(chan bool)
recoverChan = make(chan bool)
eventChan = make(chan *events.Message)
// eventErrChan handler
// this handles event stream errors
go func() {
for range eventErrChan {
// error from swarm event stream; attempt to restart
log.Error("event stream fail; attempting to reconnect")
s.waitForSwarm()
restartChan <- true
}
}()
// errChan handler
// this is a general error handling channel
go func() {
for err := range errChan {
log.Error(err)
// HACK: check for errors from swarm and restart
// events. an example is "No primary manager elected"
// before the event handler is created and thus
// won't send the error there
if strings.Index(err.Error(), "500 Internal Server Error") > -1 {
log.Error("swarm error detected")
s.waitForSwarm()
restartChan <- true
}
}
}()
// restartChan handler
go func() {
for range restartChan {
log.Debug("starting event handling")
log.Debug("using event stream")
ctx, cancel := context.WithCancel(context.Background())
evtChan, evtErrChan := client.Events(ctx, types.EventsOptions{})
defer cancel()
go func(ch <-chan error) {
for {
err := <-ch
eventErrChan <- err
}
}(evtErrChan)
// since the event stream channel is receive
// only we wrap it to be able to send
// autodock events on the autodock chan
go func(ch <-chan etypes.Message) {
for {
msg := <-ch
m := &events.Message{msg}
eventChan <- m
}
}(evtChan)
// trigger initial load
eventChan <- &events.Message{
etypes.Message{
ID: "0",
Status: "autodock-start",
},
}
}
}()
go func() {
for e := range eventChan {
log.Debugf(
"event received: id=%s, type=%s status=%s action=%s",
e.ID, e.Type, e.Status, e.Action,
)
if e.ID == "" && e.Type == "" {
continue
}
topic := string(e.Type)
payload, err := json.Marshal(e)
if err != nil {
log.Errorf("error encoding event: %s", err)
} else {
// FIXME: We're doing a lot of copies here :/ string -> []byte
err := s.msgbus.Publish(topic, string(payload))
if err != nil {
log.Errorf("error publishing event %s: %s", topic, err)
} else {
s.metrics.EventsProcessed.Inc()
}
}
}
}()
// uptime ticker
t := time.NewTicker(time.Second * 1)
go func() {
for range t.C {
s.metrics.Uptime.Inc()
}
}()
// start event handler
restartChan <- true
return s, nil
}
func (s *Agent) waitForSwarm() {
log.Debug("waiting for event stream to become ready")
for {
if _, err := s.client.Info(context.Background()); err == nil {
log.Debug("event stream appears to have recovered; restarting handler")
return
}
log.Warn("event stream not yet ready; retrying")
time.Sleep(time.Second * 1)
}
}
// Run ...
func (s *Agent) Run() error {
http.Handle(
"/metrics",
prometheus.Handler(),
)
if err := http.ListenAndServe(s.cfg.Bind, nil); err != nil {
return err
}
return nil
}

34
agent/swarm.go

@ -0,0 +1,34 @@
package agent
import (
"golang.org/x/net/context"
)
type Node struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Addr string `json:"addr,omitempty"`
Containers string `json:"containers,omitempty"`
ReservedCPUs string `json:"reserved_cpus,omitempty"`
ReservedMemory string `json:"reserved_memory,omitempty"`
Labels []string `json:"labels,omitempty"`
}
func (s *Agent) getSwarmNodes() ([]*Node, error) {
client, err := s.getDockerClient()
if err != nil {
return nil, err
}
info, err := client.Info(context.Background())
if err != nil {
return nil, err
}
nodes, err := parseSwarmNodes(info.DriverStatus)
if err != nil {
return nil, err
}
return nodes, nil
}

92
agent/utils.go

@ -0,0 +1,92 @@
package agent
import (
"strings"
engineClient "github.com/docker/docker/client"
"github.com/prologic/autodock/client"
)
func (s *Agent) getDockerURL() string {
return client.GetDockerURL(s.cfg.DockerURL)
}
func (s *Agent) getDockerClient() (*engineClient.Client, error) {
return client.GetDockerClient(
s.cfg.DockerURL,
s.cfg.TLSCACert,
s.cfg.TLSCert,
s.cfg.TLSKey,
s.cfg.AllowInsecure,
)
}
// HACK: until we get a consumable endpoint from swarm we must parse the
// node list from /info
func parseSwarmNodes(driverStatus [][2]string) ([]*Node, error) {
nodes := []*Node{}
var node *Node
nodeComplete := false
name := ""
addr := ""
containers := ""
reservedCPUs := ""
reservedMemory := ""
labels := []string{}
for _, l := range driverStatus {
if len(l) != 2 {
continue
}
label := l[0]
data := l[1]
// cluster info label i.e. "Filters" or "Strategy"
if strings.Index(label, "\u0008") > -1 {
continue
}
if strings.Index(label, " └") == -1 {
name = label
addr = data
}
// node info like "Containers"
switch label {
case " └ Containers":
containers = data
case " └ Reserved CPUs":
reservedCPUs = data
case " └ Reserved Memory":
reservedMemory = data
case " └ Labels":
lbls := strings.Split(data, ",")
labels = lbls
nodeComplete = true
default:
continue
}
if nodeComplete {
node = &Node{
Name: name,
Addr: addr,
Containers: containers,
ReservedCPUs: reservedCPUs,
ReservedMemory: reservedMemory,
Labels: labels,
}
nodes = append(nodes, node)
// reset info
name = ""
addr = ""
containers = ""
reservedCPUs = ""
reservedMemory = ""
labels = []string{}
nodeComplete = false
}
}
return nodes, nil
}

32
agent/utils_test.go

@ -0,0 +1,32 @@
package agent
import (
"testing"
)
func TestParseSwarmNodes(t *testing.T) {
driverStatus := [][2]string{
[2]string{"\u0008Strategy", "spread"},
[2]string{"\u0008Filters", "affinity, health, constraint"},
[2]string{"\u0008Nodes", "1"},
[2]string{"localhost", "127.0.0.1:2375"},
[2]string{" └ Containers", "10"},
[2]string{" └ Reserved CPUs", "1 / 4"},
[2]string{" └ Reserved Memory", "2 / 8.083GiB"},
[2]string{" └ Labels", "executiondriver=native-0.2, kernelversion=3.16.0-4-amd64, operatingsystem=Debian GNU/Linux 8 (jessie), storagedriver=btrfs"},
[2]string{"remote", "1.2.3.4:2375"},
[2]string{" └ Containers", "3"},
[2]string{" └ Reserved CPUs", "0 / 4"},
[2]string{" └ Reserved Memory", "0 / 8.083GiB"},
[2]string{" └ Labels", "executiondriver=native-0.2, kernelversion=3.16.0-4-amd64, operatingsystem=Debian GNU/Linux 8 (jessie), storagedriver=aufs"},
}
nodes, err := parseSwarmNodes(driverStatus)
if err != nil {
t.Fatal(err)
}
if len(nodes) != 2 {
t.Fatalf("expected 2 nodes; received %d", len(nodes))
}
}

1
cmd/autodock-agent/.gitignore

@ -0,0 +1 @@
autodock

100
cmd/autodock-agent/main.go

@ -0,0 +1,100 @@
package main
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
pkgver "github.com/prologic/autodock/version"
"github.com/namsral/flag"
"github.com/prologic/autodock/agent"
"github.com/prologic/autodock/config"
)
func main() {
var (
dockerurl string
msgbusurl string
tlsverify bool
tlscacert string
tlscert string
tlskey string
tls bool
debug bool
version bool
bind string
)
flag.String(flag.DefaultConfigFlagname, "", "path to config file")
flag.BoolVar(&debug, "debug", false, "enable debug logging")
flag.BoolVar(&version, "v", false, "display version information")
flag.StringVar(
&bind, "bind", "0.0.0.0:8000",
"[int]:<port> to bind to for HTTP",
)
flag.StringVar(&msgbusurl, "msgbusurl", "", "MessageBus URL to connect to")
flag.StringVar(&dockerurl, "dockerurl", "", "Docker URL to connect to")
flag.BoolVar(&tls, "tls", false, "Use TLS; implied by --tlsverify")
flag.StringVar(
&tlscacert, "tlscacert", "",
"Trust certs signed only by this CA",
)
flag.StringVar(
&tlscert, "tlscert", "",
"Path to TLS certificate file",
)
flag.StringVar(
&tlskey, "tlskey", "",
"Path to TLS key file",
)
flag.BoolVar(
&tlsverify, "tlsverify", true,
"Use TLS and verify the remote",
)
flag.Parse()
if version {
fmt.Printf("autodock-agent v%s", pkgver.FullVersion())
os.Exit(0)
}
if msgbusurl == "" {
fmt.Printf("no message bus url specified")
os.Exit(1)
}
if debug {
log.SetLevel(log.DebugLevel)
}
cfg := &config.Config{
Debug: debug,
Bind: bind,
MsgBusURL: msgbusurl,
DockerURL: dockerurl,
TLSCACert: tlscacert,
TLSCert: tlscert,
TLSKey: tlskey,
AllowInsecure: !tlsverify,
}
srv, err := agent.NewAgent(cfg)
if err != nil {
log.Fatal(err)
}
if err := srv.Run(); err != nil {
log.Fatal(err)
}
}

1
config/config.go

@ -4,6 +4,7 @@ package config
type Config struct {
Debug bool
Bind string
MsgBusURL string
DockerURL string
TLSCACert string
TLSCert string

2
server/metrics.go → metrics/metrics.go

@ -1,4 +1,4 @@
package server
package metrics
import (
"github.com/prometheus/client_golang/prometheus"

18
server/server.go

@ -6,17 +6,19 @@ import (
"strings"
"time"
"github.com/prologic/autodock/config"
"github.com/prologic/autodock/events"
"github.com/prologic/autodock/proxy"
"github.com/docker/docker/api/types"
etypes "github.com/docker/docker/api/types/events"
"github.com/docker/docker/client"
"github.com/prologic/msgbus"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
etypes "github.com/docker/docker/api/types/events"
log "github.com/sirupsen/logrus"
"github.com/prologic/autodock/config"
"github.com/prologic/autodock/events"
"github.com/prologic/autodock/metrics"
"github.com/prologic/autodock/proxy"
)
const (
@ -28,7 +30,7 @@ type Server struct {
client *client.Client
msgbus *msgbus.MessageBus
proxy *proxy.Proxy
metrics *Metrics
metrics *metrics.Metrics
containerHash string
}
@ -45,7 +47,7 @@ func NewServer(cfg *config.Config) (*Server, error) {
s := &Server{
cfg: cfg,
msgbus: msgbus.NewMessageBus(&msgbus.Options{}),
metrics: NewMetrics(),
metrics: metrics.NewMetrics(),
containerHash: "",
}

Loading…
Cancel
Save