Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
|
596691457a | |
|
c579349c05 | |
|
d3666c9152 | |
|
0c0a814a4e |
28
conn.go
28
conn.go
|
@ -2,6 +2,7 @@ package journal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -23,6 +24,7 @@ type Conn struct {
|
||||||
|
|
||||||
s *net.UnixConn
|
s *net.UnixConn
|
||||||
bufs sync.Pool
|
bufs sync.Pool
|
||||||
|
atts sync.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect to the systemd journal. If the path string is empty, then it uses the
|
// Connect to the systemd journal. If the path string is empty, then it uses the
|
||||||
|
@ -40,23 +42,26 @@ func Connect(path string) (*Conn, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Conn{
|
c := &Conn{
|
||||||
s: s,
|
s: s,
|
||||||
bufs: sync.Pool{
|
bufs: sync.Pool{
|
||||||
New: func() any {
|
New: func() any {
|
||||||
return &net.Buffers{}
|
return &net.Buffers{}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}, nil
|
}
|
||||||
|
c.atts = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return make([]Attr, 0, 2+len(c.Common))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var messageAttrKey = AttrKey{key: "MESSAGE"}
|
var messageAttrKey = AttrKey{key: "MESSAGE"}
|
||||||
|
|
||||||
// Entry emits a log entry. It will add PRIORITY and MESSAGE key/value pairs
|
// Entry emits a log entry. It will add PRIORITY and MESSAGE key/value pairs
|
||||||
// as well as any Common attributes.
|
// as well as any Common attributes.
|
||||||
//
|
|
||||||
// Note: to avoid allocation / garbage, ensure attrs has capacity for an extra
|
|
||||||
// 2+len(c.Common) values.
|
|
||||||
func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
|
func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
|
||||||
err := c.EntryErr(pri, msg, attrs)
|
err := c.EntryErr(pri, msg, attrs)
|
||||||
switch {
|
switch {
|
||||||
|
@ -72,12 +77,18 @@ func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
|
||||||
// EntryErr is like Entry, but will propagate errors to the caller, rather than
|
// EntryErr is like Entry, but will propagate errors to the caller, rather than
|
||||||
// using the built-in error handler.
|
// using the built-in error handler.
|
||||||
func (c *Conn) EntryErr(pri Priority, msg string, attrs []Attr) error {
|
func (c *Conn) EntryErr(pri Priority, msg string, attrs []Attr) error {
|
||||||
attrs = append(attrs, pri.Attr(), Attr{
|
a := c.atts.Get().([]Attr)
|
||||||
|
a = slices.Grow(a[:0], 2+len(c.Common)+len(attrs))
|
||||||
|
a = append(a, pri.Attr(), Attr{
|
||||||
Key: messageAttrKey,
|
Key: messageAttrKey,
|
||||||
Value: []byte(msg),
|
Value: []byte(msg),
|
||||||
})
|
})
|
||||||
attrs = append(attrs, c.Common...)
|
a = append(a, c.Common...)
|
||||||
return c.WriteAttrs(attrs)
|
a = append(a, attrs...)
|
||||||
|
err := c.WriteAttrs(a)
|
||||||
|
slices.Delete(a, 0, len(a)) // ensure GC doesn't see refs to old data
|
||||||
|
c.atts.Put(a)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteAttrs is a low-level method which writes a journal entry comprised of
|
// WriteAttrs is a low-level method which writes a journal entry comprised of
|
||||||
|
@ -86,6 +97,7 @@ func (c *Conn) WriteAttrs(attrs []Attr) error {
|
||||||
buf := c.bufs.Get().(*net.Buffers)
|
buf := c.bufs.Get().(*net.Buffers)
|
||||||
*buf = (*buf)[:0]
|
*buf = (*buf)[:0]
|
||||||
err := WireWrite(buf, c.s, attrs)
|
err := WireWrite(buf, c.s, attrs)
|
||||||
|
slices.Delete(*buf, 0, len(*buf)) // ensure GC doesn't see refs to old data
|
||||||
c.bufs.Put(buf)
|
c.bufs.Put(buf)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,166 @@
|
||||||
|
package journal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testingCommon interface {
|
||||||
|
TempDir() string
|
||||||
|
Fatalf(string, ...any)
|
||||||
|
}
|
||||||
|
|
||||||
|
// testConnector spawns a Conn with a local Unix datagram socket that checks
|
||||||
|
// incoming datagrams for well-formedness but otherwise discards them.
|
||||||
|
func testConnector(t testingCommon) *Conn {
|
||||||
|
sockPath := filepath.Join(t.TempDir(), "test-socket")
|
||||||
|
sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
|
||||||
|
Name: sockPath,
|
||||||
|
Net: "unixgram",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("testConnector: ListenUnix: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
buf := make([]byte, 1<<16 /*enough for our tests */)
|
||||||
|
for {
|
||||||
|
n, err := sock.Read(buf)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
case io.EOF:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
t.Fatalf("testConnector: Read: %v", err)
|
||||||
|
}
|
||||||
|
ok, pos, desc := checkWellFormedProto(buf[:n])
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("received malformed data at pos 0x%x: %s\n%s", pos, desc, hex.Dump(buf[:n]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
conn, err := Connect(sockPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("testConnector: DialUnix: %v", err)
|
||||||
|
}
|
||||||
|
conn.ErrHandler = func(err error) {
|
||||||
|
t.Fatalf("testConnector: connection error: %v", err)
|
||||||
|
}
|
||||||
|
return conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkWellFormedProto(buf []byte) (ok bool, pos int, desc string) {
|
||||||
|
for pos < len(buf) {
|
||||||
|
// grab attribute name up to '=' or '\n'
|
||||||
|
off := bytes.IndexAny(buf[pos:], "=\n")
|
||||||
|
if off == -1 {
|
||||||
|
return false, pos, "unterminated key"
|
||||||
|
}
|
||||||
|
key := string(buf[pos : pos+off])
|
||||||
|
if err := AttrKeyValid(key); err != nil {
|
||||||
|
return false, pos, err.Error()
|
||||||
|
}
|
||||||
|
pos += off
|
||||||
|
|
||||||
|
// for KEY=VALUE, the value is terminated by a newline
|
||||||
|
if buf[pos] == '=' {
|
||||||
|
pos++
|
||||||
|
off = bytes.IndexByte(buf[pos:], '\n')
|
||||||
|
if off == -1 {
|
||||||
|
return false, pos, "unterminated value"
|
||||||
|
}
|
||||||
|
pos += off // consume value
|
||||||
|
pos++ // consume trailing newline
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise, expect an 8-bit little-endian length
|
||||||
|
pos++ // consume newline after key
|
||||||
|
if pos+8 > len(buf) {
|
||||||
|
return false, pos, "value length too short"
|
||||||
|
}
|
||||||
|
vlen := binary.LittleEndian.Uint64(buf[pos:])
|
||||||
|
pos += 8
|
||||||
|
if vlen > uint64(len(buf)) /* protect against overflow */ ||
|
||||||
|
uint64(pos)+vlen+1 > uint64(len(buf)) {
|
||||||
|
return false, pos, "value length too long"
|
||||||
|
}
|
||||||
|
pos += int(vlen)
|
||||||
|
if buf[pos] != '\n' {
|
||||||
|
return false, pos, "value not terminated by newline"
|
||||||
|
}
|
||||||
|
pos++
|
||||||
|
}
|
||||||
|
return true, pos, ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestConcurrentEntries is best run with the race detector, and tries to pick
|
||||||
|
// up any faults that might occur when concurrent goroutines write into the same
|
||||||
|
// Conn.
|
||||||
|
func TestConcurrentEntries(t *testing.T) {
|
||||||
|
c := testConnector(t)
|
||||||
|
|
||||||
|
// attributes which will be common to all EntryErr calls
|
||||||
|
attr := make([]Attr, 0, 10 /* enough capacity to avoid realloc on append; might trigger data races */)
|
||||||
|
attr = append(attr, Attr{
|
||||||
|
Key: AttrKey{
|
||||||
|
key: "HELLO",
|
||||||
|
},
|
||||||
|
Value: []byte("world"),
|
||||||
|
})
|
||||||
|
|
||||||
|
// spawn goroutines
|
||||||
|
start := make(chan struct{})
|
||||||
|
for i := 0; i < 16; i++ {
|
||||||
|
go func() {
|
||||||
|
<-start
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
err := c.EntryErr(PriDebug, "message "+strconv.Itoa(i)+"."+strconv.Itoa(j), attr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("message %d.%d error: %v", i, j, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to get all the goroutines to start at roughly the same time
|
||||||
|
close(start)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkEntry is a benchmark for the common Entry function.
|
||||||
|
func BenchmarkEntry(b *testing.B) {
|
||||||
|
c := testConnector(b)
|
||||||
|
|
||||||
|
// add some common attributes
|
||||||
|
c.Common = make([]Attr, 0, 10)
|
||||||
|
c.Common = append(c.Common, Attr{
|
||||||
|
Key: AttrKey{
|
||||||
|
key: "COMMON_ATTR",
|
||||||
|
},
|
||||||
|
Value: []byte("abc123\n"),
|
||||||
|
})
|
||||||
|
|
||||||
|
// attributes which will be common to all EntryErr calls
|
||||||
|
attr := make([]Attr, 0, 10)
|
||||||
|
attr = append(attr, Attr{
|
||||||
|
Key: AttrKey{
|
||||||
|
key: "HELLO",
|
||||||
|
},
|
||||||
|
Value: []byte("world"),
|
||||||
|
})
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
err := c.EntryErr(PriDebug, "hello world!", attr)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("message %d: error %v", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -19,6 +20,7 @@ var (
|
||||||
// atomic, and this function is safe to use across goroutines. Otherwise,
|
// atomic, and this function is safe to use across goroutines. Otherwise,
|
||||||
// writes may be interleaved.
|
// writes may be interleaved.
|
||||||
func WireWrite(buf *net.Buffers, w io.Writer, attrs []Attr) error {
|
func WireWrite(buf *net.Buffers, w io.Writer, attrs []Attr) error {
|
||||||
|
*buf = slices.Grow(*buf, len(*buf)+len(attrs)*4)
|
||||||
for i := range attrs {
|
for i := range attrs {
|
||||||
key := attrs[i].Key.key
|
key := attrs[i].Key.key
|
||||||
if key == "" {
|
if key == "" {
|
||||||
|
|
Loading…
Reference in New Issue