Compare commits

...

4 Commits

Author SHA1 Message Date
Laurence Withers 596691457a Fix passing incorrect attrs slice to WireWrite 2024-04-05 13:44:49 +01:00
Laurence Withers c579349c05 Pre-size net.Buffers
This leads to a further reduction in allocations per call. Before:
	goos: linux
	goarch: amd64
	pkg: src.lwithers.me.uk/go/journal
	cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
	BenchmarkEntry-8   	12611158	    1445 ns/op	   221 B/op	     7 allocs/op
	PASS
	ok  	src.lwithers.me.uk/go/journal	19.673s

After:
	goos: linux
	goarch: amd64
	pkg: src.lwithers.me.uk/go/journal
	cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
	BenchmarkEntry-8   	13613377	     1329 ns/op	    149 B/op	      5 allocs/op
	PASS
	ok  	src.lwithers.me.uk/go/journal	19.435s
2024-02-18 14:44:20 +00:00
Laurence Withers d3666c9152 Fix slice gotchas including concurrency race
If the []Attr slice passed to Conn.Entry was used in several concurrent
calls, and it had sufficient capacity that append()ing to it succeeded
without reallocating the slice, then a data race would exist.

Fix this, and fix the efficiency gotcha which expected the attribute
slice to have additional capacity, by instead always constructing a new
final slice of []Attr and using a sync.Pool to avoid reallocating them.

Fixes go test -race, and improves the benchmark. Before:
	goos: linux
	goarch: amd64
	pkg: src.lwithers.me.uk/go/journal
	cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
	BenchmarkEntry-8   	8345163	    2139 ns/op	   848 B/op	    15 allocs/op
	PASS
	ok  	src.lwithers.me.uk/go/journal	20.034s

After:
	goos: linux
	goarch: amd64
	pkg: src.lwithers.me.uk/go/journal
	cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
	BenchmarkEntry-8   	12611158	    1445 ns/op	   221 B/op	     7 allocs/op
	PASS
	ok  	src.lwithers.me.uk/go/journal	19.673s
2024-02-18 14:43:33 +00:00
Laurence Withers 0c0a814a4e Add some higher level tests and benchmarks 2024-02-18 14:00:29 +00:00
3 changed files with 188 additions and 8 deletions

28
conn.go
View File

@ -2,6 +2,7 @@ package journal
import (
"net"
"slices"
"sync"
)
@ -23,6 +24,7 @@ type Conn struct {
s *net.UnixConn
bufs sync.Pool
atts sync.Pool
}
// Connect to the systemd journal. If the path string is empty, then it uses the
@ -40,23 +42,26 @@ func Connect(path string) (*Conn, error) {
return nil, err
}
return &Conn{
c := &Conn{
s: s,
bufs: sync.Pool{
New: func() any {
return &net.Buffers{}
},
},
}, nil
}
c.atts = sync.Pool{
New: func() any {
return make([]Attr, 0, 2+len(c.Common))
},
}
return c, nil
}
var messageAttrKey = AttrKey{key: "MESSAGE"}
// Entry emits a log entry. It will add PRIORITY and MESSAGE key/value pairs
// as well as any Common attributes.
//
// Note: to avoid allocation / garbage, ensure attrs has capacity for an extra
// 2+len(c.Common) values.
func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
err := c.EntryErr(pri, msg, attrs)
switch {
@ -72,12 +77,18 @@ func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
// EntryErr is like Entry, but will propagate errors to the caller, rather than
// using the built-in error handler.
func (c *Conn) EntryErr(pri Priority, msg string, attrs []Attr) error {
attrs = append(attrs, pri.Attr(), Attr{
a := c.atts.Get().([]Attr)
a = slices.Grow(a[:0], 2+len(c.Common)+len(attrs))
a = append(a, pri.Attr(), Attr{
Key: messageAttrKey,
Value: []byte(msg),
})
attrs = append(attrs, c.Common...)
return c.WriteAttrs(attrs)
a = append(a, c.Common...)
a = append(a, attrs...)
err := c.WriteAttrs(a)
slices.Delete(a, 0, len(a)) // ensure GC doesn't see refs to old data
c.atts.Put(a)
return err
}
// WriteAttrs is a low-level method which writes a journal entry comprised of
@ -86,6 +97,7 @@ func (c *Conn) WriteAttrs(attrs []Attr) error {
buf := c.bufs.Get().(*net.Buffers)
*buf = (*buf)[:0]
err := WireWrite(buf, c.s, attrs)
slices.Delete(*buf, 0, len(*buf)) // ensure GC doesn't see refs to old data
c.bufs.Put(buf)
return err
}

166
conn_test.go Normal file
View File

@ -0,0 +1,166 @@
package journal
import (
"bytes"
"encoding/binary"
"encoding/hex"
"io"
"net"
"path/filepath"
"strconv"
"testing"
)
type testingCommon interface {
TempDir() string
Fatalf(string, ...any)
}
// testConnector spawns a Conn with a local Unix datagram socket that checks
// incoming datagrams for well-formedness but otherwise discards them.
func testConnector(t testingCommon) *Conn {
sockPath := filepath.Join(t.TempDir(), "test-socket")
sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
Name: sockPath,
Net: "unixgram",
})
if err != nil {
t.Fatalf("testConnector: ListenUnix: %v", err)
}
go func() {
buf := make([]byte, 1<<16 /*enough for our tests */)
for {
n, err := sock.Read(buf)
switch err {
case nil:
case io.EOF:
return
default:
t.Fatalf("testConnector: Read: %v", err)
}
ok, pos, desc := checkWellFormedProto(buf[:n])
if !ok {
t.Fatalf("received malformed data at pos 0x%x: %s\n%s", pos, desc, hex.Dump(buf[:n]))
}
}
}()
conn, err := Connect(sockPath)
if err != nil {
t.Fatalf("testConnector: DialUnix: %v", err)
}
conn.ErrHandler = func(err error) {
t.Fatalf("testConnector: connection error: %v", err)
}
return conn
}
func checkWellFormedProto(buf []byte) (ok bool, pos int, desc string) {
for pos < len(buf) {
// grab attribute name up to '=' or '\n'
off := bytes.IndexAny(buf[pos:], "=\n")
if off == -1 {
return false, pos, "unterminated key"
}
key := string(buf[pos : pos+off])
if err := AttrKeyValid(key); err != nil {
return false, pos, err.Error()
}
pos += off
// for KEY=VALUE, the value is terminated by a newline
if buf[pos] == '=' {
pos++
off = bytes.IndexByte(buf[pos:], '\n')
if off == -1 {
return false, pos, "unterminated value"
}
pos += off // consume value
pos++ // consume trailing newline
continue
}
// otherwise, expect an 8-bit little-endian length
pos++ // consume newline after key
if pos+8 > len(buf) {
return false, pos, "value length too short"
}
vlen := binary.LittleEndian.Uint64(buf[pos:])
pos += 8
if vlen > uint64(len(buf)) /* protect against overflow */ ||
uint64(pos)+vlen+1 > uint64(len(buf)) {
return false, pos, "value length too long"
}
pos += int(vlen)
if buf[pos] != '\n' {
return false, pos, "value not terminated by newline"
}
pos++
}
return true, pos, ""
}
// TestConcurrentEntries is best run with the race detector, and tries to pick
// up any faults that might occur when concurrent goroutines write into the same
// Conn.
func TestConcurrentEntries(t *testing.T) {
c := testConnector(t)
// attributes which will be common to all EntryErr calls
attr := make([]Attr, 0, 10 /* enough capacity to avoid realloc on append; might trigger data races */)
attr = append(attr, Attr{
Key: AttrKey{
key: "HELLO",
},
Value: []byte("world"),
})
// spawn goroutines
start := make(chan struct{})
for i := 0; i < 16; i++ {
go func() {
<-start
for j := 0; j < 100; j++ {
err := c.EntryErr(PriDebug, "message "+strconv.Itoa(i)+"."+strconv.Itoa(j), attr)
if err != nil {
t.Fatalf("message %d.%d error: %v", i, j, err)
}
}
}()
}
// try to get all the goroutines to start at roughly the same time
close(start)
}
// BenchmarkEntry is a benchmark for the common Entry function.
func BenchmarkEntry(b *testing.B) {
c := testConnector(b)
// add some common attributes
c.Common = make([]Attr, 0, 10)
c.Common = append(c.Common, Attr{
Key: AttrKey{
key: "COMMON_ATTR",
},
Value: []byte("abc123\n"),
})
// attributes which will be common to all EntryErr calls
attr := make([]Attr, 0, 10)
attr = append(attr, Attr{
Key: AttrKey{
key: "HELLO",
},
Value: []byte("world"),
})
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := c.EntryErr(PriDebug, "hello world!", attr)
if err != nil {
b.Fatalf("message %d: error %v", i, err)
}
}
}

View File

@ -4,6 +4,7 @@ import (
"encoding/binary"
"io"
"net"
"slices"
)
var (
@ -19,6 +20,7 @@ var (
// atomic, and this function is safe to use across goroutines. Otherwise,
// writes may be interleaved.
func WireWrite(buf *net.Buffers, w io.Writer, attrs []Attr) error {
*buf = slices.Grow(*buf, len(*buf)+len(attrs)*4)
for i := range attrs {
key := attrs[i].Key.key
if key == "" {