Compare commits
9 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
df6107a67e | |
|
|
ecd852eede | |
|
|
8f364a74df | |
|
|
ffb40d7843 | |
|
|
b83a572500 | |
|
|
596691457a | |
|
|
c579349c05 | |
|
|
d3666c9152 | |
|
|
0c0a814a4e |
32
README.md
32
README.md
|
|
@ -1,3 +1,33 @@
|
|||
# journal
|
||||
|
||||
Native Go output to the systemd journal socket.
|
||||
Native Go output to the [systemd journal socket](https://systemd.io/JOURNAL_NATIVE_PROTOCOL/).
|
||||
See package `journslog` for a [`log/slog`](https://pkg.go.dev/log/slog) adapter.
|
||||
|
||||
Quick start:
|
||||
|
||||
```go
|
||||
import "src.lwithers.me.uk/go/journal"
|
||||
|
||||
func main() {
|
||||
journal.Entry(journal.PriInfo, "hello, world", nil)
|
||||
}
|
||||
```
|
||||
|
||||
The `log/slog` adapter:
|
||||
|
||||
```go
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"src.lwithers.me.uk/go/journal/journslog"
|
||||
)
|
||||
|
||||
func main() {
|
||||
h, err := journslog.NewHandler()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
slog.SetDefault(slog.New(h))
|
||||
slog.Info("hello, world", "attr", "value")
|
||||
}
|
||||
```
|
||||
|
|
|
|||
34
conn.go
34
conn.go
|
|
@ -2,6 +2,7 @@ package journal
|
|||
|
||||
import (
|
||||
"net"
|
||||
"slices"
|
||||
"sync"
|
||||
)
|
||||
|
||||
|
|
@ -23,6 +24,7 @@ type Conn struct {
|
|||
|
||||
s *net.UnixConn
|
||||
bufs sync.Pool
|
||||
atts sync.Pool
|
||||
}
|
||||
|
||||
// Connect to the systemd journal. If the path string is empty, then it uses the
|
||||
|
|
@ -40,23 +42,26 @@ func Connect(path string) (*Conn, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &Conn{
|
||||
c := &Conn{
|
||||
s: s,
|
||||
bufs: sync.Pool{
|
||||
New: func() any {
|
||||
return &net.Buffers{}
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
c.atts = sync.Pool{
|
||||
New: func() any {
|
||||
return make([]Attr, 0, 2+len(c.Common))
|
||||
},
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
var messageAttrKey = AttrKey{key: "MESSAGE"}
|
||||
|
||||
// Entry emits a log entry. It will add PRIORITY and MESSAGE key/value pairs
|
||||
// as well as any Common attributes.
|
||||
//
|
||||
// Note: to avoid allocation / garbage, ensure attrs has capacity for an extra
|
||||
// 2+len(c.Common) values.
|
||||
func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
|
||||
err := c.EntryErr(pri, msg, attrs)
|
||||
switch {
|
||||
|
|
@ -72,12 +77,18 @@ func (c *Conn) Entry(pri Priority, msg string, attrs []Attr) {
|
|||
// EntryErr is like Entry, but will propagate errors to the caller, rather than
|
||||
// using the built-in error handler.
|
||||
func (c *Conn) EntryErr(pri Priority, msg string, attrs []Attr) error {
|
||||
attrs = append(attrs, pri.Attr(), Attr{
|
||||
a := c.atts.Get().([]Attr)
|
||||
a = slices.Grow(a[:0], 2+len(c.Common)+len(attrs))
|
||||
a = append(a, pri.Attr(), Attr{
|
||||
Key: messageAttrKey,
|
||||
Value: []byte(msg),
|
||||
})
|
||||
attrs = append(attrs, c.Common...)
|
||||
return c.WriteAttrs(attrs)
|
||||
a = append(a, c.Common...)
|
||||
a = append(a, attrs...)
|
||||
err := c.WriteAttrs(a)
|
||||
a = slices.Delete(a, 0, len(a)) // ensure GC doesn't see refs to old data
|
||||
c.atts.Put(a)
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteAttrs is a low-level method which writes a journal entry comprised of
|
||||
|
|
@ -86,6 +97,13 @@ func (c *Conn) WriteAttrs(attrs []Attr) error {
|
|||
buf := c.bufs.Get().(*net.Buffers)
|
||||
*buf = (*buf)[:0]
|
||||
err := WireWrite(buf, c.s, attrs)
|
||||
*buf = slices.Delete(*buf, 0, len(*buf)) // ensure GC doesn't see refs to old data
|
||||
c.bufs.Put(buf)
|
||||
return err
|
||||
}
|
||||
|
||||
// Close the underlying connection. After this, all attempts to write onto the
|
||||
// connection will return an error.
|
||||
func (c *Conn) Close() error {
|
||||
return c.s.Close()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,124 @@
|
|||
package journal
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"src.lwithers.me.uk/go/journal/testsink"
|
||||
)
|
||||
|
||||
// TestConn opens a connection to a test sink and writes one message,
|
||||
// ensuring it is received.
|
||||
func TestConn(t *testing.T) {
|
||||
sockpath := filepath.Join(t.TempDir(), "socket")
|
||||
sink, err := testsink.New(sockpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer sink.Stop()
|
||||
|
||||
conn, err := Connect(sockpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
conn.Entry(PriInfo, "hello, world", nil)
|
||||
|
||||
msg, err := sink.Message(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msgText, attrs, err := msg.Decode()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if msgText != "hello, world" {
|
||||
t.Errorf("unexpected message text %q", msgText)
|
||||
}
|
||||
val, ok := testsink.GetAttr(attrs, "PRIORITY")
|
||||
switch {
|
||||
case !ok:
|
||||
t.Error("missing PRIORITY attribute")
|
||||
case val != "6":
|
||||
t.Error("unexpected PRIORITY value")
|
||||
}
|
||||
|
||||
if t.Failed() {
|
||||
for i := range attrs {
|
||||
t.Errorf("attr %q=%q", attrs[i].Key, attrs[i].Val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestEntryBinary ensures that we can write a message with an attribute encoded
|
||||
// as a binary field.
|
||||
func TestEntryBinary(t *testing.T) {
|
||||
sockpath := filepath.Join(t.TempDir(), "socket")
|
||||
sink, err := testsink.New(sockpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer sink.Stop()
|
||||
|
||||
conn, err := Connect(sockpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
expAttrs := []Attr{
|
||||
Attr{
|
||||
Key: MustAttrKey("SHORT"),
|
||||
Value: []byte("short val"),
|
||||
},
|
||||
Attr{
|
||||
Key: MustAttrKey("BINARY"),
|
||||
Value: []byte("string with\n=embedded newline\nrequires binary protocol\n"),
|
||||
},
|
||||
Attr{
|
||||
Key: MustAttrKey("LAST"),
|
||||
Value: []byte("last\n"),
|
||||
},
|
||||
}
|
||||
conn.Entry(PriDebug, "hello, binary world", expAttrs)
|
||||
|
||||
msg, err := sink.Message(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msgText, attrs, err := msg.Decode()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if msgText != "hello, binary world" {
|
||||
t.Errorf("unexpected message text %q", msgText)
|
||||
}
|
||||
|
||||
val, ok := testsink.GetAttr(attrs, "PRIORITY")
|
||||
switch {
|
||||
case !ok:
|
||||
t.Error("missing PRIORITY attribute")
|
||||
case val != "7":
|
||||
t.Error("unexpected PRIORITY value")
|
||||
}
|
||||
|
||||
for i := range expAttrs {
|
||||
key, expVal := expAttrs[i].Key.Key(), string(expAttrs[i].Value)
|
||||
val, ok = testsink.GetAttr(attrs, key)
|
||||
switch {
|
||||
case !ok:
|
||||
t.Errorf("missing %s attribute", key)
|
||||
case val != expVal:
|
||||
t.Errorf("unexpected %s value", key)
|
||||
}
|
||||
}
|
||||
|
||||
if t.Failed() {
|
||||
for i := range attrs {
|
||||
t.Errorf("attr %q=%q", attrs[i].Key, attrs[i].Val)
|
||||
}
|
||||
}
|
||||
}
|
||||
11
global.go
11
global.go
|
|
@ -26,7 +26,7 @@ const (
|
|||
// error otherwise. It will attempt to establish a connection to the journal
|
||||
// if there is not already one in progress.
|
||||
func CheckConnection() error {
|
||||
_, err := getGlobal()
|
||||
_, err := GetGlobalConn()
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -39,7 +39,12 @@ func SetGlobalConn(c *Conn) {
|
|||
global = c
|
||||
}
|
||||
|
||||
func getGlobal() (*Conn, error) {
|
||||
// GetGlobalConn returns the global (default) connection to the journal. It may
|
||||
// return an error if no connection could be established. If no connection is
|
||||
// yet established, it will attempt to establish one. This is rate limited to
|
||||
// avoid excessive connection attempts should there be a persistent problem
|
||||
// connecting to the journal socket.
|
||||
func GetGlobalConn() (*Conn, error) {
|
||||
globalLock.Lock()
|
||||
defer globalLock.Unlock()
|
||||
|
||||
|
|
@ -119,7 +124,7 @@ func SetGlobalErrHandler(errHandler func(err error)) {
|
|||
// Note: to avoid allocation / garbage, ensure attrs has capacity for an extra
|
||||
// 2+len(GlobalCommonAttr) values.
|
||||
func Entry(pri Priority, msg string, attrs []Attr) {
|
||||
c, _ := getGlobal()
|
||||
c, _ := GetGlobalConn()
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,83 @@
|
|||
package journslog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"src.lwithers.me.uk/go/journal"
|
||||
)
|
||||
|
||||
// HandlerOption is passed to [NewHandler] to control the behaviour of the
|
||||
// newly-created [JournalHandler].
|
||||
type HandlerOption func(jh *JournalHandler)
|
||||
|
||||
// WithHandlerOptions applies an [slog.HandlerOptions] to a newly-created
|
||||
// [JournalHandler].
|
||||
//
|
||||
// If AddSource is set, then the widley-recognised journal attributes
|
||||
// CODE_FILE, CODE_LINE and CODE_FUNC will be added to each journal entry.
|
||||
func WithHandlerOptions(opts slog.HandlerOptions) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.addSource = opts.AddSource
|
||||
if opts.Level == nil {
|
||||
jh.minLevel = slog.LevelInfo
|
||||
} else {
|
||||
jh.minLevel = opts.Level
|
||||
}
|
||||
jh.replaceAttr = opts.ReplaceAttr
|
||||
}
|
||||
}
|
||||
|
||||
// WithLevelMapper allows control over how [slog.Level] values are mapped to
|
||||
// [journal.Priority] values.
|
||||
func WithLevelMapper(levelMapper LevelMapper) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.levelMapper = levelMapper
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyMapper allows control over how attribute string keys are mapped to
|
||||
// journal keys.
|
||||
func WithKeyMapper(keyMapper KeyMapper) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.keyMapper = keyMapper
|
||||
}
|
||||
}
|
||||
|
||||
// WithConn uses the given [journal.Conn] connection object, rather than the
|
||||
// global default.
|
||||
func WithConn(conn *journal.Conn) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.conn = conn
|
||||
}
|
||||
}
|
||||
|
||||
// WithLevelFromContext supplies a function which can optionally extract a
|
||||
// more-specific minimum log level to use for a given message from a
|
||||
// [context.Context]. If the supplied levelFromContext returns true in its
|
||||
// second argument, then its first argument is used as the minimum level.
|
||||
// Otherwise, the handler's default minimum level will be used.
|
||||
func WithLevelFromContext(levelFromContext func(context.Context) (slog.Level, bool)) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.levelFromContext = levelFromContext
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttrsFromContext supplies a function which can optionally extract
|
||||
// attributes from a given [context.Context] value. It may return nil or a
|
||||
// zero-length slice.
|
||||
func WithAttrsFromContext(attrsFromContext func(context.Context) []slog.Attr) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.attrsFromContext = attrsFromContext
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttrsFromTime supplies a function which can optionally map the Time
|
||||
// field from an [slog.Record] onto zero or more attributes. The supplied
|
||||
// function is never called with a zero [time.Time].
|
||||
func WithAttrsFromTime(attrsFromTime func(time.Time) []slog.Attr) HandlerOption {
|
||||
return func(jh *JournalHandler) {
|
||||
jh.attrsFromTime = attrsFromTime
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,293 @@
|
|||
/*
|
||||
Package journslog implements an [slog.Handler] which writes output to the
|
||||
[systemd journal].
|
||||
|
||||
[systemd journal]: https://www.freedesktop.org/software/systemd/man/latest/systemd-journald.service.html
|
||||
*/
|
||||
package journslog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"src.lwithers.me.uk/go/journal"
|
||||
)
|
||||
|
||||
// JournalHandler is an [slog.Handler] which writes output to the
|
||||
// [systemd journal].
|
||||
//
|
||||
// Note that the handler discards the Time field from [slog.Record] by default,
|
||||
// as there is no standardised way to override the timestamp recorded by the
|
||||
// systemd-journal upon reception of a message. It is possible to emit a field
|
||||
// using [WithAttrsFromTime].
|
||||
//
|
||||
// [systemd journal]: https://www.freedesktop.org/software/systemd/man/latest/systemd-journald.service.html
|
||||
type JournalHandler struct {
|
||||
// addSource causes CODE_FILE, CODE_LINE and CODE_FUNC attrs to be added
|
||||
// to each entry.
|
||||
addSource bool
|
||||
|
||||
// minLevel is the minimum level of messages we emit. Lower levels are
|
||||
// not emitted. Note it can be overridden by levelFromContext.
|
||||
minLevel slog.Leveler
|
||||
|
||||
// replaceAttr is an optional function (may be nil) which the user can
|
||||
// use to map (resolved) [slog.Attr] values into other values, including
|
||||
// empty to skip.
|
||||
replaceAttr func(groups []string, a slog.Attr) slog.Attr
|
||||
|
||||
// levelMapper turns [slog.Level] values into [journal.Priority] values.
|
||||
levelMapper LevelMapper
|
||||
|
||||
// keyMapper turns [slog.Attr] key strings into [journal.AttrKey] values.
|
||||
keyMapper KeyMapper
|
||||
|
||||
// conn is the underlying journal connection to use.
|
||||
conn *journal.Conn
|
||||
|
||||
// levelFromContext allows overriding the minimum log level for a logger
|
||||
// based on the [context.Context] value. It may be nil.
|
||||
levelFromContext func(context.Context) (slog.Level, bool)
|
||||
|
||||
// attrsFromContext may supply additional attributes to the logger from
|
||||
// the [context.Context] value. It may be nil.
|
||||
attrsFromContext func(context.Context) []slog.Attr
|
||||
|
||||
// attrsFromTime may return zero or more attributes to represent the
|
||||
// Time from an [slog.Record]. It may be nil.
|
||||
attrsFromTime func(time.Time) []slog.Attr
|
||||
|
||||
// groups are the ordered group names added by WithGroup. We need to
|
||||
// maintain the full list, not just a pre-rendered prefix string,
|
||||
// because of the API of replaceAttr.
|
||||
groups []string
|
||||
|
||||
// group is the pre-rendered prefix string corresponding to groups.
|
||||
group groupPrefix
|
||||
|
||||
// jattrs are the pre-rendered attributes added by WithAttrs.
|
||||
jattrs []journal.Attr
|
||||
}
|
||||
|
||||
// NewHandler returns a new [JournalHandler]. It returns an error if a
|
||||
// connection to the journal socket could not be established.
|
||||
func NewHandler(opts ...HandlerOption) (*JournalHandler, error) {
|
||||
jh := &JournalHandler{
|
||||
minLevel: slog.LevelInfo,
|
||||
levelMapper: DefaultLevelMapper(),
|
||||
keyMapper: DefaultKeyMapper(),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(jh)
|
||||
}
|
||||
|
||||
// For fields which require allocation of resources, initialise them
|
||||
// after processing options in case the user already supplied a non-nil
|
||||
// value. In particular this means we don't cause the global connection
|
||||
// to the journal to be opened if the user supplied something more
|
||||
// specific.
|
||||
if jh.levelMapper == nil {
|
||||
jh.levelMapper = DefaultLevelMapper()
|
||||
}
|
||||
if jh.keyMapper == nil {
|
||||
jh.keyMapper = DefaultKeyMapper()
|
||||
}
|
||||
if jh.conn == nil {
|
||||
var err error
|
||||
if jh.conn, err = journal.GetGlobalConn(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return jh, nil
|
||||
}
|
||||
|
||||
// WithAttrs returns an [slog.Handler] containing some fixed attributes, which
|
||||
// will be emitted on all future calls to Handle. The attribute values are
|
||||
// resolved within this function call.
|
||||
func (jh *JournalHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||
if len(attrs) == 0 {
|
||||
return jh
|
||||
}
|
||||
|
||||
jattrs := make([]journal.Attr, 0, len(attrs))
|
||||
for _, attr := range attrs {
|
||||
jattrs = jh.renderAttribute(jattrs, jh.groups, jh.group, attr)
|
||||
}
|
||||
|
||||
if len(jattrs) == 0 {
|
||||
return jh
|
||||
}
|
||||
|
||||
nh := new(JournalHandler)
|
||||
*nh = *jh
|
||||
nh.jattrs = slices.Concat(jh.jattrs, jattrs)
|
||||
return nh
|
||||
}
|
||||
|
||||
// WithGroup returns an [slog.Handler] where are future attributes are assigned
|
||||
// to the given group.
|
||||
func (jh *JournalHandler) WithGroup(group string) slog.Handler {
|
||||
nh := new(JournalHandler)
|
||||
*nh = *jh
|
||||
nh.groups = append(slices.Clone(jh.groups), group)
|
||||
nh.group = jh.group.Append(group)
|
||||
return nh
|
||||
}
|
||||
|
||||
// Enabled returns true if the given log level is enabled, false otherwise.
|
||||
func (jh *JournalHandler) Enabled(ctx context.Context, level slog.Level) bool {
|
||||
var (
|
||||
minLevel slog.Level
|
||||
ok bool
|
||||
)
|
||||
// avoid call to jh.minLevel.Level() if we have a more specific value
|
||||
// from the context
|
||||
if jh.levelFromContext != nil {
|
||||
minLevel, ok = jh.levelFromContext(ctx)
|
||||
}
|
||||
if !ok {
|
||||
minLevel = jh.minLevel.Level()
|
||||
}
|
||||
return level >= minLevel
|
||||
}
|
||||
|
||||
// Handle a log record, writing it to the journal similar to [journal.Entry].
|
||||
func (jh *JournalHandler) Handle(ctx context.Context, rec slog.Record) error {
|
||||
pri := jh.levelMapper.LevelMap(rec.Level)
|
||||
|
||||
var addAttrs []slog.Attr
|
||||
if jh.attrsFromTime != nil && !rec.Time.IsZero() {
|
||||
addAttrs = attrPoolGet()
|
||||
defer attrPoolPut(addAttrs)
|
||||
addAttrs = append(addAttrs, jh.attrsFromTime(rec.Time)...)
|
||||
}
|
||||
if jh.attrsFromContext != nil {
|
||||
if addAttrs == nil {
|
||||
addAttrs = attrPoolGet()
|
||||
defer attrPoolPut(addAttrs)
|
||||
}
|
||||
addAttrs = append(addAttrs, jh.attrsFromContext(ctx)...)
|
||||
}
|
||||
|
||||
// 2 for journal.Conn.Entry (MESSAGE and PRIORITY),
|
||||
// 3 for SRC_* (optional but we always allocate space)
|
||||
numAttrsEstimate := 2 + len(jh.conn.Common) + 3 + len(addAttrs) + len(jh.jattrs) + rec.NumAttrs()
|
||||
jattrs := slices.Grow(jattrPoolGet(), numAttrsEstimate)
|
||||
defer jattrPoolPut(jattrs)
|
||||
|
||||
// add SRC_* if requested
|
||||
if jh.addSource && rec.PC != 0 {
|
||||
jattrs = append(jattrs, jh.sourceAttrs(rec.PC)...)
|
||||
}
|
||||
|
||||
// add time / context attributes, if any
|
||||
for i := range addAttrs {
|
||||
jattrs = jh.renderAttribute(jattrs, nil, "", addAttrs[i])
|
||||
}
|
||||
|
||||
// add pre-rendered attributes from WithAttrs
|
||||
jattrs = append(jattrs, jh.jattrs...)
|
||||
|
||||
// map [slog.Attr] values to [journal.Attr] values
|
||||
rec.Attrs(func(a slog.Attr) bool {
|
||||
jattrs = jh.renderAttribute(jattrs, jh.groups, jh.group, a)
|
||||
return true
|
||||
})
|
||||
|
||||
return jh.conn.EntryErr(pri, rec.Message, jattrs)
|
||||
}
|
||||
|
||||
// groupPrefix enables efficient building of attribute keys prepended by group
|
||||
// names, separated by '.'. Invariant: always n × (group-name '.'), n ≥ 0.
|
||||
type groupPrefix string
|
||||
|
||||
func (gp groupPrefix) Prepend(key string) string {
|
||||
if gp == "" { // avoid allocation in frequent path
|
||||
return key
|
||||
}
|
||||
return string(gp) + key
|
||||
}
|
||||
|
||||
func (gp groupPrefix) Append(group string) groupPrefix {
|
||||
var b strings.Builder
|
||||
b.WriteString(string(gp))
|
||||
b.WriteString(group)
|
||||
b.WriteRune('.')
|
||||
return groupPrefix(b.String())
|
||||
}
|
||||
|
||||
// renderAttribute turns an [slog.Attr] into some number of resolved,
|
||||
// fully-qualified [journal.Attr] values. It calls replaceAttr if defined.
|
||||
// New attributes are appended to the slice jattrs, and the new slice header
|
||||
// is returned.
|
||||
//
|
||||
// If the attribute (after replacement) is a zero [slog.Attr] value, or it is
|
||||
// of kind [slog.KindGroup] with zero child elements, no change is made to the
|
||||
// jattrs slice.
|
||||
//
|
||||
// If the attribute (after replacement) is of kind [slog.Group], the function
|
||||
// recurses, potentially adding multiple elements to the jattrs slice.
|
||||
// list:[]journal.Attr{...}}.
|
||||
//
|
||||
// Otherwise, the function appends a single element to the jattrs slice.
|
||||
func (jh *JournalHandler) renderAttribute(jattrs []journal.Attr, groups []string, gp groupPrefix, attr slog.Attr) []journal.Attr {
|
||||
val := attr.Value.Resolve()
|
||||
|
||||
if jh.replaceAttr != nil && val.Kind() != slog.KindGroup {
|
||||
attr = jh.replaceAttr(groups, attr)
|
||||
val = attr.Value.Resolve()
|
||||
}
|
||||
|
||||
if attr.Equal(slog.Attr{}) {
|
||||
return jattrs
|
||||
}
|
||||
|
||||
// handle slog.KindGroup by recursing
|
||||
if val.Kind() == slog.KindGroup {
|
||||
groups = append(groups, attr.Key)
|
||||
gp = gp.Append(attr.Key)
|
||||
for _, attr := range val.Group() {
|
||||
jattrs = jh.renderAttribute(jattrs, groups, gp, attr)
|
||||
}
|
||||
return jattrs
|
||||
}
|
||||
|
||||
key := gp.Prepend(attr.Key)
|
||||
return append(jattrs, journal.Attr{
|
||||
Key: jh.keyMapper.KeyMap(key),
|
||||
Value: []byte(val.String()),
|
||||
})
|
||||
}
|
||||
|
||||
var (
|
||||
srcAttrCodeFile = journal.MustAttrKey("CODE_FILE")
|
||||
srcAttrCodeLine = journal.MustAttrKey("CODE_LINE")
|
||||
srcAttrCodeFunc = journal.MustAttrKey("CODE_FUNC")
|
||||
)
|
||||
|
||||
// sourceAttrs returns the attributes we emit for source location.
|
||||
func (jh *JournalHandler) sourceAttrs(PC uintptr) []journal.Attr {
|
||||
fs := runtime.CallersFrames([]uintptr{PC})
|
||||
f, _ := fs.Next()
|
||||
|
||||
return []journal.Attr{
|
||||
journal.Attr{
|
||||
Key: srcAttrCodeFile,
|
||||
Value: []byte(f.File),
|
||||
},
|
||||
journal.Attr{
|
||||
Key: srcAttrCodeLine,
|
||||
Value: []byte(strconv.Itoa(f.Line)),
|
||||
},
|
||||
journal.Attr{
|
||||
Key: srcAttrCodeFunc,
|
||||
Value: []byte(f.Function),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
package journslog
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"src.lwithers.me.uk/go/journal"
|
||||
"src.lwithers.me.uk/go/journal/testsink"
|
||||
)
|
||||
|
||||
// TestHandler emits a log message with various attributes / groups and ensures
|
||||
// the result is logged successfully.
|
||||
func TestHandler(t *testing.T) {
|
||||
sockpath := filepath.Join(t.TempDir(), "socket")
|
||||
sink, err := testsink.New(sockpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer sink.Stop()
|
||||
|
||||
conn, err := journal.Connect(sockpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
jh, err := NewHandler(WithConn(conn))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
httpGroup := slog.Group("http", "path", "/foo", "status", http.StatusOK)
|
||||
logger := slog.New(jh)
|
||||
logger = logger.With("global_attr_key", "global_attr_val", httpGroup).WithGroup("app")
|
||||
logger.Info("hello, journal", "slowkey", slowValue("slowval"), slog.Group("g1", "attr1", "val1", "attr2", 123))
|
||||
|
||||
msg, err := sink.Message(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msgText, attrs, err := msg.Decode()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if msgText != "hello, journal" {
|
||||
t.Errorf("unexpected message text %q", msgText)
|
||||
}
|
||||
|
||||
expAttrs := []testsink.DecodedAttr{
|
||||
testsink.DecodedAttr{Key: "PRIORITY", Val: "6"},
|
||||
testsink.DecodedAttr{Key: "GLOBAL_ATTR_KEY", Val: "global_attr_val"},
|
||||
testsink.DecodedAttr{Key: "HTTP_PATH", Val: "/foo"},
|
||||
testsink.DecodedAttr{Key: "HTTP_STATUS", Val: "200"},
|
||||
testsink.DecodedAttr{Key: "APP_SLOWKEY", Val: "slowval"},
|
||||
testsink.DecodedAttr{Key: "APP_G1_ATTR1", Val: "val1"},
|
||||
testsink.DecodedAttr{Key: "APP_G1_ATTR2", Val: "123"},
|
||||
}
|
||||
|
||||
for i := range expAttrs {
|
||||
key := expAttrs[i].Key
|
||||
val, ok := testsink.GetAttr(attrs, key)
|
||||
switch {
|
||||
case !ok:
|
||||
t.Errorf("missing %s attribute", key)
|
||||
case val != expAttrs[i].Val:
|
||||
t.Errorf("unexpected %s value", key)
|
||||
}
|
||||
}
|
||||
|
||||
if t.Failed() {
|
||||
for i := range attrs {
|
||||
t.Errorf("attr %q=%q", attrs[i].Key, attrs[i].Val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type slowValue string
|
||||
|
||||
func (s slowValue) LogValue() slog.Value {
|
||||
return slog.StringValue(string(s))
|
||||
}
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
package journslog
|
||||
|
||||
import (
|
||||
_ "log/slog"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"src.lwithers.me.uk/go/journal"
|
||||
)
|
||||
|
||||
// KeyMapper maps [slog.Attr] keys onto [journal.AttrKey] objects. Such a mapper
|
||||
// is necessary because systemd-journald is very restrictive about suitable
|
||||
// attribute keys.
|
||||
type KeyMapper interface {
|
||||
KeyMap(slogAttrKey string) journal.AttrKey
|
||||
}
|
||||
|
||||
// DefaultKeyMapper returns a [KeyMapper] with the package's default behaviour.
|
||||
// The returned mapper contains an internal, persistent map and a [sync.RWMutex]
|
||||
// to cache map results.
|
||||
//
|
||||
// The default behaviour is to strip leading underscores; to turn lowercase
|
||||
// ASCII ‘a’–‘z’ chars into uppercase ‘A’–‘Z’ (uppercase is passed through
|
||||
// verbatim), to accept underscores, and to map all other chars to underscores.
|
||||
// If the result is an empty string, it instead returns the key "BAD_ATTR_KEY".
|
||||
func DefaultKeyMapper() KeyMapper {
|
||||
return &defaultKeyMapper{
|
||||
badKey: journal.MustAttrKey("BAD_ATTR_KEY"),
|
||||
keys: make(map[string]journal.AttrKey),
|
||||
}
|
||||
}
|
||||
|
||||
type defaultKeyMapper struct {
|
||||
badKey journal.AttrKey
|
||||
keys map[string]journal.AttrKey
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func (dkm *defaultKeyMapper) KeyMap(slogAttrKey string) journal.AttrKey {
|
||||
if key, ok := dkm.mapFast(slogAttrKey); ok {
|
||||
return key
|
||||
}
|
||||
return dkm.mapSlow(slogAttrKey)
|
||||
}
|
||||
|
||||
func (dkm *defaultKeyMapper) mapFast(slogAttrKey string) (journal.AttrKey, bool) {
|
||||
dkm.lock.RLock()
|
||||
defer dkm.lock.RUnlock()
|
||||
key, ok := dkm.keys[slogAttrKey]
|
||||
return key, ok
|
||||
}
|
||||
|
||||
func (dkm *defaultKeyMapper) mapSlow(slogAttrKey string) journal.AttrKey {
|
||||
key := dkm.mapFunc(slogAttrKey)
|
||||
|
||||
// it's OK if another goroutine in the slow path just wrote the key, we
|
||||
// will overwrite its result but the result will still be correct
|
||||
dkm.lock.Lock()
|
||||
defer dkm.lock.Unlock()
|
||||
dkm.keys[slogAttrKey] = key
|
||||
return key
|
||||
}
|
||||
|
||||
func (dkm *defaultKeyMapper) mapFunc(slogAttrKey string) journal.AttrKey {
|
||||
key, err := journal.NewAttrKey(slogAttrKey)
|
||||
if err == nil {
|
||||
return key
|
||||
}
|
||||
|
||||
var s strings.Builder
|
||||
for _, r := range slogAttrKey {
|
||||
switch {
|
||||
case r >= 'a' && r <= 'z':
|
||||
s.WriteRune(r - 'a' + 'A')
|
||||
case r >= '0' && r <= '9',
|
||||
r >= 'A' && r <= 'Z':
|
||||
s.WriteRune(r)
|
||||
default:
|
||||
if s.Len() > 0 {
|
||||
s.WriteRune('_')
|
||||
}
|
||||
}
|
||||
}
|
||||
if s.Len() == 0 {
|
||||
return dkm.badKey
|
||||
}
|
||||
|
||||
skey := s.String()
|
||||
if len(skey) > 255 {
|
||||
skey = skey[:255] // known to only contain ASCII chars
|
||||
}
|
||||
|
||||
if key, err = journal.NewAttrKey(skey); err != nil {
|
||||
return dkm.badKey
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
package journslog
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
|
||||
"src.lwithers.me.uk/go/journal"
|
||||
)
|
||||
|
||||
// LevelMapper maps [slog.Level] values onto [journal.Priority] values.
|
||||
type LevelMapper interface {
|
||||
LevelMap(slogLevel slog.Level) journal.Priority
|
||||
}
|
||||
|
||||
// DefaultLevelMapper returns a [LevelMapper] with the package's default
|
||||
// behaviour.
|
||||
//
|
||||
// The mapping for the four defined [slog.Level] constants is as follows:
|
||||
//
|
||||
// slog.Level journal.Priority
|
||||
// ---------- ----------------
|
||||
// slog.LevelDebug (-4) journal.PriDebug
|
||||
// slog.LevelInfo (0) journal.PriInfo
|
||||
// slog.LevelWarn (4) journal.PriWarning
|
||||
// slog.LevelError (8) journal.PriErr
|
||||
//
|
||||
// Other integer values are mapped as follows:
|
||||
//
|
||||
// integer slog.Level journal.Priority
|
||||
// ------------------ ----------------
|
||||
// -∞ ... -4 journal.PriDebug
|
||||
// -3 ... 0 journal.PriInfo
|
||||
// 1 ... 3 journal.PriNotice
|
||||
// 4 ... 7 journal.PriWarning
|
||||
// 8 journal.PriErr
|
||||
// 9 ... ∞ journal.PriCrit
|
||||
func DefaultLevelMapper() LevelMapper {
|
||||
return &defaultLevelMapper{}
|
||||
}
|
||||
|
||||
type defaultLevelMapper struct{}
|
||||
|
||||
func (*defaultLevelMapper) LevelMap(slogLevel slog.Level) journal.Priority {
|
||||
switch {
|
||||
case slogLevel <= slog.LevelDebug:
|
||||
return journal.PriDebug
|
||||
case slogLevel <= slog.LevelInfo:
|
||||
return journal.PriInfo
|
||||
case slogLevel < slog.LevelWarn:
|
||||
return journal.PriNotice
|
||||
case slogLevel < slog.LevelError:
|
||||
return journal.PriWarning
|
||||
case slogLevel == slog.LevelError:
|
||||
return journal.PriErr
|
||||
default:
|
||||
return journal.PriCrit
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
package journslog
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"src.lwithers.me.uk/go/journal"
|
||||
)
|
||||
|
||||
var attrPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make([]slog.Attr, 0, 16)
|
||||
},
|
||||
}
|
||||
|
||||
func attrPoolGet() []slog.Attr {
|
||||
slice := attrPool.Get().([]slog.Attr)
|
||||
return slice[:0]
|
||||
}
|
||||
|
||||
func attrPoolPut(slice []slog.Attr) {
|
||||
clear(slice)
|
||||
attrPool.Put(slice)
|
||||
}
|
||||
|
||||
var jattrPool = sync.Pool{
|
||||
New: func() any {
|
||||
return make([]journal.Attr, 0, 16)
|
||||
},
|
||||
}
|
||||
|
||||
func jattrPoolGet() []journal.Attr {
|
||||
slice := jattrPool.Get().([]journal.Attr)
|
||||
return slice[:0]
|
||||
}
|
||||
|
||||
func jattrPoolPut(slice []journal.Attr) {
|
||||
clear(slice)
|
||||
jattrPool.Put(slice)
|
||||
}
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
Package testsink provides a partial implementation of the systemd-journald
|
||||
Unix socket. Datagrams received on its socket are decoded and stored for unit
|
||||
tests to examine.
|
||||
*/
|
||||
package testsink
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Message returns the Nth message received. It waits for the message to arrive.
|
||||
func (sink *Sink) Message(N int) (Message, error) {
|
||||
sink.lock.Lock()
|
||||
defer sink.lock.Unlock()
|
||||
|
||||
for len(sink.msgs) <= N {
|
||||
sink.mcond.Wait()
|
||||
if sink.err != nil {
|
||||
return Message{}, sink.err
|
||||
}
|
||||
}
|
||||
return sink.msgs[N], nil
|
||||
}
|
||||
|
||||
// Message is recorded for each datagram received.
|
||||
type Message struct {
|
||||
Raw []byte
|
||||
}
|
||||
|
||||
type DecodedAttr struct {
|
||||
Key, Val string
|
||||
}
|
||||
|
||||
func (m *Message) Decode() (msg string, attr []DecodedAttr, err error) {
|
||||
raw := m.Raw
|
||||
var errs []error
|
||||
|
||||
DecodeLoop:
|
||||
for len(raw) > 0 {
|
||||
n, key := decodeAttrKey(raw)
|
||||
raw = raw[n:]
|
||||
switch {
|
||||
case len(raw) == 0:
|
||||
errs = append(errs, fmt.Errorf("unterminated attribute name %q", key))
|
||||
break DecodeLoop
|
||||
case key == "":
|
||||
errs = append(errs, errors.New("empty attribute name"))
|
||||
}
|
||||
|
||||
var val string
|
||||
switch raw[0] {
|
||||
case '=':
|
||||
n, val = decodeAttrValText(raw[1:])
|
||||
case '\n':
|
||||
var err error
|
||||
n, val, err = decodeAttrValLen(raw[1:])
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
raw = raw[1+n:]
|
||||
if len(raw) == 0 {
|
||||
errs = append(errs, fmt.Errorf("unterminated value for attribute %q", key))
|
||||
break DecodeLoop
|
||||
}
|
||||
|
||||
if raw[0] != '\n' {
|
||||
errs = append(errs, errors.New("incorrectly terminated attribute value"))
|
||||
}
|
||||
raw = raw[1:]
|
||||
|
||||
switch key {
|
||||
case "MESSAGE":
|
||||
msg = val
|
||||
default:
|
||||
attr = append(attr, DecodedAttr{Key: key, Val: val})
|
||||
}
|
||||
}
|
||||
|
||||
return msg, attr, errors.Join(errs...)
|
||||
}
|
||||
|
||||
func decodeAttrKey(raw []byte) (n int, key string) {
|
||||
for i := range raw {
|
||||
switch raw[i] {
|
||||
case '\n', '=':
|
||||
return i, string(raw[:i])
|
||||
}
|
||||
}
|
||||
return len(raw), string(raw)
|
||||
}
|
||||
|
||||
func decodeAttrValText(raw []byte) (n int, val string) {
|
||||
term := bytes.IndexByte(raw, '\n')
|
||||
if term == -1 {
|
||||
term = len(raw)
|
||||
}
|
||||
return term, string(raw[:term])
|
||||
}
|
||||
|
||||
func decodeAttrValLen(raw []byte) (n int, val string, err error) {
|
||||
if len(raw) < 8 {
|
||||
return len(raw), "", errors.New("not enough bytes for binary attribute value length")
|
||||
}
|
||||
amt := binary.LittleEndian.Uint64(raw)
|
||||
raw = raw[8:]
|
||||
if uint64(len(raw)) < amt {
|
||||
return 8 + len(raw), string(raw), errors.New("not enough bytes for binary attribute value")
|
||||
}
|
||||
return int(8 + amt), string(raw[:amt]), nil
|
||||
}
|
||||
|
||||
// GetAttr returns the value of the attribute whose key name matches, and a
|
||||
// boolean to indicate if it found a match.
|
||||
func GetAttr(attr []DecodedAttr, key string) (value string, ok bool) {
|
||||
for i := range attr {
|
||||
if attr[i].Key == key {
|
||||
return attr[i].Val, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
Package testsink provides a partial implementation of the systemd-journald
|
||||
Unix socket. Datagrams received on its socket are decoded and stored for unit
|
||||
tests to examine.
|
||||
*/
|
||||
package testsink
|
||||
|
||||
import (
|
||||
"net"
|
||||
"slices"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Sink provides a Unix socket and captures messages sent to it using the
|
||||
// systemd-journald wire protocol.
|
||||
type Sink struct {
|
||||
sock *net.UnixConn
|
||||
stop chan struct{}
|
||||
|
||||
lock sync.Mutex
|
||||
mcond *sync.Cond
|
||||
msgs []Message
|
||||
err error
|
||||
}
|
||||
|
||||
// New returns a new Sink that is listening on the given path.
|
||||
func New(sockpath string) (*Sink, error) {
|
||||
sock, err := net.ListenUnixgram("unixgram", &net.UnixAddr{
|
||||
Name: sockpath,
|
||||
Net: "unixgram",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sink := &Sink{
|
||||
sock: sock,
|
||||
stop: make(chan struct{}, 1),
|
||||
}
|
||||
sink.mcond = sync.NewCond(&sink.lock)
|
||||
go sink.stopper()
|
||||
go sink.recv()
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
// Stop listening and close the socket.
|
||||
func (sink *Sink) Stop() {
|
||||
// non-blocking write onto channel; we only need to read from it once in
|
||||
// order to stop the receiver, but using this rather than close ensures
|
||||
// Stop() can be called multiple times without negative side effects
|
||||
select {
|
||||
case sink.stop <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (sink *Sink) stopper() {
|
||||
<-sink.stop
|
||||
sink.sock.Close()
|
||||
}
|
||||
|
||||
func (sink *Sink) recv() {
|
||||
buf := make([]byte, 131072)
|
||||
for {
|
||||
n, err := sink.sock.Read(buf)
|
||||
|
||||
sink.lock.Lock()
|
||||
if n > 0 {
|
||||
sink.msgs = append(sink.msgs, Message{
|
||||
Raw: slices.Clone(buf[:n]),
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
sink.err = err
|
||||
}
|
||||
sink.lock.Unlock()
|
||||
|
||||
sink.mcond.Broadcast()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/binary"
|
||||
"io"
|
||||
"net"
|
||||
"slices"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -19,6 +20,7 @@ var (
|
|||
// atomic, and this function is safe to use across goroutines. Otherwise,
|
||||
// writes may be interleaved.
|
||||
func WireWrite(buf *net.Buffers, w io.Writer, attrs []Attr) error {
|
||||
*buf = slices.Grow(*buf, len(*buf)+len(attrs)*4)
|
||||
for i := range attrs {
|
||||
key := attrs[i].Key.key
|
||||
if key == "" {
|
||||
|
|
|
|||
Loading…
Reference in New Issue