htpack/cmd/htpacker/packer/packer.go

515 lines
13 KiB
Go
Raw Normal View History

/*
Package packer implements the core packing functionality. It is designed to be
used by a wrapper program (CLI etc.).
*/
package packer
import (
"bufio"
"crypto/sha512"
"fmt"
"io/ioutil"
"net/http"
"os"
"runtime"
"sync"
"github.com/andybalholm/brotli"
"github.com/foobaz/go-zopfli/zopfli"
"golang.org/x/sys/unix"
"src.lwithers.me.uk/go/htpack/packed"
"src.lwithers.me.uk/go/writefile"
)
// FilesToPack is the set of files which will be incorporated into the packfile.
// The key is the path at which the file will be served, and the value gives the
// disk filename as well as headers / options.
type FilesToPack map[string]FileToPack
// FileToPack contains the headers / options for a file which is about to be
// packed.
type FileToPack struct {
// Filename is the path to the file on disk (relative or absolute).
Filename string `yaml:"filename"`
// ContentType is used as the Content-Type header for the source data.
ContentType string `yaml:"content_type"`
// DisableCompression can be set to skip any compression for this file.
DisableCompression bool `yaml:"disable_compression"`
// DisableGzip can be set to skip gzip compression for this file.
DisableGzip bool `yaml:"disable_gzip"`
// DisableBrotli can be set to skip brotli compression for this file.
DisableBrotli bool `yaml:"disable_brotli"`
}
// Progress is a callback object which reports packing progress.
type Progress interface {
// Count reports the number of items that have begun processing.
Count(n int)
// Begin denotes the processing of an input file.
Begin(filename, compression string)
// End denotes the completion of input file processing.
End(filename, compression string)
}
type ignoreProgress int
func (ignoreProgress) Count(_ int) {}
func (ignoreProgress) Begin(_, _ string) {}
func (ignoreProgress) End(_, _ string) {}
const (
// minCompressionSaving means we'll only use the compressed version of
// the file if it's at least this many bytes smaller than the original.
// Chosen somewhat arbitrarily; we have to add an HTTP header, and the
// decompression overhead is not zero.
minCompressionSaving = 128
// minCompressionFraction means we'll only use the compressed version of
// the file if it's at least (origSize>>minCompressionFraction) bytes
// smaller than the original. This is a guess at when the decompression
// overhead outweighs the time saved in transmission.
minCompressionFraction = 7 // i.e. files must be at least 1/128 smaller
// padWidth is the padding alignment size expressed as a power of 2.
// The value 12 (i.e. 4096 bytes) is chosen to align with a common
// page size and filesystem block size.
padWidth = 12
// sendfileLimit is the number of bytes we can transfer through a single
// sendfile(2) call. This value is from the man page.
sendfileLimit = 0x7FFFF000
)
// Pack a file. Use Pack2 for progress reporting.
func Pack(filesToPack FilesToPack, outputFilename string) error {
return Pack2(filesToPack, outputFilename, nil)
}
// Pack2 will pack a file, with progress reporting. The progress interface may
// be nil.
func Pack2(filesToPack FilesToPack, outputFilename string, progress Progress) error {
if progress == nil {
progress = ignoreProgress(0)
}
finalFname, w, err := writefile.New(outputFilename)
if err != nil {
return err
}
defer writefile.Abort(w)
// we use this little structure to serialise file writes below, and
// it has a couple of convenience methods for common operations
packer := packer{
w: w,
progress: progress,
}
// write initial header (will rewrite offset/length when known)
hdr := &packed.Header{
Magic: packed.Magic,
Version: packed.VersionInitial,
DirectoryOffset: 1,
DirectoryLength: 1,
}
m, err := hdr.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
}
if _, err = w.Write(m); err != nil {
return err
}
if err = packer.pad(); err != nil {
return err
}
// Channel to limit number of CPU-bound goroutines. One token is written
// to the channel for each active worker; since the channel is bounded,
// further writes will block at the limit. As workers complete, they
// consume a token from the channel.
nCPU := runtime.NumCPU() + 2 // +2 for I/O bound portions
if nCPU < 4 {
nCPU = 4
}
packer.cpus = make(chan struct{}, nCPU)
// Channel to report worker errors. Writes should be non-blocking. If
// your error is dropped, don't worry, an earlier error will be
// reported.
packer.errors = make(chan error, 1)
// Channel to abort further operations. It should be closed to abort.
// The closer should be the one who writes onto packer.errors.
packer.aborted = make(chan struct{})
// write the packed files, storing info for the directory structure
packer.dir = &packed.Directory{
Files: make(map[string]*packed.File),
}
var count int
PackingLoop:
for path, fileToPack := range filesToPack {
select {
case <-packer.aborted:
// a worker reported an error; break out of loop early
break PackingLoop
default:
packer.packFile(path, fileToPack)
count++
progress.Count(count)
}
}
// wait for all goroutines to complete
for n := 0; n < nCPU; n++ {
packer.cpus <- struct{}{}
}
// check whether any of the just-completed goroutines returned an error
select {
case err = <-packer.errors:
return err
default:
}
// write the directory
if m, err = packer.dir.Marshal(); err != nil {
err = fmt.Errorf("failed to marshal directory object (%T): %v",
packer.dir, err)
return err
}
dirOffset, err := w.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
if _, err := w.Write(m); err != nil {
return err
}
// now modify the header at the start of the file
hdr.DirectoryOffset = uint64(dirOffset)
hdr.DirectoryLength = uint64(len(m))
if m, err = hdr.Marshal(); err != nil {
return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
}
if _, err = w.WriteAt(m, 0); err != nil {
return err
}
// all done!
return writefile.Commit(finalFname, w)
}
func etag(in []byte) string {
h := sha512.New384()
h.Write(in)
return fmt.Sprintf(`"1--%x"`, h.Sum(nil))
}
func compressionWorthwhile(data []byte, compressed os.FileInfo) bool {
uncompressedSize := uint64(len(data))
sz := uint64(compressed.Size())
switch {
case sz+minCompressionSaving > uncompressedSize,
sz+(uncompressedSize>>minCompressionFraction) > uncompressedSize:
return false
default:
return true
}
}
// packer packs input files into the output file. It has methods for each type
// of compression. Unexported methods assume they are called in a context where
// the lock is not needed or already taken; exported methods take the lock.
type packer struct {
w *os.File
lock sync.Mutex
cpus chan struct{}
errors chan error
aborted chan struct{}
dir *packed.Directory
progress Progress
}
// pad will move the file write pointer to the next padding boundary. It is not
// concurrency safe.
func (p *packer) pad() error {
pos, err := p.w.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
pos &= (1 << padWidth) - 1
if pos == 0 { // already aligned
return nil
}
_, err = p.w.Seek((1<<padWidth)-pos, os.SEEK_CUR)
return err
}
// appendPath will copy file data from srcPath and append it to the output file. The
// offset and length are stored in data on success. It is not concurrency safe.
func (p *packer) appendPath(srcPath string, data *packed.FileData) error {
// open the input file and grab its length
in, err := os.Open(srcPath)
if err != nil {
return err
}
defer in.Close()
fi, err := in.Stat()
if err != nil {
return err
}
// copy in the file data
return p.appendFile(in, fi.Size(), data)
}
// appendFile will copy file data from src and append it to the output file. The
// offset and length are stored in data on success. It is not concurrency safe.
func (p *packer) appendFile(src *os.File, srcLen int64, data *packed.FileData) error {
// retrieve current file position and store in data.Offset
off, err := p.w.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
data.Length = uint64(srcLen)
data.Offset = uint64(off)
// copy in the file data
remain := srcLen
off = 0
for remain > 0 {
var amt int
if remain > sendfileLimit {
amt = sendfileLimit
} else {
amt = int(remain)
}
amt, err := unix.Sendfile(int(p.w.Fd()), int(src.Fd()), &off, amt)
remain -= int64(amt)
if err != nil {
return fmt.Errorf("sendfile (copying data to "+
"htpack): %v", err)
}
}
// leave output file padded to next boundary
return p.pad()
}
func (p *packer) packFile(path string, fileToPack FileToPack) {
// open and mmap input file
f, err := os.Open(fileToPack.Filename)
if err != nil {
p.Abort(err)
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
p.Abort(err)
return
}
var data []byte
if fi.Size() > 0 {
data, err = unix.Mmap(int(f.Fd()), 0, int(fi.Size()),
unix.PROT_READ, unix.MAP_SHARED)
if err != nil {
p.Abort(fmt.Errorf("mmap %s: %v", fileToPack.Filename, err))
return
}
}
// prepare initial directory entry
info := &packed.File{
Etag: etag(data),
ContentType: fileToPack.ContentType,
}
if info.ContentType == "" {
info.ContentType = http.DetectContentType(data)
}
p.dir.Files[path] = info // NB: this part is not concurrent, so no mutex
// list of operations on this input file that we'll carry out asynchronously
ops := []func() error{
func() error {
p.progress.Begin(fileToPack.Filename, "uncompressed")
defer p.progress.End(fileToPack.Filename, "uncompressed")
return p.Uncompressed(fileToPack.Filename, info)
},
}
if !fileToPack.DisableCompression && !fileToPack.DisableGzip {
ops = append(ops, func() error {
p.progress.Begin(fileToPack.Filename, "gzip")
defer p.progress.End(fileToPack.Filename, "gzip")
if err := p.Gzip(data, info); err != nil {
return fmt.Errorf("gzip compression of %s "+
"failed: %v", fileToPack.Filename, err)
}
return nil
})
}
if !fileToPack.DisableCompression && !fileToPack.DisableBrotli {
ops = append(ops, func() error {
p.progress.Begin(fileToPack.Filename, "brotli")
defer p.progress.End(fileToPack.Filename, "brotli")
if err := p.Brotli(data, info); err != nil {
return fmt.Errorf("brotli compression of %s "+
"failed: %v", fileToPack.Filename, err)
}
return nil
})
}
// we have multiple operations on the file, and we need to wait for
// them all to complete before munmap()
wg := new(sync.WaitGroup)
wg.Add(len(ops))
go func() {
wg.Wait()
unix.Munmap(data)
}()
for _, op := range ops {
select {
case <-p.aborted:
// skip the operation
wg.Done()
case p.cpus <- struct{}{}:
go func(op func() error) {
if err := op(); err != nil {
p.Abort(err)
}
// release CPU token
<-p.cpus
wg.Done()
}(op)
}
}
return
}
// Abort records that an error occurred and records it onto the errors channel.
// It signals workers to abort by closed the aborted channel. If called
// multiple times, only one error will be recorded, and the aborted channel will
// only be closed once.
func (p *packer) Abort(err error) {
select {
case p.errors <- err:
// only one error can be written to this channel, so the write
// acts as a lock to ensure only a single close operation takes
// place
close(p.aborted)
default:
// errors channel was already written, so we're already aborted
}
}
// Uncompressed copies in an uncompressed file.
func (p *packer) Uncompressed(srcPath string, dir *packed.File) error {
dir.Uncompressed = new(packed.FileData)
p.lock.Lock()
defer p.lock.Unlock()
return p.appendPath(srcPath, dir.Uncompressed)
}
// Gzip will gzip input data to a temporary file, and then append that to the
// output file.
func (p *packer) Gzip(data []byte, dir *packed.File) error {
// write via temporary file
tmpfile, err := ioutil.TempFile("", "")
if err != nil {
return err
}
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
// compress
opts := zopfli.DefaultOptions()
if len(data) > (10 << 20) { // 10MiB
opts.NumIterations = 5
}
buf := bufio.NewWriter(tmpfile)
if err = zopfli.GzipCompress(&opts, data, buf); err != nil {
return err
}
if err = buf.Flush(); err != nil {
return err
}
// grab file length, evaluate whether compression is worth it
fi, err := tmpfile.Stat()
if err != nil {
return err
}
if !compressionWorthwhile(data, fi) {
return nil
}
// save the compressed data
dir.Gzip = new(packed.FileData)
p.lock.Lock()
defer p.lock.Unlock()
return p.appendFile(tmpfile, fi.Size(), dir.Gzip)
}
// Brotli will compress input data to a temporary file, and then append that to
// the output file.
func (p *packer) Brotli(data []byte, dir *packed.File) error {
// write via temporary file
tmpfile, err := ioutil.TempFile("", "")
if err != nil {
return err
}
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
// compress
buf := bufio.NewWriter(tmpfile)
comp := brotli.NewWriterOptions(buf, brotli.WriterOptions{
Quality: 11,
})
if _, err = comp.Write(data); err != nil {
return err
}
if err = comp.Close(); err != nil {
return err
}
if err = buf.Flush(); err != nil {
return err
}
// grab file length, evaluate whether compression is worth it
fi, err := tmpfile.Stat()
if err != nil {
return err
}
if !compressionWorthwhile(data, fi) {
return nil
}
// save the compressed data
dir.Brotli = new(packed.FileData)
p.lock.Lock()
defer p.lock.Unlock()
return p.appendFile(tmpfile, fi.Size(), dir.Brotli)
}