167 lines
3.8 KiB
Go
167 lines
3.8 KiB
Go
|
package journal
|
||
|
|
||
|
import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"io"
	"net"
	"path/filepath"
	"strconv"
	"sync"
	"testing"
)
|
||
|
|
||
|
type testingCommon interface {
|
||
|
TempDir() string
|
||
|
Fatalf(string, ...any)
|
||
|
}
|
||
|
|
||
|
// testConnector spawns a Conn with a local Unix datagram socket that checks
|
||
|
// incoming datagrams for well-formedness but otherwise discards them.
|
||
|
func testConnector(t testingCommon) *Conn {
|
||
|
sockPath := filepath.Join(t.TempDir(), "test-socket")
|
||
|
sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
|
||
|
Name: sockPath,
|
||
|
Net: "unixgram",
|
||
|
})
|
||
|
if err != nil {
|
||
|
t.Fatalf("testConnector: ListenUnix: %v", err)
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
buf := make([]byte, 1<<16 /*enough for our tests */)
|
||
|
for {
|
||
|
n, err := sock.Read(buf)
|
||
|
switch err {
|
||
|
case nil:
|
||
|
case io.EOF:
|
||
|
return
|
||
|
default:
|
||
|
t.Fatalf("testConnector: Read: %v", err)
|
||
|
}
|
||
|
ok, pos, desc := checkWellFormedProto(buf[:n])
|
||
|
if !ok {
|
||
|
t.Fatalf("received malformed data at pos 0x%x: %s\n%s", pos, desc, hex.Dump(buf[:n]))
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
conn, err := Connect(sockPath)
|
||
|
if err != nil {
|
||
|
t.Fatalf("testConnector: DialUnix: %v", err)
|
||
|
}
|
||
|
conn.ErrHandler = func(err error) {
|
||
|
t.Fatalf("testConnector: connection error: %v", err)
|
||
|
}
|
||
|
return conn
|
||
|
}
|
||
|
|
||
|
func checkWellFormedProto(buf []byte) (ok bool, pos int, desc string) {
|
||
|
for pos < len(buf) {
|
||
|
// grab attribute name up to '=' or '\n'
|
||
|
off := bytes.IndexAny(buf[pos:], "=\n")
|
||
|
if off == -1 {
|
||
|
return false, pos, "unterminated key"
|
||
|
}
|
||
|
key := string(buf[pos : pos+off])
|
||
|
if err := AttrKeyValid(key); err != nil {
|
||
|
return false, pos, err.Error()
|
||
|
}
|
||
|
pos += off
|
||
|
|
||
|
// for KEY=VALUE, the value is terminated by a newline
|
||
|
if buf[pos] == '=' {
|
||
|
pos++
|
||
|
off = bytes.IndexByte(buf[pos:], '\n')
|
||
|
if off == -1 {
|
||
|
return false, pos, "unterminated value"
|
||
|
}
|
||
|
pos += off // consume value
|
||
|
pos++ // consume trailing newline
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// otherwise, expect an 8-bit little-endian length
|
||
|
pos++ // consume newline after key
|
||
|
if pos+8 > len(buf) {
|
||
|
return false, pos, "value length too short"
|
||
|
}
|
||
|
vlen := binary.LittleEndian.Uint64(buf[pos:])
|
||
|
pos += 8
|
||
|
if vlen > uint64(len(buf)) /* protect against overflow */ ||
|
||
|
uint64(pos)+vlen+1 > uint64(len(buf)) {
|
||
|
return false, pos, "value length too long"
|
||
|
}
|
||
|
pos += int(vlen)
|
||
|
if buf[pos] != '\n' {
|
||
|
return false, pos, "value not terminated by newline"
|
||
|
}
|
||
|
pos++
|
||
|
}
|
||
|
return true, pos, ""
|
||
|
}
|
||
|
|
||
|
// TestConcurrentEntries is best run with the race detector, and tries to pick
|
||
|
// up any faults that might occur when concurrent goroutines write into the same
|
||
|
// Conn.
|
||
|
func TestConcurrentEntries(t *testing.T) {
|
||
|
c := testConnector(t)
|
||
|
|
||
|
// attributes which will be common to all EntryErr calls
|
||
|
attr := make([]Attr, 0, 10 /* enough capacity to avoid realloc on append; might trigger data races */)
|
||
|
attr = append(attr, Attr{
|
||
|
Key: AttrKey{
|
||
|
key: "HELLO",
|
||
|
},
|
||
|
Value: []byte("world"),
|
||
|
})
|
||
|
|
||
|
// spawn goroutines
|
||
|
start := make(chan struct{})
|
||
|
for i := 0; i < 16; i++ {
|
||
|
go func() {
|
||
|
<-start
|
||
|
for j := 0; j < 100; j++ {
|
||
|
err := c.EntryErr(PriDebug, "message "+strconv.Itoa(i)+"."+strconv.Itoa(j), attr)
|
||
|
if err != nil {
|
||
|
t.Fatalf("message %d.%d error: %v", i, j, err)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// try to get all the goroutines to start at roughly the same time
|
||
|
close(start)
|
||
|
}
|
||
|
|
||
|
// BenchmarkEntry is a benchmark for the common Entry function.
|
||
|
func BenchmarkEntry(b *testing.B) {
|
||
|
c := testConnector(b)
|
||
|
|
||
|
// add some common attributes
|
||
|
c.Common = make([]Attr, 0, 10)
|
||
|
c.Common = append(c.Common, Attr{
|
||
|
Key: AttrKey{
|
||
|
key: "COMMON_ATTR",
|
||
|
},
|
||
|
Value: []byte("abc123\n"),
|
||
|
})
|
||
|
|
||
|
// attributes which will be common to all EntryErr calls
|
||
|
attr := make([]Attr, 0, 10)
|
||
|
attr = append(attr, Attr{
|
||
|
Key: AttrKey{
|
||
|
key: "HELLO",
|
||
|
},
|
||
|
Value: []byte("world"),
|
||
|
})
|
||
|
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
err := c.EntryErr(PriDebug, "hello world!", attr)
|
||
|
if err != nil {
|
||
|
b.Fatalf("message %d: error %v", i, err)
|
||
|
}
|
||
|
}
|
||
|
}
|