journal/testsink/testsink.go

83 lines
1.6 KiB
Go
Raw Permalink Normal View History

/*
Package testsink provides a partial implementation of the systemd-journald
Unix socket. Datagrams received on its socket are decoded and stored for unit
tests to examine.
*/
package testsink
import (
"net"
"slices"
"sync"
)
// Sink provides a Unix socket and captures messages sent to it using the
// systemd-journald wire protocol.
type Sink struct {
sock *net.UnixConn
stop chan struct{}
lock sync.Mutex
mcond *sync.Cond
msgs []Message
err error
}
// New returns a new Sink that is listening on the given path.
func New(sockpath string) (*Sink, error) {
sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
Name: sockpath,
Net: "unixgram",
})
if err != nil {
return nil, err
}
sink := &Sink{
sock: sock,
stop: make(chan struct{}, 1),
}
sink.mcond = sync.NewCond(&sink.lock)
go sink.stopper()
go sink.recv()
return sink, nil
}
// Stop listening and close the socket.
func (sink *Sink) Stop() {
// non-blocking write onto channel; we only need to read from it once in
// order to stop the receiver, but using this rather than close ensures
// Stop() can be called multiple times without negative side effects
select {
case sink.stop <- struct{}{}:
default:
}
}
func (sink *Sink) stopper() {
<-sink.stop
sink.sock.Close()
}
func (sink *Sink) recv() {
buf := make([]byte, 131072)
for {
n, err := sink.sock.Read(buf)
sink.lock.Lock()
if n > 0 {
sink.msgs = append(sink.msgs, Message{
Raw: slices.Clone(buf[:n]),
})
}
if err != nil {
sink.err = err
}
sink.lock.Unlock()
sink.mcond.Broadcast()
if err != nil {
return
}
}
}