515 lines
13 KiB
Go
515 lines
13 KiB
Go
/*
|
||
Package packer implements the core packing functionality. It is designed to be
|
||
used by a wrapper program (CLI etc.).
|
||
*/
|
||
package packer
|
||
|
||
import (
|
||
"bufio"
|
||
"crypto/sha512"
|
||
"fmt"
|
||
"io/ioutil"
|
||
"net/http"
|
||
"os"
|
||
"runtime"
|
||
"sync"
|
||
|
||
"github.com/andybalholm/brotli"
|
||
"github.com/foobaz/go-zopfli/zopfli"
|
||
"golang.org/x/sys/unix"
|
||
"src.lwithers.me.uk/go/htpack/packed"
|
||
"src.lwithers.me.uk/go/writefile"
|
||
)
|
||
|
||
// FilesToPack is the set of files which will be incorporated into the packfile.
|
||
// The key is the path at which the file will be served, and the value gives the
|
||
// disk filename as well as headers / options.
|
||
type FilesToPack map[string]FileToPack
|
||
|
||
// FileToPack contains the headers / options for a file which is about to be
|
||
// packed.
|
||
type FileToPack struct {
|
||
// Filename is the path to the file on disk (relative or absolute).
|
||
Filename string `yaml:"filename"`
|
||
|
||
// ContentType is used as the Content-Type header for the source data.
|
||
ContentType string `yaml:"content_type"`
|
||
|
||
// DisableCompression can be set to skip any compression for this file.
|
||
DisableCompression bool `yaml:"disable_compression"`
|
||
|
||
// DisableGzip can be set to skip gzip compression for this file.
|
||
DisableGzip bool `yaml:"disable_gzip"`
|
||
|
||
// DisableBrotli can be set to skip brotli compression for this file.
|
||
DisableBrotli bool `yaml:"disable_brotli"`
|
||
}
|
||
|
||
// Progress is a callback object which reports packing progress.
|
||
type Progress interface {
|
||
// Count reports the number of items that have begun processing.
|
||
Count(n int)
|
||
|
||
// Begin denotes the processing of an input file.
|
||
Begin(filename, compression string)
|
||
|
||
// End denotes the completion of input file processing.
|
||
End(filename, compression string)
|
||
}
|
||
|
||
type ignoreProgress int
|
||
|
||
func (ignoreProgress) Count(_ int) {}
|
||
func (ignoreProgress) Begin(_, _ string) {}
|
||
func (ignoreProgress) End(_, _ string) {}
|
||
|
||
const (
|
||
// minCompressionSaving means we'll only use the compressed version of
|
||
// the file if it's at least this many bytes smaller than the original.
|
||
// Chosen somewhat arbitrarily; we have to add an HTTP header, and the
|
||
// decompression overhead is not zero.
|
||
minCompressionSaving = 128
|
||
|
||
// minCompressionFraction means we'll only use the compressed version of
|
||
// the file if it's at least (origSize>>minCompressionFraction) bytes
|
||
// smaller than the original. This is a guess at when the decompression
|
||
// overhead outweighs the time saved in transmission.
|
||
minCompressionFraction = 7 // i.e. files must be at least 1/128 smaller
|
||
|
||
// padWidth is the padding alignment size expressed as a power of 2.
|
||
// The value 12 (i.e. 4096 bytes) is chosen to align with a common
|
||
// page size and filesystem block size.
|
||
padWidth = 12
|
||
|
||
// sendfileLimit is the number of bytes we can transfer through a single
|
||
// sendfile(2) call. This value is from the man page.
|
||
sendfileLimit = 0x7FFFF000
|
||
)
|
||
|
||
// Pack a file. Use Pack2 for progress reporting.
|
||
func Pack(filesToPack FilesToPack, outputFilename string) error {
|
||
return Pack2(filesToPack, outputFilename, nil)
|
||
}
|
||
|
||
// Pack2 will pack a file, with progress reporting. The progress interface may
|
||
// be nil.
|
||
func Pack2(filesToPack FilesToPack, outputFilename string, progress Progress) error {
|
||
if progress == nil {
|
||
progress = ignoreProgress(0)
|
||
}
|
||
|
||
finalFname, w, err := writefile.New(outputFilename)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer writefile.Abort(w)
|
||
|
||
// we use this little structure to serialise file writes below, and
|
||
// it has a couple of convenience methods for common operations
|
||
packer := packer{
|
||
w: w,
|
||
progress: progress,
|
||
}
|
||
|
||
// write initial header (will rewrite offset/length when known)
|
||
hdr := &packed.Header{
|
||
Magic: packed.Magic,
|
||
Version: packed.VersionInitial,
|
||
DirectoryOffset: 1,
|
||
DirectoryLength: 1,
|
||
}
|
||
m, err := hdr.Marshal()
|
||
if err != nil {
|
||
return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
|
||
}
|
||
if _, err = w.Write(m); err != nil {
|
||
return err
|
||
}
|
||
if err = packer.pad(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// Channel to limit number of CPU-bound goroutines. One token is written
|
||
// to the channel for each active worker; since the channel is bounded,
|
||
// further writes will block at the limit. As workers complete, they
|
||
// consume a token from the channel.
|
||
nCPU := runtime.NumCPU() + 2 // +2 for I/O bound portions
|
||
if nCPU < 4 {
|
||
nCPU = 4
|
||
}
|
||
packer.cpus = make(chan struct{}, nCPU)
|
||
|
||
// Channel to report worker errors. Writes should be non-blocking. If
|
||
// your error is dropped, don't worry, an earlier error will be
|
||
// reported.
|
||
packer.errors = make(chan error, 1)
|
||
|
||
// Channel to abort further operations. It should be closed to abort.
|
||
// The closer should be the one who writes onto packer.errors.
|
||
packer.aborted = make(chan struct{})
|
||
|
||
// write the packed files, storing info for the directory structure
|
||
packer.dir = &packed.Directory{
|
||
Files: make(map[string]*packed.File),
|
||
}
|
||
|
||
var count int
|
||
PackingLoop:
|
||
for path, fileToPack := range filesToPack {
|
||
select {
|
||
case <-packer.aborted:
|
||
// a worker reported an error; break out of loop early
|
||
break PackingLoop
|
||
default:
|
||
packer.packFile(path, fileToPack)
|
||
count++
|
||
progress.Count(count)
|
||
}
|
||
}
|
||
|
||
// wait for all goroutines to complete
|
||
for n := 0; n < nCPU; n++ {
|
||
packer.cpus <- struct{}{}
|
||
}
|
||
|
||
// check whether any of the just-completed goroutines returned an error
|
||
select {
|
||
case err = <-packer.errors:
|
||
return err
|
||
default:
|
||
}
|
||
|
||
// write the directory
|
||
if m, err = packer.dir.Marshal(); err != nil {
|
||
err = fmt.Errorf("failed to marshal directory object (%T): %v",
|
||
packer.dir, err)
|
||
return err
|
||
}
|
||
dirOffset, err := w.Seek(0, os.SEEK_CUR)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if _, err := w.Write(m); err != nil {
|
||
return err
|
||
}
|
||
|
||
// now modify the header at the start of the file
|
||
hdr.DirectoryOffset = uint64(dirOffset)
|
||
hdr.DirectoryLength = uint64(len(m))
|
||
if m, err = hdr.Marshal(); err != nil {
|
||
return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
|
||
}
|
||
if _, err = w.WriteAt(m, 0); err != nil {
|
||
return err
|
||
}
|
||
|
||
// all done!
|
||
return writefile.Commit(finalFname, w)
|
||
}
|
||
|
||
func etag(in []byte) string {
|
||
h := sha512.New384()
|
||
h.Write(in)
|
||
return fmt.Sprintf(`"1--%x"`, h.Sum(nil))
|
||
}
|
||
|
||
func compressionWorthwhile(data []byte, compressed os.FileInfo) bool {
|
||
uncompressedSize := uint64(len(data))
|
||
sz := uint64(compressed.Size())
|
||
|
||
switch {
|
||
case sz+minCompressionSaving > uncompressedSize,
|
||
sz+(uncompressedSize>>minCompressionFraction) > uncompressedSize:
|
||
return false
|
||
default:
|
||
return true
|
||
}
|
||
}
|
||
|
||
// packer packs input files into the output file. It has methods for each type
|
||
// of compression. Unexported methods assume they are called in a context where
|
||
// the lock is not needed or already taken; exported methods take the lock.
|
||
type packer struct {
|
||
w *os.File
|
||
lock sync.Mutex
|
||
cpus chan struct{}
|
||
errors chan error
|
||
aborted chan struct{}
|
||
dir *packed.Directory
|
||
progress Progress
|
||
}
|
||
|
||
// pad will move the file write pointer to the next padding boundary. It is not
|
||
// concurrency safe.
|
||
func (p *packer) pad() error {
|
||
pos, err := p.w.Seek(0, os.SEEK_CUR)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
pos &= (1 << padWidth) - 1
|
||
if pos == 0 { // already aligned
|
||
return nil
|
||
}
|
||
|
||
_, err = p.w.Seek((1<<padWidth)-pos, os.SEEK_CUR)
|
||
return err
|
||
}
|
||
|
||
// appendPath will copy file data from srcPath and append it to the output file. The
|
||
// offset and length are stored in ‘data’ on success. It is not concurrency safe.
|
||
func (p *packer) appendPath(srcPath string, data *packed.FileData) error {
|
||
// open the input file and grab its length
|
||
in, err := os.Open(srcPath)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer in.Close()
|
||
|
||
fi, err := in.Stat()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// copy in the file data
|
||
return p.appendFile(in, fi.Size(), data)
|
||
}
|
||
|
||
// appendFile will copy file data from src and append it to the output file. The
|
||
// offset and length are stored in ‘data’ on success. It is not concurrency safe.
|
||
func (p *packer) appendFile(src *os.File, srcLen int64, data *packed.FileData) error {
|
||
// retrieve current file position and store in data.Offset
|
||
off, err := p.w.Seek(0, os.SEEK_CUR)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
data.Length = uint64(srcLen)
|
||
data.Offset = uint64(off)
|
||
|
||
// copy in the file data
|
||
remain := srcLen
|
||
off = 0
|
||
for remain > 0 {
|
||
var amt int
|
||
if remain > sendfileLimit {
|
||
amt = sendfileLimit
|
||
} else {
|
||
amt = int(remain)
|
||
}
|
||
|
||
amt, err := unix.Sendfile(int(p.w.Fd()), int(src.Fd()), &off, amt)
|
||
remain -= int64(amt)
|
||
if err != nil {
|
||
return fmt.Errorf("sendfile (copying data to "+
|
||
"htpack): %v", err)
|
||
}
|
||
}
|
||
|
||
// leave output file padded to next boundary
|
||
return p.pad()
|
||
}
|
||
|
||
func (p *packer) packFile(path string, fileToPack FileToPack) {
|
||
// open and mmap input file
|
||
f, err := os.Open(fileToPack.Filename)
|
||
if err != nil {
|
||
p.Abort(err)
|
||
return
|
||
}
|
||
defer f.Close()
|
||
|
||
fi, err := f.Stat()
|
||
if err != nil {
|
||
p.Abort(err)
|
||
return
|
||
}
|
||
|
||
var data []byte
|
||
if fi.Size() > 0 {
|
||
data, err = unix.Mmap(int(f.Fd()), 0, int(fi.Size()),
|
||
unix.PROT_READ, unix.MAP_SHARED)
|
||
if err != nil {
|
||
p.Abort(fmt.Errorf("mmap %s: %v", fileToPack.Filename, err))
|
||
return
|
||
}
|
||
}
|
||
|
||
// prepare initial directory entry
|
||
info := &packed.File{
|
||
Etag: etag(data),
|
||
ContentType: fileToPack.ContentType,
|
||
}
|
||
if info.ContentType == "" {
|
||
info.ContentType = http.DetectContentType(data)
|
||
}
|
||
p.dir.Files[path] = info // NB: this part is not concurrent, so no mutex
|
||
|
||
// list of operations on this input file that we'll carry out asynchronously
|
||
ops := []func() error{
|
||
func() error {
|
||
p.progress.Begin(fileToPack.Filename, "uncompressed")
|
||
defer p.progress.End(fileToPack.Filename, "uncompressed")
|
||
return p.Uncompressed(fileToPack.Filename, info)
|
||
},
|
||
}
|
||
if !fileToPack.DisableCompression && !fileToPack.DisableGzip {
|
||
ops = append(ops, func() error {
|
||
p.progress.Begin(fileToPack.Filename, "gzip")
|
||
defer p.progress.End(fileToPack.Filename, "gzip")
|
||
if err := p.Gzip(data, info); err != nil {
|
||
return fmt.Errorf("gzip compression of %s "+
|
||
"failed: %v", fileToPack.Filename, err)
|
||
}
|
||
return nil
|
||
})
|
||
}
|
||
if !fileToPack.DisableCompression && !fileToPack.DisableBrotli {
|
||
ops = append(ops, func() error {
|
||
p.progress.Begin(fileToPack.Filename, "brotli")
|
||
defer p.progress.End(fileToPack.Filename, "brotli")
|
||
if err := p.Brotli(data, info); err != nil {
|
||
return fmt.Errorf("brotli compression of %s "+
|
||
"failed: %v", fileToPack.Filename, err)
|
||
}
|
||
return nil
|
||
})
|
||
}
|
||
|
||
// we have multiple operations on the file, and we need to wait for
|
||
// them all to complete before munmap()
|
||
wg := new(sync.WaitGroup)
|
||
wg.Add(len(ops))
|
||
go func() {
|
||
wg.Wait()
|
||
unix.Munmap(data)
|
||
}()
|
||
|
||
for _, op := range ops {
|
||
select {
|
||
case <-p.aborted:
|
||
// skip the operation
|
||
wg.Done()
|
||
|
||
case p.cpus <- struct{}{}:
|
||
go func(op func() error) {
|
||
if err := op(); err != nil {
|
||
p.Abort(err)
|
||
}
|
||
// release CPU token
|
||
<-p.cpus
|
||
wg.Done()
|
||
}(op)
|
||
}
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
// Abort records that an error occurred and records it onto the errors channel.
|
||
// It signals workers to abort by closed the aborted channel. If called
|
||
// multiple times, only one error will be recorded, and the aborted channel will
|
||
// only be closed once.
|
||
func (p *packer) Abort(err error) {
|
||
select {
|
||
case p.errors <- err:
|
||
// only one error can be written to this channel, so the write
|
||
// acts as a lock to ensure only a single close operation takes
|
||
// place
|
||
close(p.aborted)
|
||
default:
|
||
// errors channel was already written, so we're already aborted
|
||
}
|
||
}
|
||
|
||
// Uncompressed copies in an uncompressed file.
|
||
func (p *packer) Uncompressed(srcPath string, dir *packed.File) error {
|
||
dir.Uncompressed = new(packed.FileData)
|
||
p.lock.Lock()
|
||
defer p.lock.Unlock()
|
||
return p.appendPath(srcPath, dir.Uncompressed)
|
||
}
|
||
|
||
// Gzip will gzip input data to a temporary file, and then append that to the
|
||
// output file.
|
||
func (p *packer) Gzip(data []byte, dir *packed.File) error {
|
||
// write via temporary file
|
||
tmpfile, err := ioutil.TempFile("", "")
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer os.Remove(tmpfile.Name())
|
||
defer tmpfile.Close()
|
||
|
||
// compress
|
||
opts := zopfli.DefaultOptions()
|
||
if len(data) > (10 << 20) { // 10MiB
|
||
opts.NumIterations = 5
|
||
}
|
||
|
||
buf := bufio.NewWriter(tmpfile)
|
||
if err = zopfli.GzipCompress(&opts, data, buf); err != nil {
|
||
return err
|
||
}
|
||
if err = buf.Flush(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// grab file length, evaluate whether compression is worth it
|
||
fi, err := tmpfile.Stat()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !compressionWorthwhile(data, fi) {
|
||
return nil
|
||
}
|
||
|
||
// save the compressed data
|
||
dir.Gzip = new(packed.FileData)
|
||
p.lock.Lock()
|
||
defer p.lock.Unlock()
|
||
return p.appendFile(tmpfile, fi.Size(), dir.Gzip)
|
||
}
|
||
|
||
// Brotli will compress input data to a temporary file, and then append that to
|
||
// the output file.
|
||
func (p *packer) Brotli(data []byte, dir *packed.File) error {
|
||
// write via temporary file
|
||
tmpfile, err := ioutil.TempFile("", "")
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer os.Remove(tmpfile.Name())
|
||
defer tmpfile.Close()
|
||
|
||
// compress
|
||
buf := bufio.NewWriter(tmpfile)
|
||
comp := brotli.NewWriterOptions(buf, brotli.WriterOptions{
|
||
Quality: 11,
|
||
})
|
||
if _, err = comp.Write(data); err != nil {
|
||
return err
|
||
}
|
||
if err = comp.Close(); err != nil {
|
||
return err
|
||
}
|
||
if err = buf.Flush(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// grab file length, evaluate whether compression is worth it
|
||
fi, err := tmpfile.Stat()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !compressionWorthwhile(data, fi) {
|
||
return nil
|
||
}
|
||
|
||
// save the compressed data
|
||
dir.Brotli = new(packed.FileData)
|
||
p.lock.Lock()
|
||
defer p.lock.Unlock()
|
||
return p.appendFile(tmpfile, fi.Size(), dir.Brotli)
|
||
}
|