Datastore is an interface.
FlatFS is the lowest-level implementation of it: blocks are stored as flat files on disk.
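For orientation, here is a trimmed sketch of the go-datastore Datastore interface that FlatFS and LevelDB implement (method set abridged; the real interface also includes GetSize, Query, Sync and Close, and recent versions add a context.Context parameter to every call):
// Trimmed sketch of the key-value surface implemented by FlatFS and LevelDB.
type Datastore interface {
	Put(key Key, value []byte) error
	Get(key Key) (value []byte, err error)
	Has(key Key) (exists bool, err error)
	Delete(key Key) error
}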
// CachedBlockstore returns a blockstore wrapped in an ARCCache and
// then in a bloom filter cache, if the options indicate it.
func CachedBlockstore(
	ctx context.Context,
	bs Blockstore,
	opts CacheOpts) (cbs Blockstore, err error) {
	cbs = bs

	if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
		opts.HasARCCacheSize < 0 {
		return nil, errors.New("all options for cache need to be greater than zero")
	}

	if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
		return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
	}

	ctx = metrics.CtxSubScope(ctx, "bs.cache")

	if opts.HasARCCacheSize > 0 {
		cbs, err = newARCCachedBS(ctx, cbs, opts.HasARCCacheSize)
	}
	if opts.HasBloomFilterSize != 0 {
		// *8 because of bytes to bits conversion
		cbs, err = bloomCached(ctx, cbs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)
	}

	return cbs, err
}
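A minimal usage sketch of how these layers get assembled (constructor names as in go-ipfs-blockstore; dstore stands for the underlying datastore and is an assumption here):
// Wrap a plain blockstore with the default ARC + bloom caches.
base := NewBlockstore(dstore)
cached, err := CachedBlockstore(ctx, base, DefaultCacheOpts())
if err != nil {
	// handle error
}
_ = cached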
retrydatastore
This layer builds on the FlatFS and LevelDB datastores below it.
It adds retry-with-sleep behaviour: if an operation fails with a temporary error, it is retried.
- Retry a maximum of six times.
- On each failure, sleep a multiple of 200ms: 200ms after the first failure, 400ms after the second, and so on (a linear backoff).
Each operation performed on the FlatFS layer is wrapped as a function:
- The wrapped function (which returns an error) is called the operation.
- The heart of the retry layer is the runOp() function.
- It executes the operation.
- It then retries if the error is a temporary one.
- EMFILE "too many open files" (errno 24) is the temporary error of interest here.
- If the failure is caused by too many open files, the operation is retried.
func isTooManyFDError(err error) bool {
	perr, ok := err.(*os.PathError)
	if ok && perr.Err == syscall.EMFILE {
		return true
	}
	return false
}
rds := &retrystore.Datastore{
	Batching:    repo.Datastore(),
	Delay:       time.Millisecond * 200,
	Retries:     6,
	TempErrFunc: isTooManyFDError,
}
func (d *Datastore) runOp(op func() error) error {
	err := op()
	if err == nil || !d.TempErrFunc(err) {
		return err
	}

	for i := 0; i < d.Retries; i++ {
		time.Sleep(time.Duration(i+1) * d.Delay)

		err = op()
		if err == nil || !d.TempErrFunc(err) {
			return err
		}
	}

	return xerrors.Errorf(errFmtString, err)
}
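As a sketch of how this is used (simplified; the actual retrystore source may differ slightly in signatures), every datastore method simply wraps the underlying call in a closure and hands it to runOp:
func (d *Datastore) Put(key ds.Key, value []byte) error {
	// The closure is retried by runOp on temporary errors such as EMFILE.
	return d.runOp(func() error {
		return d.Batching.Put(key, value)
	})
}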
arccached
On top of the RetryDatastore we wrap another layer: a cache.
The one used is arccache.
The cache maps a key to whether it exists and, if it does, its size (which makes GetSize convenient).
On Get:
- If the key is found in the cache and marked as not existing, we can return immediately.
- If the key is not cached, or only its size is cached (the cache does not hold the block data itself), we look it up in the blockstore.
- If it is not found in the blockstore, we cache the key with a bool false, meaning not found.
- If it is found, we cache the key together with the block's size.
func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
	if !k.Defined() {
		log.Error("undefined cid in arc cache")
		return nil, ErrNotFound
	}

	if has, _, ok := b.hasCached(k); ok && !has {
		return nil, ErrNotFound
	}

	bl, err := b.blockstore.Get(k)
	if bl == nil && err == ErrNotFound {
		b.cacheHave(k, false)
	} else if bl != nil {
		b.cacheSize(k, len(bl.RawData()))
	}
	return bl, err
}
Put works the same way: if the key is already cached as present, there is no need to store it again.
Has:
- If the cache already has an answer, we return it without going to the blockstore.
- Otherwise we ask the blockstore and cache the result.
func (b *arccache) Has(k cid.Cid) (bool, error) {
	if has, _, ok := b.hasCached(k); ok {
		return has, nil
	}

	has, err := b.blockstore.Has(k)
	if err != nil {
		return false, err
	}
	b.cacheHave(k, has)
	return has, nil
}
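The cached size mentioned above pays off in GetSize. A sketch of how it can reuse the same hasCached helper (close to, but not guaranteed identical to, the actual arccache source):
func (b *arccache) GetSize(k cid.Cid) (int, error) {
	if has, blockSize, ok := b.hasCached(k); ok {
		if !has {
			// cached as not present: answer without touching the blockstore
			return -1, ErrNotFound
		}
		return blockSize, nil
	}
	// not cached: ask the blockstore and remember the answer
	blockSize, err := b.blockstore.GetSize(k)
	if err == ErrNotFound {
		b.cacheHave(k, false)
	} else if err == nil {
		b.cacheSize(k, blockSize)
	}
	return blockSize, err
}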
bloom
The arccache size is 64 << 10, so it can hold at most 65536 keys.
The bloom filter size is 512 << 10, so the number of items it can track is significantly larger.
The bloom filter constructor documents its parameters as:
errors.New("usage: New(float64(number_of_entries), float64(number_of_hashlocations))")
CacheOpts{
	HasBloomFilterSize:   512 << 10,
	HasBloomFilterHashes: 7,
	HasARCCacheSize:      64 << 10,
}
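A quick sketch of what CachedBlockstore does with these numbers when building the filter, assuming the bbloom package whose usage string is quoted above (note the *8 bytes-to-bits conversion already seen in CachedBlockstore):
// 512<<10 bytes * 8 = 4,194,304 bits, checked with 7 hash functions.
filter, err := bbloom.New(float64(opts.HasBloomFilterSize*8), float64(opts.HasBloomFilterHashes))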
For Has requests, the bloom filter is checked first for a possible hit.
If the filter says the key definitely is not there, there is no need to go to the underlying blockstore at all.
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *bloomcache) hasCached(k cid.Cid) (has bool, ok bool) {
	b.total.Inc()
	if !k.Defined() {
		log.Error("undefined in bloom cache")
		// Return cache invalid so call to blockstore
		// in case of invalid key is forwarded deeper
		return false, false
	}
	if b.BloomActive() {
		blr := b.bloom.HasTS(k.Hash())

		if !blr { // not contained in bloom is only conclusive answer bloom gives
			b.hits.Inc()
			return false, true
		}
	}
	return false, false
}

func (b *bloomcache) Has(k cid.Cid) (bool, error) {
	if has, ok := b.hasCached(k); ok {
		return has, nil
	}

	return b.blockstore.Has(k)
}
For Get, impossible requests are filtered out the same way.
For Put, the key is added to the bloom filter after a successful write.
Since the bloom filter can never say conclusively that a key is present, there is no point checking it before the write.
func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) {
	if has, ok := b.hasCached(k); ok && !has {
		return nil, ErrNotFound
	}

	return b.blockstore.Get(k)
}

func (b *bloomcache) Put(bl blocks.Block) error {
	// See comment in PutMany
	err := b.blockstore.Put(bl)
	if err == nil {
		b.bloom.AddTS(bl.Cid().Hash())
	}
	return err
}
BATCH
- In fact, the DAGService we use when adding a file is a BufferedDAG.
- It contains a Batch, which does the batch processing.
- Add operations are buffered in this structure.
- They are committed at the end.
- In other words, write operations are buffered, while read operations are executed immediately.
- The Remove operation is also executed immediately.
- Write operations are not buffered indefinitely: an asyncCommit is triggered automatically once the buffered size exceeds 8 << 20 bytes, or once more than 128 nodes are buffered.
- The maximum number of in-flight commits is the number of logical CPUs * 2, i.e. 8 on an Intel Core i7.
- asyncCommit starts a goroutine and hands it the current context and the nodes to be written,
- plus a channel on which the result is returned.
- Note that a Batch structure applies to a single file (a directory may contain many files).
- Although it is called asyncCommit,
- the Commit function itself is synchronous.
// Constructs a node from reader's data, and adds it. Doesn't pin.
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
	chnk, err := chunker.FromString(reader, adder.Chunker)
	if err != nil {
		return nil, err
	}

	params := ihelper.DagBuilderParams{
		Dagserv:    adder.bufferedDS,
		RawLeaves:  adder.RawLeaves,
		Maxlinks:   ihelper.DefaultLinksPerBlock,
		NoCopy:     adder.NoCopy,
		CidBuilder: adder.CidBuilder,
	}

	db, err := params.New(chnk)
	if err != nil {
		return nil, err
	}

	var nd ipld.Node
	if adder.Trickle {
		nd, err = trickle.Layout(db)
	} else {
		nd, err = balanced.Layout(db)
	}
	if err != nil {
		return nil, err
	}

	return nd, adder.bufferedDS.Commit()
}
// Commit commits batched nodes.
func (t *Batch) Commit() error {
	if t.err != nil {
		return t.err
	}

	t.asyncCommit()

loop:
	for t.activeCommits > 0 {
		select {
		case err := <-t.commitResults:
			t.activeCommits--
			if err != nil {
				t.setError(err)
				break loop
			}
		case <-t.ctx.Done():
			t.setError(t.ctx.Err())
			break loop
		}
	}

	return t.err
}
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2
var defaultBatchOptions = batchOptions{
	maxSize: 8 << 20, // that is 8M
	// By default, only batch up to 128 nodes at a time.
	// The current implementation of flatfs opens this many file
	// descriptors at the same time for the optimized batch write.
	maxNodes: 128,
}
// Get commits and gets a node from the DAGService.
func (bd *BufferedDAG) Get(ctx context.Context, c cid.Cid) (Node, error) {
	err := bd.b.Commit()
	if err != nil {
		return nil, err
	}
	return bd.ds.Get(ctx, c)
}
// BufferedDAG implements DAGService using a Batch NodeAdder to wrap add
// operations in the given DAGService. It will trigger Commit() before any
// non-Add operations, but otherwise calling Commit() is left to the user.
type BufferedDAG struct {
	ds DAGService
	b  *Batch
}

// NewBufferedDAG creates a BufferedDAG using the given DAGService and the
// given options for the Batch NodeAdder.
func NewBufferedDAG(ctx context.Context, ds DAGService, opts ...BatchOption) *BufferedDAG {
	return &BufferedDAG{
		ds: ds,
		b:  NewBatch(ctx, ds, opts...),
	}
}
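A hypothetical usage sketch: wrapping an existing DAGService in a BufferedDAG and overriding the default limits, assuming the MaxSizeBatchOption and MaxNodesBatchOption helpers from the same package (the values here are arbitrary):
bd := NewBufferedDAG(ctx, dagService,
	MaxSizeBatchOption(4<<20), // flush once 4 MiB of nodes are buffered
	MaxNodesBatchOption(64),   // or once 64 nodes are buffered
)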
// AddMany many calls Add for every given Node, thus batching and
// commiting them as needed.
func (t *Batch) AddMany(ctx context.Context, nodes []Node) error {
	if t.err != nil {
		return t.err
	}
	// Not strictly necessary but allows us to catch errors early.
	t.processResults()

	if t.err != nil {
		return t.err
	}

	t.nodes = append(t.nodes, nodes...)
	for _, nd := range nodes {
		t.size += len(nd.RawData())
	}

	if t.size > t.opts.maxSize || len(t.nodes) > t.opts.maxNodes {
		t.asyncCommit()
	}
	return t.err
}
- The nodes slice allocated after asyncCommit is not re-created with capacity 0; it keeps the previous number of buffered nodes as its capacity.
- Note that if you add 16MB of data in a single call, two commits are not started;
- instead a single goroutine commits the whole 16M (we add blocks one by one, so this does not happen in practice).
func (t *Batch) asyncCommit() {
	numBlocks := len(t.nodes)
	if numBlocks == 0 {
		return
	}
	if t.activeCommits >= ParallelBatchCommits {
		select {
		case err := <-t.commitResults:
			t.activeCommits--

			if err != nil {
				t.setError(err)
				return
			}
		case <-t.ctx.Done():
			t.setError(t.ctx.Err())
			return
		}
	}

	go func(ctx context.Context, b []Node, result chan error, na NodeAdder) {
		select {
		case result <- na.AddMany(ctx, b):
		case <-ctx.Done():
		}
	}(t.ctx, t.nodes, t.commitResults, t.na)

	t.activeCommits++
	t.nodes = make([]Node, 0, numBlocks)
	t.size = 0

	return
}
How the progress bar is displayed
func (adder *Adder) addFile(path string, file files.File) error {
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
	if adder.Progress {
		rdr := &progressReader{file: reader, path: path, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
	}
When we read from a file, we wrap it in a progressReader structure.
This is also what the chunker later reads from when splitting the file into chunks.
- It works by pushing a structure onto a channel every 256KB (1024 * 256 bytes) read,
- containing the file's current path and how many bytes have been read from it so far (see the constant sketched below).
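The 256KB increment mentioned above corresponds to a constant defined alongside progressReader in go-ipfs (value as quoted above):
// Threshold of newly read bytes before another AddEvent is emitted.
const progressReaderIncrement = 1024 * 256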
type progressReader struct {
	file         io.Reader
	path         string
	out          chan<- interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)
	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &coreiface.AddEvent{
			Name:  i.path,
			Bytes: i.bytes,
		}
	}
	return n, err
}
To add a file from the command line, we start a goroutine that:
- loops over the results coming from the events channel,
- each of which includes the file's name,
- its hash,
- and the number of bytes,
- and then emits it.
for event := range events {
	output, ok := event.(*coreiface.AddEvent)
	if !ok {
		return errors.New("unknown event type")
	}

	h := ""
	if output.Path != nil {
		h = enc.Encode(output.Path.Cid())
	}

	if !dir && name != "" {
		output.Name = name
	} else {
		output.Name = path.Join(name, output.Name)
	}

	if err := res.Emit(&AddEvent{
		Name:  output.Name,
		Hash:  h,
		Bytes: output.Bytes,
		Size:  output.Size,
	}); err != nil {
		return err
	}
}
return <-errCh
},