/*
Package packer implements the core packing functionality. It is designed to
be used by a wrapper program (CLI etc.).
*/
package packer

import (
	"bufio"
	"crypto/sha512"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"runtime"
	"sync"

	"github.com/andybalholm/brotli"
	"github.com/foobaz/go-zopfli/zopfli"
	"golang.org/x/sys/unix"

	"src.lwithers.me.uk/go/htpack/packed"
	"src.lwithers.me.uk/go/writefile"
)

// FilesToPack is the set of files which will be incorporated into the
// packfile. The key is the path at which the file will be served, and the
// value gives the disk filename as well as headers / options.
type FilesToPack map[string]FileToPack

// FileToPack contains the headers / options for a file which is about to be
// packed.
type FileToPack struct {
	// Filename is the path to the file on disk (relative or absolute).
	Filename string `yaml:"filename"`

	// ContentType is used as the Content-Type header for the source data.
	ContentType string `yaml:"content_type"`

	// DisableCompression can be set to skip any compression for this file.
	DisableCompression bool `yaml:"disable_compression"`

	// DisableGzip can be set to skip gzip compression for this file.
	DisableGzip bool `yaml:"disable_gzip"`

	// DisableBrotli can be set to skip brotli compression for this file.
	DisableBrotli bool `yaml:"disable_brotli"`
}

// Progress is a callback object which reports packing progress.
type Progress interface {
	// Count reports the number of items that have begun processing.
	Count(n int)

	// Begin denotes the processing of an input file.
	Begin(filename, compression string)

	// End denotes the completion of input file processing.
	End(filename, compression string)
}

type ignoreProgress int

func (ignoreProgress) Count(_ int)       {}
func (ignoreProgress) Begin(_, _ string) {}
func (ignoreProgress) End(_, _ string)   {}

const (
	// minCompressionFileSize is the minimum filesize we need before
	// considering compression. Note this must be at least 2, to avoid
	// known bugs in go-zopfli.
	minCompressionFileSize = 128

	// minCompressionSaving means we'll only use the compressed version
	// of the file if it's at least this many bytes smaller than the
	// original. Chosen somewhat arbitrarily; we have to add an HTTP
	// header, and the decompression overhead is not zero.
	minCompressionSaving = 128

	// minCompressionFraction means we'll only use the compressed version
	// of the file if it's at least (origSize>>minCompressionFraction)
	// bytes smaller than the original. This is a guess at when the
	// decompression overhead outweighs the time saved in transmission.
	minCompressionFraction = 7 // i.e. files must be at least 1/128 smaller

	// padWidth is the padding alignment size expressed as a power of 2.
	// The value 12 (i.e. 4096 bytes) is chosen to align with a common
	// page size and filesystem block size.
	padWidth = 12

	// sendfileLimit is the number of bytes we can transfer through a
	// single sendfile(2) call. This value is from the man page.
	sendfileLimit = 0x7FFFF000
)

// Pack a file. Use Pack2 for progress reporting.
func Pack(filesToPack FilesToPack, outputFilename string) error {
	return Pack2(filesToPack, outputFilename, nil)
}
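// As an illustration only (not part of this package's API), a minimal
// wrapper program might build a FilesToPack map from literal values and call
// Pack directly. The served paths, disk filenames and log-based error
// handling below are hypothetical:
//
//	files := packer.FilesToPack{
//		"/index.html": {
//			Filename:    "site/index.html",
//			ContentType: "text/html; charset=utf-8",
//		},
//		"/logo.png": {
//			Filename:           "site/logo.png",
//			DisableCompression: true, // already-compressed format
//		},
//	}
//	if err := packer.Pack(files, "site.htpack"); err != nil {
//		log.Fatal(err)
//	}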
// Pack2 will pack a file, with progress reporting. The progress interface
// may be nil.
func Pack2(filesToPack FilesToPack, outputFilename string, progress Progress) error {
	if progress == nil {
		progress = ignoreProgress(0)
	}

	finalFname, w, err := writefile.New(outputFilename)
	if err != nil {
		return err
	}
	defer writefile.Abort(w)

	// we use this little structure to serialise file writes below, and
	// it has a couple of convenience methods for common operations
	packer := packer{
		w:        w,
		progress: progress,
	}

	// write initial header (will rewrite offset/length when known)
	hdr := &packed.Header{
		Magic:           packed.Magic,
		Version:         packed.VersionInitial,
		DirectoryOffset: 1,
		DirectoryLength: 1,
	}
	m, err := hdr.Marshal()
	if err != nil {
		return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
	}
	if _, err = w.Write(m); err != nil {
		return err
	}
	if err = packer.pad(); err != nil {
		return err
	}

	// Channel to limit the number of CPU-bound goroutines. One token is
	// written to the channel for each active worker; since the channel
	// is bounded, further writes will block at the limit. As workers
	// complete, they consume a token from the channel.
	nCPU := runtime.NumCPU() + 2 // +2 for I/O bound portions
	if nCPU < 4 {
		nCPU = 4
	}
	packer.cpus = make(chan struct{}, nCPU)

	// Channel to report worker errors. Writes should be non-blocking; if
	// an error is dropped, an earlier error will already have been
	// reported.
	packer.errors = make(chan error, 1)

	// Channel to abort further operations. It should be closed to abort.
	// The closer should be the one who writes onto packer.errors.
	packer.aborted = make(chan struct{})

	// write the packed files, storing info for the directory structure
	packer.dir = &packed.Directory{
		Files: make(map[string]*packed.File),
	}
	var count int
PackingLoop:
	for path, fileToPack := range filesToPack {
		select {
		case <-packer.aborted:
			// a worker reported an error; break out of the loop early
			break PackingLoop
		default:
			packer.packFile(path, fileToPack)
			count++
			progress.Count(count)
		}
	}

	// wait for all goroutines to complete
	for n := 0; n < nCPU; n++ {
		packer.cpus <- struct{}{}
	}

	// check whether any of the just-completed goroutines returned an error
	select {
	case err = <-packer.errors:
		return err
	default:
	}

	// write the directory
	if m, err = packer.dir.Marshal(); err != nil {
		err = fmt.Errorf("failed to marshal directory object (%T): %v",
			packer.dir, err)
		return err
	}
	dirOffset, err := w.Seek(0, os.SEEK_CUR)
	if err != nil {
		return err
	}
	if _, err := w.Write(m); err != nil {
		return err
	}

	// now modify the header at the start of the file
	hdr.DirectoryOffset = uint64(dirOffset)
	hdr.DirectoryLength = uint64(len(m))
	if m, err = hdr.Marshal(); err != nil {
		return fmt.Errorf("failed to marshal header (%T): %v", hdr, err)
	}
	if _, err = w.WriteAt(m, 0); err != nil {
		return err
	}

	// all done!
	return writefile.Commit(finalFname, w)
}

func etag(in []byte) string {
	h := sha512.New384()
	h.Write(in)
	return fmt.Sprintf(`"1--%x"`, h.Sum(nil))
}

func compressionWorthwhile(data []byte, compressed os.FileInfo) bool {
	uncompressedSize := uint64(len(data))
	sz := uint64(compressed.Size())
	switch {
	case sz+minCompressionSaving > uncompressedSize,
		sz+(uncompressedSize>>minCompressionFraction) > uncompressedSize:
		return false
	default:
		return true
	}
}
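// As a worked example of the thresholds used by compressionWorthwhile, with
// the constants defined at the top of this file: an input smaller than
// minCompressionFileSize (128 bytes) is never compressed at all, and a
// 64 KiB (65536-byte) input keeps its compressed variant only if that
// variant saves at least max(minCompressionSaving,
// 65536>>minCompressionFraction) = max(128, 512) = 512 bytes, i.e. only if
// the compressed output is no larger than 65024 bytes.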
// packer packs input files into the output file. It has methods for each
// type of compression. Unexported methods assume they are called in a
// context where the lock is not needed or already taken; exported methods
// take the lock.
type packer struct {
	w        *os.File
	lock     sync.Mutex
	cpus     chan struct{}
	errors   chan error
	aborted  chan struct{}
	dir      *packed.Directory
	progress Progress
}

// pad will move the file write pointer to the next padding boundary. It is
// not concurrency safe.
func (p *packer) pad() error {
	pos, err := p.w.Seek(0, os.SEEK_CUR)
	if err != nil {
		return err
	}

	pos &= (1 << padWidth) - 1
	if pos == 0 {
		// already aligned
		return nil
	}

	// seek forward to the next (1<<padWidth)-byte boundary
	_, err = p.w.Seek((1<<padWidth)-pos, os.SEEK_CUR)
	return err
}

// appendPath opens the named file and appends its contents to the output
// file, recording its position in out. It is not concurrency safe.
func (p *packer) appendPath(srcPath string, out *packed.FileData) error {
	src, err := os.Open(srcPath)
	if err != nil {
		return err
	}
	defer src.Close()

	fi, err := src.Stat()
	if err != nil {
		return err
	}

	return p.appendFile(src, fi.Size(), out)
}

// appendFile copies srcLen bytes from src onto the end of the output file
// using sendfile(2), recording the position in out. It is not concurrency
// safe.
func (p *packer) appendFile(src *os.File, srcLen int64, out *packed.FileData) error {
	// record where this file's data starts in the output file
	pos, err := p.w.Seek(0, os.SEEK_CUR)
	if err != nil {
		return err
	}
	out.Offset = uint64(pos)
	out.Length = uint64(srcLen)

	// copy the data across in sendfile(2)-sized chunks
	var off int64
	remain := srcLen
	for remain > 0 {
		var amt int
		if remain > sendfileLimit {
			amt = sendfileLimit
		} else {
			amt = int(remain)
		}

		amt, err := unix.Sendfile(int(p.w.Fd()), int(src.Fd()), &off, amt)
		remain -= int64(amt)
		if err != nil {
			return fmt.Errorf("sendfile (copying data to "+
				"htpack): %v", err)
		}
	}

	// leave output file padded to next boundary
	return p.pad()
}

func (p *packer) packFile(path string, fileToPack FileToPack) {
	// open and mmap input file
	f, err := os.Open(fileToPack.Filename)
	if err != nil {
		p.Abort(err)
		return
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		p.Abort(err)
		return
	}

	var data []byte
	if fi.Size() > 0 {
		data, err = unix.Mmap(int(f.Fd()), 0, int(fi.Size()),
			unix.PROT_READ, unix.MAP_SHARED)
		if err != nil {
			p.Abort(fmt.Errorf("mmap %s: %v", fileToPack.Filename, err))
			return
		}
	}

	// prepare initial directory entry
	info := &packed.File{
		Etag:        etag(data),
		ContentType: fileToPack.ContentType,
	}
	if info.ContentType == "" {
		info.ContentType = http.DetectContentType(data)
	}
	p.dir.Files[path] = info // NB: this part is not concurrent, so no mutex

	// list of operations on this input file that we'll carry out
	// asynchronously
	ops := []func() error{
		func() error {
			p.progress.Begin(fileToPack.Filename, "uncompressed")
			defer p.progress.End(fileToPack.Filename, "uncompressed")
			return p.Uncompressed(fileToPack.Filename, info)
		},
	}

	if !fileToPack.DisableCompression && !fileToPack.DisableGzip {
		ops = append(ops, func() error {
			p.progress.Begin(fileToPack.Filename, "gzip")
			defer p.progress.End(fileToPack.Filename, "gzip")
			if err := p.Gzip(data, info); err != nil {
				return fmt.Errorf("gzip compression of %s "+
					"failed: %w", fileToPack.Filename, err)
			}
			return nil
		})
	}

	if !fileToPack.DisableCompression && !fileToPack.DisableBrotli {
		ops = append(ops, func() error {
			p.progress.Begin(fileToPack.Filename, "brotli")
			defer p.progress.End(fileToPack.Filename, "brotli")
			if err := p.Brotli(data, info); err != nil {
				return fmt.Errorf("brotli compression of %s "+
					"failed: %w", fileToPack.Filename, err)
			}
			return nil
		})
	}

	// we have multiple operations on the file, and we need to wait for
	// them all to complete before munmap()
	wg := new(sync.WaitGroup)
	wg.Add(len(ops))
	go func() {
		wg.Wait()
		unix.Munmap(data)
	}()

	for _, op := range ops {
		select {
		case <-p.aborted:
			// skip the operation
			wg.Done()

		case p.cpus <- struct{}{}:
			go func(op func() error) {
				if err := op(); err != nil {
					p.Abort(err)
				}
				// release CPU token
				<-p.cpus
				wg.Done()
			}(op)
		}
	}
}

// Abort records that an error occurred and reports it onto the errors
// channel. It signals workers to abort by closing the aborted channel. If
// called multiple times, only one error will be recorded, and the aborted
// channel will only be closed once.
func (p *packer) Abort(err error) {
	select {
	case p.errors <- err:
		// only one error can be written to this channel, so the write
		// acts as a lock to ensure only a single close operation takes
		// place
		close(p.aborted)

	default:
		// errors channel was already written, so we're already aborted
	}
}
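// For orientation, the output file produced by the machinery above is laid
// out roughly as follows (a sketch inferred from Pack2, appendFile and pad;
// the exact field encodings live in the packed package):
//
//	offset 0                   marshalled packed.Header, rewritten in place once
//	                           the directory offset/length are known
//	offset 1<<padWidth (4096)  data for each input file, in uncompressed / gzip /
//	                           brotli variants, each padded to a 4096-byte boundary
//	end of file                marshalled packed.Directory, whose offset and
//	                           length are recorded in the header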
// Uncompressed copies in an uncompressed file.
func (p *packer) Uncompressed(srcPath string, dir *packed.File) error {
	dir.Uncompressed = new(packed.FileData)
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.appendPath(srcPath, dir.Uncompressed)
}

// Gzip will gzip input data to a temporary file, and then append that to the
// output file.
func (p *packer) Gzip(data []byte, dir *packed.File) error {
	if len(data) < minCompressionFileSize {
		return nil
	}

	// write via temporary file
	tmpfile, err := ioutil.TempFile("", "")
	if err != nil {
		return err
	}
	defer os.Remove(tmpfile.Name())
	defer tmpfile.Close()

	// compress
	opts := zopfli.DefaultOptions()
	if len(data) > (10 << 20) { // 10MiB
		opts.NumIterations = 5
	}

	buf := bufio.NewWriter(tmpfile)
	if err = zopfli.GzipCompress(&opts, data, buf); err != nil {
		return err
	}
	if err = buf.Flush(); err != nil {
		return err
	}

	// grab file length, evaluate whether compression is worth it
	fi, err := tmpfile.Stat()
	if err != nil {
		return err
	}
	if !compressionWorthwhile(data, fi) {
		return nil
	}

	// save the compressed data
	dir.Gzip = new(packed.FileData)
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.appendFile(tmpfile, fi.Size(), dir.Gzip)
}

// Brotli will compress input data to a temporary file, and then append that
// to the output file.
func (p *packer) Brotli(data []byte, dir *packed.File) error {
	if len(data) < minCompressionFileSize {
		return nil
	}

	// write via temporary file
	tmpfile, err := ioutil.TempFile("", "")
	if err != nil {
		return err
	}
	defer os.Remove(tmpfile.Name())
	defer tmpfile.Close()

	// compress
	buf := bufio.NewWriter(tmpfile)
	comp := brotli.NewWriterOptions(buf, brotli.WriterOptions{
		Quality: 11,
	})
	if _, err = comp.Write(data); err != nil {
		return err
	}
	if err = comp.Close(); err != nil {
		return err
	}
	if err = buf.Flush(); err != nil {
		return err
	}

	// grab file length, evaluate whether compression is worth it
	fi, err := tmpfile.Stat()
	if err != nil {
		return err
	}
	if !compressionWorthwhile(data, fi) {
		return nil
	}

	// save the compressed data
	dir.Brotli = new(packed.FileData)
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.appendFile(tmpfile, fi.Size(), dir.Brotli)
}
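// Pack2 accepts an optional Progress implementation for reporting. As a
// hypothetical sketch (the logProgress type below is not part of this
// package, and assumes the caller imports the standard log package), a CLI
// wrapper might simply log each phase:
//
//	type logProgress struct{}
//
//	func (logProgress) Count(n int)                        { log.Printf("%d file(s) queued", n) }
//	func (logProgress) Begin(filename, compression string) { log.Printf("start %s (%s)", filename, compression) }
//	func (logProgress) End(filename, compression string)   { log.Printf("done  %s (%s)", filename, compression) }
//
//	err := packer.Pack2(files, "site.htpack", logProgress{})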