diff --git a/conn_test.go b/conn_test.go new file mode 100644 index 0000000..34bdffa --- /dev/null +++ b/conn_test.go @@ -0,0 +1,166 @@ +package journal + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "io" + "net" + "path/filepath" + "strconv" + "testing" +) + +type testingCommon interface { + TempDir() string + Fatalf(string, ...any) +} + +// testConnector spawns a Conn with a local Unix datagram socket that checks +// incoming datagrams for well-formedness but otherwise discards them. +func testConnector(t testingCommon) *Conn { + sockPath := filepath.Join(t.TempDir(), "test-socket") + sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{ + Name: sockPath, + Net: "unixgram", + }) + if err != nil { + t.Fatalf("testConnector: ListenUnix: %v", err) + } + + go func() { + buf := make([]byte, 1<<16 /*enough for our tests */) + for { + n, err := sock.Read(buf) + switch err { + case nil: + case io.EOF: + return + default: + t.Fatalf("testConnector: Read: %v", err) + } + ok, pos, desc := checkWellFormedProto(buf[:n]) + if !ok { + t.Fatalf("received malformed data at pos 0x%x: %s\n%s", pos, desc, hex.Dump(buf[:n])) + } + } + }() + + conn, err := Connect(sockPath) + if err != nil { + t.Fatalf("testConnector: DialUnix: %v", err) + } + conn.ErrHandler = func(err error) { + t.Fatalf("testConnector: connection error: %v", err) + } + return conn +} + +func checkWellFormedProto(buf []byte) (ok bool, pos int, desc string) { + for pos < len(buf) { + // grab attribute name up to '=' or '\n' + off := bytes.IndexAny(buf[pos:], "=\n") + if off == -1 { + return false, pos, "unterminated key" + } + key := string(buf[pos : pos+off]) + if err := AttrKeyValid(key); err != nil { + return false, pos, err.Error() + } + pos += off + + // for KEY=VALUE, the value is terminated by a newline + if buf[pos] == '=' { + pos++ + off = bytes.IndexByte(buf[pos:], '\n') + if off == -1 { + return false, pos, "unterminated value" + } + pos += off // consume value + pos++ // consume trailing newline + continue + } + + // otherwise, expect an 8-bit little-endian length + pos++ // consume newline after key + if pos+8 > len(buf) { + return false, pos, "value length too short" + } + vlen := binary.LittleEndian.Uint64(buf[pos:]) + pos += 8 + if vlen > uint64(len(buf)) /* protect against overflow */ || + uint64(pos)+vlen+1 > uint64(len(buf)) { + return false, pos, "value length too long" + } + pos += int(vlen) + if buf[pos] != '\n' { + return false, pos, "value not terminated by newline" + } + pos++ + } + return true, pos, "" +} + +// TestConcurrentEntries is best run with the race detector, and tries to pick +// up any faults that might occur when concurrent goroutines write into the same +// Conn. +func TestConcurrentEntries(t *testing.T) { + c := testConnector(t) + + // attributes which will be common to all EntryErr calls + attr := make([]Attr, 0, 10 /* enough capacity to avoid realloc on append; might trigger data races */) + attr = append(attr, Attr{ + Key: AttrKey{ + key: "HELLO", + }, + Value: []byte("world"), + }) + + // spawn goroutines + start := make(chan struct{}) + for i := 0; i < 16; i++ { + go func() { + <-start + for j := 0; j < 100; j++ { + err := c.EntryErr(PriDebug, "message "+strconv.Itoa(i)+"."+strconv.Itoa(j), attr) + if err != nil { + t.Fatalf("message %d.%d error: %v", i, j, err) + } + } + }() + } + + // try to get all the goroutines to start at roughly the same time + close(start) +} + +// BenchmarkEntry is a benchmark for the common Entry function. +func BenchmarkEntry(b *testing.B) { + c := testConnector(b) + + // add some common attributes + c.Common = make([]Attr, 0, 10) + c.Common = append(c.Common, Attr{ + Key: AttrKey{ + key: "COMMON_ATTR", + }, + Value: []byte("abc123\n"), + }) + + // attributes which will be common to all EntryErr calls + attr := make([]Attr, 0, 10) + attr = append(attr, Attr{ + Key: AttrKey{ + key: "HELLO", + }, + Value: []byte("world"), + }) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := c.EntryErr(PriDebug, "hello world!", attr) + if err != nil { + b.Fatalf("message %d: error %v", i, err) + } + } +}