htpack/cmd/htpacker/packer/packer.go

527 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
Package packer implements the core packing functionality. It is designed to be
used by a wrapper program (CLI etc.).
*/
package packer
import (
"bufio"
"crypto/sha512"
"fmt"
"io/ioutil"
"net/http"
"os"
"runtime"
"sync"
"github.com/andybalholm/brotli"
"github.com/foobaz/go-zopfli/zopfli"
"golang.org/x/sys/unix"
"src.lwithers.me.uk/go/htpack/packed"
"src.lwithers.me.uk/go/writefile"
)
// FilesToPack is the set of files which will be incorporated into the packfile.
// The key is the path at which the file will be served, and the value gives the
// disk filename as well as headers / options.
type FilesToPack map[string]FileToPack
// FileToPack contains the headers / options for a file which is about to be
// packed.
type FileToPack struct {
// Filename is the path to the file on disk (relative or absolute).
Filename string `yaml:"filename"`
// ContentType is used as the Content-Type header for the source data.
ContentType string `yaml:"content_type"`
// DisableCompression can be set to skip any compression for this file.
DisableCompression bool `yaml:"disable_compression"`
// DisableGzip can be set to skip gzip compression for this file.
DisableGzip bool `yaml:"disable_gzip"`
// DisableBrotli can be set to skip brotli compression for this file.
DisableBrotli bool `yaml:"disable_brotli"`
}
// Progress is a callback object which reports packing progress.
type Progress interface {
// Count reports the number of items that have begun processing.
Count(n int)
// Begin denotes the processing of an input file.
Begin(filename, compression string)
// End denotes the completion of input file processing.
End(filename, compression string)
}
type ignoreProgress int
func (ignoreProgress) Count(_ int) {}
func (ignoreProgress) Begin(_, _ string) {}
func (ignoreProgress) End(_, _ string) {}
const (
// minCompressionFileSize is the minimum filesize we need before
// considering compression. Note this must be at least 2, to avoid
// known bugs in go-zopfli.
minCompressionFileSize = 128
// minCompressionSaving means we'll only use the compressed version of
// the file if it's at least this many bytes smaller than the original.
// Chosen somewhat arbitrarily; we have to add an HTTP header, and the
// decompression overhead is not zero.
minCompressionSaving = 128
// minCompressionFraction means we'll only use the compressed version of
// the file if it's at least (origSize>>minCompressionFraction) bytes
// smaller than the original. This is a guess at when the decompression
// overhead outweighs the time saved in transmission.
minCompressionFraction = 7 // i.e. files must be at least 1/128 smaller
// padWidth is the padding alignment size expressed as a power of 2.
// The value 12 (i.e. 4096 bytes) is chosen to align with a common
// page size and filesystem block size.
padWidth = 12
// sendfileLimit is the number of bytes we can transfer through a single
// sendfile(2) call. This value is from the man page.
sendfileLimit = 0x7FFFF000
)
// Pack a file. Use Pack2 for progress reporting.
func Pack(filesToPack FilesToPack, outputFilename string) error {
return Pack2(filesToPack, outputFilename, nil)
}
// Pack2 will pack a file, with progress reporting. The progress interface may
// be nil.
func Pack2(filesToPack FilesToPack, outputFilename string, progress Progress) error {
if progress == nil {
progress = ignoreProgress(0)
}
finalFname, w, err := writefile.New(outputFilename)
if err != nil {
return err
}
defer writefile.Abort(w)
// we use this little structure to serialise file writes below, and
// it has a couple of convenience methods for common operations
packer := packer{
w: w,
progress: progress,
}
// write initial header (will rewrite offset/length when known)
hdr := &packed.Header{
Magic: packed.Magic,
Version: packed.VersionInitial,
DirectoryOffset: 1,
DirectoryLength: 1,
}
m, err := hdr.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
}
if _, err = w.Write(m); err != nil {
return err
}
if err = packer.pad(); err != nil {
return err
}
// Channel to limit number of CPU-bound goroutines. One token is written
// to the channel for each active worker; since the channel is bounded,
// further writes will block at the limit. As workers complete, they
// consume a token from the channel.
nCPU := runtime.NumCPU() + 2 // +2 for I/O bound portions
if nCPU < 4 {
nCPU = 4
}
packer.cpus = make(chan struct{}, nCPU)
// Channel to report worker errors. Writes should be non-blocking. If
// your error is dropped, don't worry, an earlier error will be
// reported.
packer.errors = make(chan error, 1)
// Channel to abort further operations. It should be closed to abort.
// The closer should be the one who writes onto packer.errors.
packer.aborted = make(chan struct{})
// write the packed files, storing info for the directory structure
packer.dir = &packed.Directory{
Files: make(map[string]*packed.File),
}
var count int
PackingLoop:
for path, fileToPack := range filesToPack {
select {
case <-packer.aborted:
// a worker reported an error; break out of loop early
break PackingLoop
default:
packer.packFile(path, fileToPack)
count++
progress.Count(count)
}
}
// wait for all goroutines to complete
for n := 0; n < nCPU; n++ {
packer.cpus <- struct{}{}
}
// check whether any of the just-completed goroutines returned an error
select {
case err = <-packer.errors:
return err
default:
}
// write the directory
if m, err = packer.dir.Marshal(); err != nil {
err = fmt.Errorf("failed to marshal directory object (%T): %v",
packer.dir, err)
return err
}
dirOffset, err := w.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if _, err := w.Write(m); err != nil {
return err
}
// now modify the header at the start of the file
hdr.DirectoryOffset = uint64(dirOffset)
hdr.DirectoryLength = uint64(len(m))
if m, err = hdr.Marshal(); err != nil {
return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
}
if _, err = w.WriteAt(m, 0); err != nil {
return err
}
// all done!
return writefile.Commit(finalFname, w)
}
func etag(in []byte) string {
h := sha512.New384()
h.Write(in)
return fmt.Sprintf(`"1--%x"`, h.Sum(nil))
}
func compressionWorthwhile(data []byte, compressed os.FileInfo) bool {
uncompressedSize := uint64(len(data))
sz := uint64(compressed.Size())
switch {
case sz+minCompressionSaving > uncompressedSize,
sz+(uncompressedSize>>minCompressionFraction) > uncompressedSize:
return false
default:
return true
}
}
// packer packs input files into the output file. It has methods for each type
// of compression. Unexported methods assume they are called in a context where
// the lock is not needed or already taken; exported methods take the lock.
type packer struct {
w *os.File
lock sync.Mutex
cpus chan struct{}
errors chan error
aborted chan struct{}
dir *packed.Directory
progress Progress
}
// pad will move the file write pointer to the next padding boundary. It is not
// concurrency safe.
func (p *packer) pad() error {
pos, err := p.w.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
pos &= (1 << padWidth) - 1
if pos == 0 { // already aligned
return nil
}
_, err = p.w.Seek((1<<padWidth)-pos, os.SEEK_CUR)
return err
}
// appendPath will copy file data from srcPath and append it to the output file. The
// offset and length are stored in data on success. It is not concurrency safe.
func (p *packer) appendPath(srcPath string, data *packed.FileData) error {
// open the input file and grab its length
in, err := os.Open(srcPath)
if err != nil {
return err
}
defer in.Close()
fi, err := in.Stat()
if err != nil {
return err
}
// copy in the file data
return p.appendFile(in, fi.Size(), data)
}
// appendFile will copy file data from src and append it to the output file. The
// offset and length are stored in data on success. It is not concurrency safe.
func (p *packer) appendFile(src *os.File, srcLen int64, data *packed.FileData) error {
// retrieve current file position and store in data.Offset
off, err := p.w.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
data.Length = uint64(srcLen)
data.Offset = uint64(off)
// copy in the file data
remain := srcLen
off = 0
for remain > 0 {
var amt int
if remain > sendfileLimit {
amt = sendfileLimit
} else {
amt = int(remain)
}
amt, err := unix.Sendfile(int(p.w.Fd()), int(src.Fd()), &off, amt)
remain -= int64(amt)
if err != nil {
return fmt.Errorf("sendfile (copying data to "+
"htpack): %v", err)
}
}
// leave output file padded to next boundary
return p.pad()
}
func (p *packer) packFile(path string, fileToPack FileToPack) {
// open and mmap input file
f, err := os.Open(fileToPack.Filename)
if err != nil {
p.Abort(err)
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
p.Abort(err)
return
}
var data []byte
if fi.Size() > 0 {
data, err = unix.Mmap(int(f.Fd()), 0, int(fi.Size()),
unix.PROT_READ, unix.MAP_SHARED)
if err != nil {
p.Abort(fmt.Errorf("mmap %s: %v", fileToPack.Filename, err))
return
}
}
// prepare initial directory entry
info := &packed.File{
Etag: etag(data),
ContentType: fileToPack.ContentType,
}
if info.ContentType == "" {
info.ContentType = http.DetectContentType(data)
}
p.dir.Files[path] = info // NB: this part is not concurrent, so no mutex
// list of operations on this input file that we'll carry out asynchronously
ops := []func() error{
func() error {
p.progress.Begin(fileToPack.Filename, "uncompressed")
defer p.progress.End(fileToPack.Filename, "uncompressed")
return p.Uncompressed(fileToPack.Filename, info)
},
}
if !fileToPack.DisableCompression && !fileToPack.DisableGzip {
ops = append(ops, func() error {
p.progress.Begin(fileToPack.Filename, "gzip")
defer p.progress.End(fileToPack.Filename, "gzip")
if err := p.Gzip(data, info); err != nil {
return fmt.Errorf("gzip compression of %s "+
"failed: %v", fileToPack.Filename, err)
}
return nil
})
}
if !fileToPack.DisableCompression && !fileToPack.DisableBrotli {
ops = append(ops, func() error {
p.progress.Begin(fileToPack.Filename, "brotli")
defer p.progress.End(fileToPack.Filename, "brotli")
if err := p.Brotli(data, info); err != nil {
return fmt.Errorf("brotli compression of %s "+
"failed: %v", fileToPack.Filename, err)
}
return nil
})
}
// we have multiple operations on the file, and we need to wait for
// them all to complete before munmap()
wg := new(sync.WaitGroup)
wg.Add(len(ops))
go func() {
wg.Wait()
unix.Munmap(data)
}()
for _, op := range ops {
select {
case <-p.aborted:
// skip the operation
wg.Done()
case p.cpus <- struct{}{}:
go func(op func() error) {
if err := op(); err != nil {
p.Abort(err)
}
// release CPU token
<-p.cpus
wg.Done()
}(op)
}
}
return
}
// Abort records that an error occurred and records it onto the errors channel.
// It signals workers to abort by closed the aborted channel. If called
// multiple times, only one error will be recorded, and the aborted channel will
// only be closed once.
func (p *packer) Abort(err error) {
select {
case p.errors <- err:
// only one error can be written to this channel, so the write
// acts as a lock to ensure only a single close operation takes
// place
close(p.aborted)
default:
// errors channel was already written, so we're already aborted
}
}
// Uncompressed copies in an uncompressed file.
func (p *packer) Uncompressed(srcPath string, dir *packed.File) error {
dir.Uncompressed = new(packed.FileData)
p.lock.Lock()
defer p.lock.Unlock()
return p.appendPath(srcPath, dir.Uncompressed)
}
// Gzip will gzip input data to a temporary file, and then append that to the
// output file.
func (p *packer) Gzip(data []byte, dir *packed.File) error {
if len(data) < minCompressionFileSize {
return nil
}
// write via temporary file
tmpfile, err := ioutil.TempFile("", "")
if err != nil {
return err
}
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
// compress
opts := zopfli.DefaultOptions()
if len(data) > (10 << 20) { // 10MiB
opts.NumIterations = 5
}
buf := bufio.NewWriter(tmpfile)
if err = zopfli.GzipCompress(&opts, data, buf); err != nil {
return err
}
if err = buf.Flush(); err != nil {
return err
}
// grab file length, evaluate whether compression is worth it
fi, err := tmpfile.Stat()
if err != nil {
return err
}
if !compressionWorthwhile(data, fi) {
return nil
}
// save the compressed data
dir.Gzip = new(packed.FileData)
p.lock.Lock()
defer p.lock.Unlock()
return p.appendFile(tmpfile, fi.Size(), dir.Gzip)
}
// Brotli will compress input data to a temporary file, and then append that to
// the output file.
func (p *packer) Brotli(data []byte, dir *packed.File) error {
if len(data) < minCompressionFileSize {
return nil
}
// write via temporary file
tmpfile, err := ioutil.TempFile("", "")
if err != nil {
return err
}
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
// compress
buf := bufio.NewWriter(tmpfile)
comp := brotli.NewWriterOptions(buf, brotli.WriterOptions{
Quality: 11,
})
if _, err = comp.Write(data); err != nil {
return err
}
if err = comp.Close(); err != nil {
return err
}
if err = buf.Flush(); err != nil {
return err
}
// grab file length, evaluate whether compression is worth it
fi, err := tmpfile.Stat()
if err != nil {
return err
}
if !compressionWorthwhile(data, fi) {
return nil
}
// save the compressed data
dir.Brotli = new(packed.FileData)
p.lock.Lock()
defer p.lock.Unlock()
return p.appendFile(tmpfile, fi.Size(), dir.Brotli)
}