Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http retrieve entire piece (not just CAR file) #640

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 62 additions & 6 deletions cmd/booster-http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
)

// ErrNotFound is a sentinel error value carrying the message "not found".
var ErrNotFound = errors.New("not found")

// carSuffix is the file-name suffix the request handlers look for to decide
// whether the caller asked for the CAR file rather than the whole piece.
const carSuffix = ".car"

type HttpServer struct {
path string
port int
Expand Down Expand Up @@ -129,6 +133,7 @@ func (s *HttpServer) handleByPayloadCid(w http.ResponseWriter, r *http.Request)
}

fileName := r.URL.Path[prefixLen:]
isCar := strings.HasSuffix(fileName, carSuffix)
payloadCidStr := strings.Replace(fileName, ".car", "", 1)
payloadCid, err := cid.Parse(payloadCidStr)
if err != nil {
Expand All @@ -155,6 +160,9 @@ func (s *HttpServer) handleByPayloadCid(w http.ResponseWriter, r *http.Request)
pieceCid := pieces[0]
ctx := r.Context()
content, err := s.getPieceContent(ctx, pieceCid)
if err == nil && isCar {
content, err = s.getCarContent(pieceCid, content)
}
if err != nil {
if isNotFoundError(err) {
msg := fmt.Sprintf("getting content for payload CID %s in piece %s: %s", payloadCidStr, pieceCid, err)
Expand All @@ -167,7 +175,7 @@ func (s *HttpServer) handleByPayloadCid(w http.ResponseWriter, r *http.Request)
return
}

serveCAR(w, r, content)
serveContent(w, r, content, getContentType(isCar))
}

func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) {
Expand All @@ -179,7 +187,8 @@ func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) {
}

fileName := r.URL.Path[prefixLen:]
pieceCidStr := strings.Replace(fileName, ".car", "", 1)
isCar := strings.HasSuffix(fileName, carSuffix)
pieceCidStr := strings.Replace(fileName, carSuffix, "", 1)
pieceCid, err := cid.Parse(pieceCidStr)
if err != nil {
msg := fmt.Sprintf("parsing piece CID '%s': %s", pieceCidStr, err.Error())
Expand All @@ -189,6 +198,9 @@ func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
content, err := s.getPieceContent(ctx, pieceCid)
if err == nil && isCar {
content, err = s.getCarContent(pieceCid, content)
}
Comment on lines 200 to +203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't getPieceContent and getCarContent expensive operations? Shouldn't we do one or the other, rather than overriding content when isCar == true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, content is an io.ReadSeeker.
getPieceContent gets a reader over the unsealed piece.
If isCar == true, then getCarContent figures out the size of the CAR file by asking the DAG store for the max offset of the CAR index. If the DAG store doesn't have the index, getCarContent creates the index on the fly. Once it has the max offset, it just wraps the piece reader with a limit reader so that it only serves the CAR file.

if err != nil {
if isNotFoundError(err) {
writeError(w, r, http.StatusNotFound, err.Error())
Expand All @@ -200,13 +212,20 @@ func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) {
return
}

serveCAR(w, r, content)
serveContent(w, r, content, getContentType(isCar))
}

// getContentType picks the Content-Type header value for a response:
// the IPLD CAR media type when isCar is true, otherwise the generic
// piece media type.
func getContentType(isCar bool) string {
	contentType := "application/piece"
	if isCar {
		contentType = "application/vnd.ipld.car"
	}
	return contentType
}

func serveCAR(w http.ResponseWriter, r *http.Request, content io.ReadSeeker) {
func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, contentType string) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
w.Header().Set("Content-Type", "application/vnd.ipld.car")
w.Header().Set("Content-Type", contentType)

if r.Method == "HEAD" {
// For an HTTP HEAD request we don't send any data (just headers)
Expand Down Expand Up @@ -282,11 +301,21 @@ func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io.
return nil, fmt.Errorf("getting raw data from sector %d: %w", di.SectorID, err)
}

return pieceReader, nil
}

func (s *HttpServer) getCarContent(pieceCid cid.Cid, pieceReader io.ReadSeeker) (io.ReadSeeker, error) {
maxOffset, err := s.api.GetMaxPieceOffset(pieceCid)
if err != nil {
return nil, fmt.Errorf("getting max offset for piece %s: %w", pieceCid, err)
// If it's not possible to get the max piece offset it may be because
// the CAR file hasn't been indexed yet. So try to index it in real time.
maxOffset, err = getMaxPieceOffset(pieceReader)
if err != nil {
return nil, fmt.Errorf("getting max offset for piece %s: %w", pieceCid, err)
}
}

// Seek to the max offset
_, err = pieceReader.Seek(int64(maxOffset), io.SeekStart)
if err != nil {
return nil, fmt.Errorf("seeking to offset %d in piece data: %w", maxOffset, err)
Expand Down Expand Up @@ -329,6 +358,33 @@ func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io.
return lr, nil
}

// getMaxPieceOffset generates a CAR file index from the reader, and returns
// the maximum offset in the index.
//
// The index is built with car.GenerateIndex using the
// ZeroLengthSectionAsEOF(true) and StoreIdentityCIDs(true) options. The
// generated index must implement index.IterableIndex so that every entry can
// be visited; if it does not, an error naming the concrete index type is
// returned. When the index has no entries the returned offset is 0.
func getMaxPieceOffset(reader io.ReadSeeker) (uint64, error) {
	idx, err := car.GenerateIndex(reader, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true))
	if err != nil {
		return 0, fmt.Errorf("generating CAR index: %w", err)
	}

	itidx, ok := idx.(index.IterableIndex)
	if !ok {
		// %T prints the dynamic type of idx; the boolean verb %t would
		// render as a formatting error instead of the type name.
		return 0, fmt.Errorf("could not cast CAR file index %T to an IterableIndex", idx)
	}

	// Walk every index entry, keeping the largest offset seen.
	var maxOffset uint64
	err = itidx.ForEach(func(m multihash.Multihash, offset uint64) error {
		if offset > maxOffset {
			maxOffset = offset
		}
		return nil
	})
	if err != nil {
		return 0, fmt.Errorf("getting max offset: %w", err)
	}

	return maxOffset, nil
}

type limitSeekReader struct {
io.Reader
readSeeker io.ReadSeeker
Expand Down