Skip to content

Commit

Permalink
feat: wire up full & shallow selectors for paths
Browse files Browse the repository at this point in the history
Closes: #82
  • Loading branch information
rvagg committed Feb 28, 2023
1 parent 2b9b5a4 commit b69b851
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 249 deletions.
31 changes: 20 additions & 11 deletions cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
carstore "github.com/ipld/go-car/v2/storage"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -26,7 +25,7 @@ var fetchProviderAddrInfos []peer.AddrInfo

var fetchCmd = &cli.Command{
Name: "fetch",
Usage: "Fetches content from Filecoin",
Usage: "Fetches content from the IPFS and Filecoin network",
Before: before,
Action: Fetch,
Flags: []cli.Flag{
Expand All @@ -47,6 +46,12 @@ var fetchCmd = &cli.Command{
Aliases: []string{"p"},
Usage: "print progress output",
},
&cli.BoolFlag{
Name: "shallow",
Usage: "only fetch the content at the end of the path",
DefaultText: "false, the entire DAG at the end of the path will be fetched",
Value: false,
},
&cli.StringFlag{
Name: "providers",
Aliases: []string{"provider"},
Expand Down Expand Up @@ -74,11 +79,14 @@ var fetchCmd = &cli.Command{

func Fetch(c *cli.Context) error {
if c.Args().Len() != 1 {
return fmt.Errorf("usage: lassie fetch [-o <CAR file>] [-t <timeout>] <CID>")
return fmt.Errorf("usage: lassie fetch [-o <CAR file>] [-t <timeout>] <CID>[/path/to/content]")
}
progress := c.Bool("progress")

rootCid, err := cid.Parse(c.Args().Get(0))
cpath := c.Args().Get(0)
cstr := strings.Split(cpath, "/")[0]
path := strings.TrimPrefix(cpath, cstr)
rootCid, err := cid.Parse(cstr)
if err != nil {
return err
}
Expand Down Expand Up @@ -106,9 +114,9 @@ func Fetch(c *cli.Context) error {
setupLassieEventRecorder(c, lassie)

if len(fetchProviderAddrInfos) == 0 {
fmt.Printf("Fetching %s", rootCid)
fmt.Printf("Fetching %s", rootCid.String()+path)
} else {
fmt.Printf("Fetching %s from %v", rootCid, fetchProviderAddrInfos)
fmt.Printf("Fetching %s from %v", rootCid.String()+path, fetchProviderAddrInfos)
}
if progress {
fmt.Println()
Expand Down Expand Up @@ -155,13 +163,14 @@ func Fetch(c *cli.Context) error {
fmt.Printf("\rReceived %d blocks / %s...", blockCount, humanize.IBytes(byteLength))
}
}

store := cmdinternal.NewPutCbStore(parentOpener, putCb)
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.SetReadStorage(store)
linkSystem.SetWriteStorage(store)
linkSystem.TrustedStorage = true
request, err := types.NewRequestForPath(store, rootCid, path, !c.Bool("shallow"))
if err != nil {
return err
}

_, stats, err := lassie.Fetch(c.Context, rootCid, linkSystem)
stats, err := lassie.Fetch(c.Context, request)
if err != nil {
fmt.Println()
return err
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car/v2 v2.7.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/ipni/storetheindex v0.5.4
github.com/libp2p/go-libp2p v0.25.1
github.com/libp2p/go-libp2p-testing v0.12.0
Expand Down
130 changes: 0 additions & 130 deletions go.sum

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions pkg/internal/itest/direct_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lassie/pkg/internal/lp2ptransports"
"github.com/filecoin-project/lassie/pkg/lassie"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-unixfsnode"
carv2 "github.com/ipld/go-car/v2"
Expand Down Expand Up @@ -121,11 +122,9 @@ func TestDirectFetch(t *testing.T) {
}()
outCar, err := storage.NewReadableWritable(outFile, []cid.Cid{rootCid}, carv2.WriteAsCarV1(true))
req.NoError(err)
outLsys := cidlink.DefaultLinkSystem()
outLsys.SetReadStorage(outCar)
outLsys.SetWriteStorage(outCar)
outLsys.TrustedStorage = true
_, _, err = lassie.Fetch(ctx, rootCid, outLsys)
request, err := types.NewRequestForPath(outCar, rootCid, "", true)
req.NoError(err)
_, err = lassie.Fetch(ctx, request)
req.NoError(err)
err = outCar.Finalize()
req.NoError(err)
Expand Down
37 changes: 17 additions & 20 deletions pkg/lassie/lassie.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (
"github.com/filecoin-project/lassie/pkg/internal"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-ipld-prime/linking"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
)

// Lassie represents a reusable retrieval client.
type Lassie struct {
cfg *LassieConfig
retriever *retriever.Retriever
}

// LassieConfig customizes the behavior of a Lassie instance.
type LassieConfig struct {
Finder retriever.CandidateFinder
Host host.Host
Expand All @@ -33,6 +33,7 @@ type LassieConfig struct {

type LassieOption func(cfg *LassieConfig)

// NewLassie creates a new Lassie instance.
func NewLassie(ctx context.Context, opts ...LassieOption) (*Lassie, error) {
cfg := &LassieConfig{}
for _, opt := range opts {
Expand All @@ -41,6 +42,8 @@ func NewLassie(ctx context.Context, opts ...LassieOption) (*Lassie, error) {
return NewLassieWithConfig(ctx, cfg)
}

// NewLassieWithConfig creates a new Lassie instance with a custom
// configuration.
func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error) {
if cfg.Finder == nil {
var err error
Expand Down Expand Up @@ -93,29 +96,38 @@ func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error
return lassie, nil
}

// WithFinder allows you to specify a custom candidate finder.
func WithFinder(finder retriever.CandidateFinder) LassieOption {
return func(cfg *LassieConfig) {
cfg.Finder = finder
}
}

// WithProviderTimeout allows you to specify a custom timeout for retrieving
// data from a provider. Beyond this limit, when no data has been received,
// the retrieval will fail.
func WithProviderTimeout(timeout time.Duration) LassieOption {
return func(cfg *LassieConfig) {
cfg.ProviderTimeout = timeout
}
}

// WithGlobalTimeout allows you to specify a custom timeout for the entire
// retrieval process.
func WithGlobalTimeout(timeout time.Duration) LassieOption {
return func(cfg *LassieConfig) {
cfg.GlobalTimeout = timeout
}
}

// WithHost allows you to specify a custom libp2p host.
func WithHost(host host.Host) LassieOption {
return func(cfg *LassieConfig) {
cfg.Host = host
}
}

// WithLibp2pOpts allows you to specify custom libp2p options.
func WithLibp2pOpts(libp2pOptions ...libp2p.Option) LassieOption {
return func(cfg *LassieConfig) {
cfg.Libp2pOptions = libp2pOptions
Expand All @@ -128,7 +140,7 @@ func WithConcurrentSPRetrievals(maxConcurrentSPRtreievals uint) LassieOption {
}
}

func (l *Lassie) Retrieve(ctx context.Context, request types.RetrievalRequest) (*types.RetrievalStats, error) {
func (l *Lassie) Fetch(ctx context.Context, request types.RetrievalRequest) (*types.RetrievalStats, error) {
var cancel context.CancelFunc
if l.cfg.GlobalTimeout != time.Duration(0) {
ctx, cancel = context.WithTimeout(ctx, l.cfg.GlobalTimeout)
Expand All @@ -137,23 +149,8 @@ func (l *Lassie) Retrieve(ctx context.Context, request types.RetrievalRequest) (
return l.retriever.Retrieve(ctx, request, func(types.RetrievalEvent) {})
}

func (l *Lassie) Fetch(ctx context.Context, rootCid cid.Cid, linkSystem linking.LinkSystem) (types.RetrievalID, *types.RetrievalStats, error) {
// Assign an ID to this retrieval
retrievalId, err := types.NewRetrievalID()
if err != nil {
return types.RetrievalID{}, nil, err
}

// retrieve!
request := types.RetrievalRequest{RetrievalID: retrievalId, Cid: rootCid, LinkSystem: linkSystem}
stats, err := l.retriever.Retrieve(ctx, request, func(types.RetrievalEvent) {})
if err != nil {
return retrievalId, nil, err
}

return retrievalId, stats, nil
}

// RegisterSubscriber registers a subscriber to receive retrieval events.
// The returned function can be called to unregister the subscriber.
func (l *Lassie) RegisterSubscriber(subscriber types.RetrievalEventSubscriber) func() {
return l.retriever.RegisterSubscriber(subscriber)
}
78 changes: 55 additions & 23 deletions pkg/retriever/selectorutils/selectorutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,75 @@ package selectorutils

import (
"fmt"
"strings"

"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
textselector "github.com/ipld/go-ipld-selector-text-lite"
)

// PathToSelector converts a standard IPLD path to a selector that matches the
// whole path (inclusive) and the complete DAG at its termination.
// Kind of a combination of these two, with some additional magic sprinkles:
// https://github.com/ipld/go-ipld-selector-text-lite/blob/master/parser.go
// https://github.com/ipfs/go-unixfsnode/blob/9cc15a4574f13f434f2da2cd1afb9de7d0bb3979/signaling.go#L23
// TODO: upstream this, probably to go-unixfsnode

// PathToSelector converts a standard IPLD path to a selector that explores the
// whole UnixFS path (inclusive), and if 'full' is true, the complete DAG at its
// termination, or if not true, only the UnixFS node at its termination (which
// may be an entire sharded directory or file).
//
// Paths must start with a '/'.
// Path is optional, but if supplied it must start with a '/'.
//
// The empty path "" is treated as a special case and will match the entire
// DAG.
func PathToSelector(path string) (ipld.Node, error) {
if len(path) == 0 {
return selectorparse.CommonSelector_ExploreAllRecursively, nil
}

if path[0] != '/' {
// This selector does _not_ match, only explore, so it's useful for traversals
// where block loads are important, not where the matcher visitor callback is
// important.
func UnixfsPathToSelector(path string, full bool) (ipld.Node, error) {
if len(path) > 0 && path[0] != '/' {
return nil, fmt.Errorf("path must start with /")
}

ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selspec, err := textselector.SelectorSpecFromPath(
textselector.Expression(path),
true, // match path inclusive
// match everything below the path:
ssb.ExploreRecursive(
var ss builder.SelectorSpec
if full {
// ExploreAllRecursively
ss = ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
)
if err != nil {
return nil, fmt.Errorf("failed to parse path as a selector '%s': %w", path, err)
)
} else {
// ExploreAll only this node, interpreted as unixfs-preload, which will
// load sharded files and sharded directories, not go no further.
ss = ssb.ExploreInterpretAs("unixfs-preload", ssb.ExploreRecursive(
selector.RecursionLimitDepth(0),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
))
}
return selspec.Node(), nil

segments := strings.Split(path, "/")
for i := len(segments) - 1; i >= 0; i-- {
if segments[i] == "" {
// Allow one leading and one trailing '/' at most
if i == 0 || i == len(segments)-1 {
continue
}
return nil, fmt.Errorf("invalid empty path segment at position %d", i)
}

if segments[i] == "." || segments[i] == ".." {
return nil, fmt.Errorf("'%s' is unsupported in paths", segments[i])
}

// Wrap selector in ExploreFields as we walk back up through the path.
// We can assume each segment to be a unixfs path section, so we
// InterpretAs to make sure the node is reified through go-unixfsnode
// (if possible) and we can traverse through according to unixfs pathing
// rather than bare IPLD pathing - which also gives us the ability to
// traverse through HAMT shards.
ss = ssb.ExploreInterpretAs("unixfs", ssb.ExploreFields(
func(efsb builder.ExploreFieldsSpecBuilder) { efsb.Insert(segments[i], ss) },
))
}

return ss.Node(), nil
}
Loading

0 comments on commit b69b851

Please sign in to comment.