@ -3,6 +3,7 @@ package saltyim
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"os"
@ -18,12 +19,47 @@ import (
"go.mills.io/saltyim/internal/exec"
)
const (
DefaultEnvPath = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
ServiceUser = "salty"
)
var (
ErrNoMessages = errors . New ( "error: no messages found" )
)
type addrCache map [ string ] * Addr
type Message struct {
Text string
Key * keys . EdX25519PublicKey
}
// TODO: Support shell quoting and escapes?
func parseExtraEnvs ( extraenvs string ) map [ string ] string {
env := make ( map [ string ] string )
for _ , extraenv := range strings . Split ( extraenvs , " " ) {
tokens := strings . SplitN ( extraenv , "=" , 2 )
switch len ( tokens ) {
case 1 :
env [ tokens [ 0 ] ] = ""
case 2 :
env [ tokens [ 0 ] ] = tokens [ 1 ]
}
}
return env
}
// PackMessage formts an outoing message in the Message Format
// <timestamp>\t(<sender>) <message>
func PackMessage ( me * Addr , msg string ) [ ] byte {
return [ ] byte ( fmt . Sprint ( time . Now ( ) . UTC ( ) . Format ( time . RFC3339 ) , "\t" , me . Formatted ( ) , "\t" , strings . TrimSpace ( msg ) , "\n" ) )
return [ ] byte (
fmt . Sprint (
time . Now ( ) . UTC ( ) . Format ( time . RFC3339 ) , "\t" ,
me . Formatted ( ) , "\t" ,
strings . TrimSpace ( msg ) , "\n" ,
) ,
)
}
// Send sends the encrypted message `msg` to the Endpoint `endpoint` using a
@ -41,45 +77,39 @@ func Send(endpoint, msg string) error {
// and Sedngina and Receiving messages to/from Salty IM Users.
type Client struct {
me * Addr
id * Identity
key * keys . EdX25519Key
cache addrCache
}
func ( c * Client ) String ( ) string {
b := & bytes . Buffer { }
fmt . Fprintln ( b , "Me: " , c . me )
fmt . Fprintln ( b , "Endpoint: " , c . me . Endpoint ( ) )
fmt . Fprintln ( b , "Key: " , c . key )
return b . String ( )
}
// NewClient reeturns a new Salty IM client for sending and receiving
// encrypted messages to other Salty IM users as well as decrypting
// and displaying messages of the user's own inbox.
func NewClient ( me * Addr , options ... IdentityOption ) ( * Client , error ) {
ident , err := GetIdentity ( options ... )
id , err := GetIdentity ( options ... )
if err != nil {
return nil , fmt . Errorf ( "error opening identity %s: %w" , ident . Source ( ) , err )
return nil , fmt . Errorf ( "error opening identity %s: %w" , id . Source ( ) , err )
}
if me == nil || me . IsZero ( ) {
me = ident . addr
me = id . addr
}
if me == nil || me . IsZero ( ) {
return nil , fmt . Errorf ( "unable to find your user addressn in %s" , ident . Source ( ) )
return nil , fmt . Errorf ( "unable to find your user addressn in %s" , id . Source ( ) )
}
if err := me . Refresh ( ) ; err != nil {
return nil , fmt . Errorf ( "error looking up user endpoint %s: %w" , me . HashURI ( ) , err )
log . WithError ( err ) . Warn ( "error looking up user endpoint" )
}
log . Debugf ( "Using identity %s with public key %s" , ident . Source ( ) , ident . key )
log . Debugf ( "Using identity %s with public key %s" , id . Source ( ) , id . key )
log . Debugf ( "Salty Addr is %s" , me )
log . Debugf ( "Endpoint is %s" , me . Endpoint ( ) )
return & Client {
me : me ,
key : ident . key ,
id : id ,
key : id . key ,
cache : make ( addrCache ) ,
} , nil
}
@ -100,37 +130,45 @@ func (cli *Client) getAddr(user string) (*Addr, error) {
return addr , nil
}
type Message struct {
Text string
Key * keys . EdX25519PublicKey
}
func ( cli * Client ) processMessage ( msg * msgbus . Message , extraenvs , prehook , posthook string ) ( Message , error ) {
var data [ ] byte
func ( cli * Client ) handleMessage ( prehook , posthook string , msgs chan Message ) msgbus . HandlerFunc {
return func ( msg * msgbus . Message ) error {
if prehook != "" {
out , err := exec . RunCmd ( exec . DefaultRunCmdTimeout , prehook , bytes . NewBuffer ( msg . Payload ) )
defer func ( ) {
if posthook != "" {
out , err := exec . RunCmd ( exec . DefaultRunCmdTimeout , cli . Env ( extraenvs ) , posthook , bytes . NewBuffer ( data ) )
if err != nil {
log . WithError ( err ) . Debugf ( "error running pre-hook %s" , pre hook )
log . WithError ( err ) . Debugf ( "error running post-hook %s" , post hook )
}
log . Debugf ( "pre -hook: %q" , out )
log . Debugf ( "post -hook: %q" , out )
}
} ( )
data , senderKey , err := salty . Decrypt ( cli . key , msg . Payload )
if prehook != "" {
out , err := exec . RunCmd ( exec . DefaultRunCmdTimeout , cli . Env ( extraenvs ) , prehook , bytes . NewBuffer ( msg . Payload ) )
if err != nil {
fmt . Fprintf ( os . Stderr , "error decrypting message" )
return err
log . WithError ( err ) . Debugf ( "error running pre-hook %s" , prehook )
}
log . Debugf ( "pre-hook: %q" , out )
}
msgs <- Message { Text : string ( data ) , Key : senderKey }
unencrypted , senderKey , err := salty . Decrypt ( cli . key , msg . Payload )
if err != nil {
return Message { } , fmt . Errorf ( "error decrypting message: %w" , err )
}
data = unencrypted [ : ]
if posthook != "" {
out , err := exec . RunCmd ( exec . DefaultRunCmdTimeout , posthook , bytes . NewBuffer ( data ) )
if err != nil {
log . WithError ( err ) . Debugf ( "error running post-hook %s" , posthook )
}
log . Debugf ( "post-hook: %q" , out )
return Message { Text : string ( data ) , Key : senderKey } , nil
}
func ( cli * Client ) messageHandler ( extraenvs , prehook , posthook string , msgs chan Message ) msgbus . HandlerFunc {
return func ( msg * msgbus . Message ) error {
message , err := cli . processMessage ( msg , extraenvs , prehook , posthook )
if err != nil {
return fmt . Errorf ( "error processing message: %w" , err )
}
msgs <- message
return nil
}
}
@ -138,13 +176,104 @@ func (cli *Client) handleMessage(prehook, posthook string, msgs chan Message) ms
func ( cli * Client ) Me ( ) * Addr { return cli . me }
func ( cli * Client ) Key ( ) * keys . EdX25519PublicKey { return cli . key . PublicKey ( ) }
// Read subscribers to this user's inbox for new messages
func ( cli * Client ) Read ( ctx context . Context , prehook , posthook string ) chan Message {
func ( cli * Client ) Env ( extraenvs string ) [ ] string {
Path := DefaultEnvPath
GoPath := os . Getenv ( "GOPATH" )
if GoPath != "" {
Path = fmt . Sprintf ( "%s/bin:%s" , GoPath , Path )
}
env := [ ] string {
fmt . Sprintf ( "PATH=%s" , Path ) ,
fmt . Sprintf ( "PWD=%s" , os . Getenv ( "PWD" ) ) ,
fmt . Sprintf ( "HOME=%s" , os . Getenv ( "HOME" ) ) ,
fmt . Sprintf ( "SALTY_USER=%s" , cli . me . String ( ) ) ,
fmt . Sprintf ( "SALTY_IDENTITY=%s" , cli . id . Source ( ) ) ,
}
for key , val := range parseExtraEnvs ( extraenvs ) {
log . Debugf ( "key: %q" , key )
log . Debugf ( "val: %q" , val )
val = os . ExpandEnv ( val )
if val == "" {
val = os . Getenv ( key )
}
if val != "" {
env = append ( env , fmt . Sprintf ( "%s=%s" , key , val ) )
}
}
log . Debugf ( "env: #%v" , env )
return env
}
func ( cli * Client ) String ( ) string {
b := & bytes . Buffer { }
fmt . Fprintln ( b , "Me: " , cli . me )
fmt . Fprintln ( b , "Endpoint: " , cli . me . Endpoint ( ) )
fmt . Fprintln ( b , "Key: " , cli . key )
return b . String ( )
}
// Drain drains this user's inbox by simulteneiously reading until empty anda
// subscribing to the inbox for new messages.
func ( cli * Client ) Drain ( ctx context . Context , extraenvs , prehook , posthook string ) chan Message {
msgs := make ( chan Message )
go func ( ) {
for {
msg , err := cli . Read ( extraenvs , prehook , posthook )
if err != nil {
if err == ErrNoMessages {
break
}
log . WithError ( err ) . Warn ( "error reading inbox" )
} else {
msgs <- msg
}
time . Sleep ( time . Millisecond * 100 )
}
} ( )
go func ( ) {
for msg := range cli . Subscribe ( ctx , extraenvs , prehook , posthook ) {
msgs <- msg
}
} ( )
go func ( ) {
<- ctx . Done ( )
close ( msgs )
} ( )
return msgs
}
// Read reads a single message from this user's inbox
func ( cli * Client ) Read ( extraenvs , prehook , posthook string ) ( Message , error ) {
uri , inbox := SplitInbox ( cli . me . Endpoint ( ) . String ( ) )
bus := msgbus_client . NewClient ( uri , nil )
msg , err := bus . Pull ( inbox )
if err != nil {
return Message { } , fmt . Errorf ( "error reading inbox: %w" , err )
}
if msg == nil {
return Message { } , ErrNoMessages
}
return cli . processMessage ( msg , extraenvs , prehook , posthook )
}
// Subscribe subscribers to this user's inbox for new messages
func ( cli * Client ) Subscribe ( ctx context . Context , extraenvs , prehook , posthook string ) chan Message {
uri , inbox := SplitInbox ( cli . me . Endpoint ( ) . String ( ) )
bus := msgbus_client . NewClient ( uri , nil )
msgs := make ( chan Message )
s := bus . Subscribe ( inbox , cli . handleMessage ( prehook , posthook , msgs ) )
s := bus . Subscribe ( inbox , cli . messageHandler ( extraenvs , prehook , posthook , msgs ) )
s . Start ( )
log . Debugf ( "Connected to %s/%s" , uri , inbox )