Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable estargz-based lazy pulling on registry cache importer #2648

Merged
merged 1 commit into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cache/remotecache/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/snapshots"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/estargz"
"github.com/moby/buildkit/util/push"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
Expand Down Expand Up @@ -105,3 +107,17 @@ func (dsl *withDistributionSourceLabel) SetDistributionSourceAnnotation(desc oci
desc.Annotations["containerd.io/distribution.source.ref"] = dsl.ref
return desc
}

// SnapshotLabels returns the snapshot labels for the layer descs[index].
//
// The result starts from the layer's annotations as filtered by
// snapshots.FilterInheritedLabels and is then overlaid with the labels
// produced by estargz.SnapshotLabels for dsl.ref; on a key collision the
// estargz value wins.
//
// It returns nil when index does not address an element of descs.
func (dsl *withDistributionSourceLabel) SnapshotLabels(descs []ocispecs.Descriptor, index int) map[string]string {
	// index == len(descs) must be rejected as well, otherwise descs[index]
	// below panics with an out-of-range error.
	if index < 0 || index >= len(descs) {
		return nil
	}
	labels := snapshots.FilterInheritedLabels(descs[index].Annotations)
	if labels == nil {
		labels = make(map[string]string)
	}
	for k, v := range estargz.SnapshotLabels(dsl.ref, descs, index) {
		labels[k] = v
	}
	return labels
}
156 changes: 156 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestIntegration(t *testing.T) {
testSourceMapFromRef,
testLazyImagePush,
testStargzLazyPull,
testStargzLazyInlineCacheImportExport,
testFileOpInputSwap,
testRelativeMountpoint,
testLocalSourceDiffer,
Expand Down Expand Up @@ -2880,6 +2881,161 @@ func testBuildPushAndValidate(t *testing.T, sb integration.Sandbox) {
require.False(t, ok)
}

// testStargzLazyInlineCacheImportExport pushes an estargz image carrying
// inline cache, rebuilds on top of it while importing that cache through the
// "registry" cache importer, and asserts that every layer except the topmost
// one is absent from the containerd content store afterwards. It then exports
// the same definition as an OCI tarball and asserts that those layers are now
// present in the content store.
//
// The test is skipped unless the sandbox exposes a containerd address and
// uses the "stargz" snapshotter, or when the registry cannot be started
// because of integration.ErrRequirements.
func testStargzLazyInlineCacheImportExport(t *testing.T, sb integration.Sandbox) {
skipDockerd(t, sb)
requiresLinux(t)

cdAddress := sb.ContainerdAddress()
if cdAddress == "" || sb.Snapshotter() != "stargz" {
t.Skip("test requires containerd worker with stargz snapshotter")
}

// Direct containerd client; its image service and content store are used
// below to delete images and to probe for layer blobs.
client, err := newContainerd(cdAddress)
require.NoError(t, err)
defer client.Close()
registry, err := sb.NewRegistry()
if errors.Is(err, integration.ErrRequirements) {
t.Skip(err.Error())
}
require.NoError(t, err)

var (
imageService = client.ImageService()
contentStore = client.ContentStore()
// containerd calls made with ctx run in the "buildkit" namespace.
ctx = namespaces.WithNamespace(sb.Context(), "buildkit")
)

c, err := New(sb.Context(), sb.Address())
require.NoError(t, err)
defer c.Close()

// Prepare stargz inline cache
orgImage := "docker.io/library/alpine:latest"
sgzImage := registry + "/stargz/alpine:" + identity.NewID()
baseDef := llb.Image(orgImage).Run(llb.Args([]string{"/bin/touch", "/foo"}))
def, err := baseDef.Marshal(sb.Context())
require.NoError(t, err)
// First solve: push sgzImage with estargz compression forced and export
// inline cache alongside it.
_, err = c.Solve(sb.Context(), def, SolveOpt{
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": sgzImage,
"push": "true",
"compression": "estargz",
"oci-mediatypes": "true",
"force-compression": "true",
},
},
},
CacheExports: []CacheOptionsEntry{
{
Type: "inline",
},
},
}, nil)
require.NoError(t, err)

// clear all local state out
err = imageService.Delete(ctx, sgzImage, images.SynchronousDelete())
require.NoError(t, err)
checkAllReleasable(t, c, sb, true)

// stargz layers should be lazy even for executing something on them
def, err = baseDef.
Run(llb.Args([]string{"/bin/touch", "/bar"})).
Marshal(sb.Context())
require.NoError(t, err)
target := registry + "/buildkit/testlazyimage:" + identity.NewID()
// Second solve: build one more Run step on top of baseDef, importing cache
// from sgzImage through the "registry" cache importer, and push the result.
_, err = c.Solve(sb.Context(), def, SolveOpt{
Exports: []ExportEntry{
{
Type: ExporterImage,
Attrs: map[string]string{
"name": target,
"push": "true",
"oci-mediatypes": "true",
"compression": "estargz",
},
},
},
CacheExports: []CacheOptionsEntry{
{
Type: "inline",
},
},
CacheImports: []CacheOptionsEntry{
{
Type: "registry",
Attrs: map[string]string{
"ref": sgzImage,
},
},
},
}, nil)
require.NoError(t, err)

img, err := imageService.Get(ctx, target)
require.NoError(t, err)

manifest, err := images.Manifest(ctx, contentStore, img.Target, nil)
require.NoError(t, err)

// Check if image layers are lazy.
// The topmost(last) layer created by `Run` isn't lazy so we skip the check for the layer.
// A layer counts as lazy here when the content store reports ErrNotFound
// for its digest.
var sgzLayers []ocispecs.Descriptor
for i, layer := range manifest.Layers[:len(manifest.Layers)-1] {
_, err = contentStore.Info(ctx, layer.Digest)
require.ErrorIs(t, err, ctderrdefs.ErrNotFound, "unexpected error %v on layer %+v (%d)", err, layer, i)
sgzLayers = append(sgzLayers, layer)
}
require.NotEqual(t, 0, len(sgzLayers), "no layer can be used for checking lazypull")

// The topmost(last) layer created by `Run` shouldn't be lazy
_, err = contentStore.Info(ctx, manifest.Layers[len(manifest.Layers)-1].Digest)
require.NoError(t, err)

// clear all local state out
err = imageService.Delete(ctx, img.Name, images.SynchronousDelete())
require.NoError(t, err)
checkAllReleasable(t, c, sb, true)

// stargz layers should be exportable
destDir, err := ioutil.TempDir("", "buildkit")
require.NoError(t, err)
defer os.RemoveAll(destDir)
out := filepath.Join(destDir, "out.tar")
outW, err := os.Create(out)
require.NoError(t, err)
// Third solve: export the same definition as an OCI tarball written to
// outW, again importing cache from sgzImage.
_, err = c.Solve(sb.Context(), def, SolveOpt{
Exports: []ExportEntry{
{
Type: ExporterOCI,
Output: fixedWriteCloser(outW),
},
},
CacheImports: []CacheOptionsEntry{
{
Type: "registry",
Attrs: map[string]string{
"ref": sgzImage,
},
},
},
}, nil)
require.NoError(t, err)

// Check if image layers are un-lazied
for _, layer := range sgzLayers {
_, err = contentStore.Info(ctx, layer.Digest)
require.NoError(t, err)
}

// Final cleanup: prune everything and verify nothing is left behind.
err = c.Prune(sb.Context(), nil, PruneAll)
require.NoError(t, err)
checkAllRemoved(t, c, sb)
}

func testStargzLazyPull(t *testing.T, sb integration.Sandbox) {
skipDockerd(t, sb)
requiresLinux(t)
Expand Down
27 changes: 3 additions & 24 deletions source/containerimage/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@ package containerimage
import (
"context"
"encoding/json"
"fmt"
"runtime"
"strings"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
containerderrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
ctdlabels "github.com/containerd/containerd/labels"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/session"
Expand All @@ -26,6 +22,7 @@ import (
"github.com/moby/buildkit/solver/errdefs"
"github.com/moby/buildkit/source"
srctypes "github.com/moby/buildkit/source/types"
"github.com/moby/buildkit/util/estargz"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/leaseutil"
Expand Down Expand Up @@ -218,31 +215,13 @@ func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (cach

p.descHandlers = cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for i, desc := range p.manifest.Descriptors {
// Hints for remote/stargz snapshotter for searching for remote snapshots
labels := snapshots.FilterInheritedLabels(desc.Annotations)
if labels == nil {
labels = make(map[string]string)
}
for _, k := range []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} {
labels[k] = desc.Annotations[k]
for k, v := range estargz.SnapshotLabels(p.manifest.Ref, p.manifest.Descriptors, i) {
labels[k] = v
}
labels["containerd.io/snapshot/remote/stargz.reference"] = p.manifest.Ref
labels["containerd.io/snapshot/remote/stargz.digest"] = desc.Digest.String()
var (
layersKey = "containerd.io/snapshot/remote/stargz.layers"
layers string
)
for _, l := range p.manifest.Descriptors[i:] {
ls := fmt.Sprintf("%s,", l.Digest.String())
// This avoids the label hits the size limitation.
// Skipping layers is allowed here and only affects performance.
if err := ctdlabels.Validate(layersKey, layers+ls); err != nil {
break
}
layers += ls
}
labels[layersKey] = strings.TrimSuffix(layers, ",")

p.descHandlers[desc.Digest] = &cache.DescHandler{
Provider: p.manifest.Provider,
Progress: progressController,
Expand Down
24 changes: 24 additions & 0 deletions util/contentutil/multiprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ type MultiProvider struct {
sub map[digest.Digest]content.Provider
}

// SnapshotLabels returns snapshot labels for the layer descs[index] by
// delegating to an underlying provider that implements a SnapshotLabels
// method with the same signature.
//
// The provider registered in mp.sub for the layer's digest is tried first;
// if there is none, or it does not implement SnapshotLabels, mp.base is
// tried. It returns nil when neither provider implements the method or when
// index does not address an element of descs.
func (mp *MultiProvider) SnapshotLabels(descs []ocispecs.Descriptor, index int) map[string]string {
	// index == len(descs) must be rejected as well, otherwise descs[index]
	// below panics with an out-of-range error.
	if index < 0 || index >= len(descs) {
		return nil
	}
	desc := descs[index]
	type snapshotLabels interface {
		SnapshotLabels([]ocispecs.Descriptor, int) map[string]string
	}

	// Hold the read lock only for the map lookup; the delegated call runs
	// without it.
	mp.mu.RLock()
	p, found := mp.sub[desc.Digest]
	mp.mu.RUnlock()

	if found {
		if cd, ok := p.(snapshotLabels); ok {
			return cd.SnapshotLabels(descs, index)
		}
	}
	if cd, ok := mp.base.(snapshotLabels); ok {
		return cd.SnapshotLabels(descs, index)
	}
	return nil
}

func (mp *MultiProvider) CheckDescriptor(ctx context.Context, desc ocispecs.Descriptor) error {
type checkDescriptor interface {
CheckDescriptor(context.Context, ocispecs.Descriptor) error
Expand Down
38 changes: 38 additions & 0 deletions util/estargz/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package estargz

import (
"fmt"
"strings"

ctdlabels "github.com/containerd/containerd/labels"
"github.com/containerd/stargz-snapshotter/estargz"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)

// SnapshotLabels builds the snapshot labels for the layer descs[targetIndex]
// of the image identified by ref.
//
// The returned map contains:
//   - the layer's estargz.TOCJSONDigestAnnotation and
//     estargz.StoreUncompressedSizeAnnotation annotation values (empty
//     strings when the annotations are absent);
//   - "containerd.io/snapshot/remote/stargz.reference" set to ref;
//   - "containerd.io/snapshot/remote/stargz.digest" set to the layer digest;
//   - "containerd.io/snapshot/remote/stargz.layers" set to a comma-separated
//     list of the digests of descs[targetIndex:], cut short as soon as adding
//     another digest would make the label fail ctdlabels.Validate.
//
// It returns nil when targetIndex does not address an element of descs.
func SnapshotLabels(ref string, descs []ocispecs.Descriptor, targetIndex int) map[string]string {
	// targetIndex == len(descs) must be rejected as well, otherwise
	// descs[targetIndex] below panics with an out-of-range error.
	if targetIndex < 0 || targetIndex >= len(descs) {
		return nil
	}
	desc := descs[targetIndex]
	labels := make(map[string]string)
	for _, k := range []string{estargz.TOCJSONDigestAnnotation, estargz.StoreUncompressedSizeAnnotation} {
		labels[k] = desc.Annotations[k]
	}
	labels["containerd.io/snapshot/remote/stargz.reference"] = ref
	labels["containerd.io/snapshot/remote/stargz.digest"] = desc.Digest.String()
	var (
		layersKey = "containerd.io/snapshot/remote/stargz.layers"
		layers    string
	)
	for _, l := range descs[targetIndex:] {
		ls := fmt.Sprintf("%s,", l.Digest.String())
		// Stop before the label would exceed the size limit enforced by
		// ctdlabels.Validate. Skipping layers is allowed here and only
		// affects performance.
		if err := ctdlabels.Validate(layersKey, layers+ls); err != nil {
			break
		}
		layers += ls
	}
	labels[layersKey] = strings.TrimSuffix(layers, ",")
	return labels
}
26 changes: 22 additions & 4 deletions worker/base/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,20 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cac
WriterFactory: progress.FromContext(ctx),
},
}
snapshotLabels := func([]ocispecs.Descriptor, int) map[string]string { return nil }
if cd, ok := remote.Provider.(interface {
SnapshotLabels([]ocispecs.Descriptor, int) map[string]string
}); ok {
snapshotLabels = cd.SnapshotLabels
}
descHandlers := cache.DescHandlers(make(map[digest.Digest]*cache.DescHandler))
for _, desc := range remote.Descriptors {
descHandlers[desc.Digest] = descHandler
for i, desc := range remote.Descriptors {
descHandlers[desc.Digest] = &cache.DescHandler{
Provider: descHandler.Provider,
Progress: descHandler.Progress,
Annotations: desc.Annotations,
SnapshotLabels: snapshotLabels(remote.Descriptors, i),
}
}

var current cache.ImmutableRef
Expand All @@ -417,10 +428,17 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (ref cac
if v, ok := desc.Annotations["buildkit/description"]; ok {
descr = v
}
ref, err := w.CacheMgr.GetByBlob(ctx, desc, current,
opts := []cache.RefOption{
cache.WithDescription(descr),
cache.WithCreationTime(tm),
descHandlers)
descHandlers,
}
if dh, ok := descHandlers[desc.Digest]; ok {
if ref, ok := dh.Annotations["containerd.io/distribution.source.ref"]; ok {
opts = append(opts, cache.WithImageRef(ref)) // can set by registry cache importer
}
}
ref, err := w.CacheMgr.GetByBlob(ctx, desc, current, opts...)
if current != nil {
current.Release(context.TODO())
}
Expand Down