journal/conn_test.go

167 lines
3.8 KiB
Go
Raw Normal View History

package journal
import (
"bytes"
"encoding/binary"
"encoding/hex"
"io"
"net"
"path/filepath"
"strconv"
"testing"
)
// testingCommon is the subset of testing.TB needed by the helpers in this
// file. Both *testing.T and *testing.B satisfy it.
type testingCommon interface {
	TempDir() string
	Cleanup(func())
	Errorf(string, ...any)
	Fatalf(string, ...any)
}

// testConnector spawns a Conn with a local Unix datagram socket that checks
// incoming datagrams for well-formedness but otherwise discards them.
//
// The socket lives in t.TempDir() and is served by a background goroutine.
// Both are torn down through t.Cleanup, so repeated calls (for example one per
// benchmark run) do not accumulate sockets or goroutines.
//
// Setup failures abort the caller with t.Fatalf. Problems detected later by the
// reader goroutine or by the Conn's ErrHandler are reported with t.Errorf,
// because the testing package only permits FailNow/Fatalf on the goroutine
// running the test.
func testConnector(t testingCommon) *Conn {
	sockPath := filepath.Join(t.TempDir(), "test-socket")
	sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
		Name: sockPath,
		Net:  "unixgram",
	})
	if err != nil {
		t.Fatalf("testConnector: ListenUnixgram: %v", err)
	}

	closing := make(chan struct{}) // closed when cleanup starts
	done := make(chan struct{})    // closed when the reader goroutine exits
	go func() {
		defer close(done)
		buf := make([]byte, 1<<16 /* enough for our tests */)
		for {
			n, err := sock.Read(buf)
			if err != nil {
				select {
				case <-closing:
					// Read was interrupted by the cleanup closing the socket.
					return
				default:
				}
				if err != io.EOF {
					t.Errorf("testConnector: Read: %v", err)
				}
				return
			}
			// Keep draining after a malformed datagram so that later
			// datagrams are still checked and the socket keeps being read.
			if ok, pos, desc := checkWellFormedProto(buf[:n]); !ok {
				t.Errorf("received malformed data at pos 0x%x: %s\n%s", pos, desc, hex.Dump(buf[:n]))
			}
		}
	}()
	t.Cleanup(func() {
		close(closing)
		if err := sock.Close(); err != nil {
			t.Errorf("testConnector: Close: %v", err)
		}
		// Wait for the reader so it cannot touch t after the test is over.
		<-done
	})

	conn, err := Connect(sockPath)
	if err != nil {
		t.Fatalf("testConnector: Connect: %v", err)
	}
	// NOTE(review): ErrHandler presumably runs on whichever goroutine is using
	// the Conn (TestConcurrentEntries uses it from spawned goroutines), so it
	// must not call Fatalf.
	conn.ErrHandler = func(err error) {
		t.Errorf("testConnector: connection error: %v", err)
	}
	return conn
}
// checkWellFormedProto walks buf as a sequence of serialized attributes and
// reports whether every one of them is well-formed.
//
// Two encodings are accepted for each attribute:
//
//	KEY=VALUE\n
//	KEY\n<64-bit little-endian length><VALUE bytes>\n
//
// Each key is validated with AttrKeyValid. On success ok is true and pos equals
// len(buf). On failure ok is false, pos is the offset at which the problem was
// detected and desc describes it.
func checkWellFormedProto(buf []byte) (ok bool, pos int, desc string) {
	const lenFieldSize = 8 // bytes occupied by the binary length prefix
	total := len(buf)

	for pos < total {
		// The key extends up to the first '=' or '\n'.
		sep := bytes.IndexAny(buf[pos:], "=\n")
		if sep < 0 {
			return false, pos, "unterminated key"
		}
		if err := AttrKeyValid(string(buf[pos : pos+sep])); err != nil {
			return false, pos, err.Error()
		}
		pos += sep

		switch buf[pos] {
		case '=':
			// Text form: the value runs until the next newline.
			pos++
			end := bytes.IndexByte(buf[pos:], '\n')
			if end < 0 {
				return false, pos, "unterminated value"
			}
			pos += end + 1 // skip the value and its trailing newline

		default:
			// Binary form: newline, 64-bit little-endian length, value, newline.
			pos++ // skip the newline that ended the key
			if total-pos < lenFieldSize {
				return false, pos, "value length too short"
			}
			size := binary.LittleEndian.Uint64(buf[pos : pos+lenFieldSize])
			pos += lenFieldSize

			// The value plus its terminating newline must fit in what is left.
			// Comparing against the remaining byte count cannot overflow, even
			// for absurdly large size values.
			if size >= uint64(total-pos) {
				return false, pos, "value length too long"
			}
			pos += int(size)
			if buf[pos] != '\n' {
				return false, pos, "value not terminated by newline"
			}
			pos++
		}
	}
	return true, pos, ""
}
// TestConcurrentEntries is best run with the race detector, and tries to pick
// up any faults that might occur when concurrent goroutines write into the same
// Conn.
//
// Every worker goroutine reports exactly one result on a buffered channel: nil
// on success, or a description of its first failing write. The test goroutine
// collects all results before returning. This guarantees the workers have
// actually run and keeps all failure reporting on the test goroutine, as the
// testing package requires for Fatalf.
func TestConcurrentEntries(t *testing.T) {
	const (
		workers   = 16
		perWorker = 100
	)

	c := testConnector(t)

	// attributes which will be common to all EntryErr calls
	attr := make([]Attr, 0, 10 /* enough capacity to avoid realloc on append; might trigger data races */)
	attr = append(attr, Attr{
		Key: AttrKey{
			key: "HELLO",
		},
		Value: []byte("world"),
	})

	// failure identifies the first write that failed in a worker.
	type failure struct {
		worker, seq int
		err         error
	}

	start := make(chan struct{})
	results := make(chan *failure, workers)

	// spawn goroutines; the worker index is passed as an argument so each
	// goroutine sees its own value regardless of Go version
	for i := 0; i < workers; i++ {
		go func(worker int) {
			<-start
			for j := 0; j < perWorker; j++ {
				msg := "message " + strconv.Itoa(worker) + "." + strconv.Itoa(j)
				if err := c.EntryErr(PriDebug, msg, attr); err != nil {
					results <- &failure{worker: worker, seq: j, err: err}
					return
				}
			}
			results <- nil
		}(i)
	}

	// try to get all the goroutines to start at roughly the same time
	close(start)

	// wait for every worker and report failures from the test goroutine
	for i := 0; i < workers; i++ {
		if f := <-results; f != nil {
			t.Errorf("message %d.%d error: %v", f.worker, f.seq, f.err)
		}
	}
}
// BenchmarkEntry measures the cost of logging a single entry through EntryErr
// on a Conn that carries one connection-wide attribute, with one additional
// per-call attribute.
func BenchmarkEntry(b *testing.B) {
	c := testConnector(b)

	// Connection-wide attribute; the slice keeps spare capacity for 10 entries.
	c.Common = append(make([]Attr, 0, 10), Attr{
		Key:   AttrKey{key: "COMMON_ATTR"},
		Value: []byte("abc123\n"),
	})

	// Per-call attribute handed to every EntryErr invocation, again with
	// spare capacity for 10 entries.
	attr := append(make([]Attr, 0, 10), Attr{
		Key:   AttrKey{key: "HELLO"},
		Value: []byte("world"),
	})

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if err := c.EntryErr(PriDebug, "hello world!", attr); err != nil {
			b.Fatalf("message %d: error %v", n, err)
		}
	}
}