/* Package testsink provides a partial implementation of the systemd-journald Unix socket. Datagrams received on its socket are decoded and stored for unit tests to examine. */ package testsink import ( "net" "slices" "sync" ) // Sink provides a Unix socket and captures messages sent to it using the // systemd-journald wire protocol. type Sink struct { sock *net.UnixConn stop chan struct{} lock sync.Mutex mcond *sync.Cond msgs []Message err error } // New returns a new Sink that is listening on the given path. func New(sockpath string) (*Sink, error) { sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{ Name: sockpath, Net: "unixgram", }) if err != nil { return nil, err } sink := &Sink{ sock: sock, stop: make(chan struct{}, 1), } sink.mcond = sync.NewCond(&sink.lock) go sink.stopper() go sink.recv() return sink, nil } // Stop listening and close the socket. func (sink *Sink) Stop() { // non-blocking write onto channel; we only need to read from it once in // order to stop the receiver, but using this rather than close ensures // Stop() can be called multiple times without negative side effects select { case sink.stop <- struct{}{}: default: } } func (sink *Sink) stopper() { <-sink.stop sink.sock.Close() } func (sink *Sink) recv() { buf := make([]byte, 131072) for { n, err := sink.sock.Read(buf) sink.lock.Lock() if n > 0 { sink.msgs = append(sink.msgs, Message{ Raw: slices.Clone(buf[:n]), }) } if err != nil { sink.err = err } sink.lock.Unlock() sink.mcond.Broadcast() if err != nil { return } } }