Skip to content

Commit

Permalink
Autodisconnect.0 (#19)
Browse files Browse the repository at this point in the history
* auto disconnect working
  • Loading branch information
asabya authored Oct 6, 2021
1 parent 95a1bef commit 655b263
Show file tree
Hide file tree
Showing 13 changed files with 354 additions and 56 deletions.
9 changes: 5 additions & 4 deletions cli/cmd/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,18 @@ Example:
return err
}
kind, _ := filetype.Match(head)
tag, _ := cmd.Flags().GetString("tag")
if tag == "" {
tag = filepath.Base(f.Name())
}
meta := &replication.Metatag{
Size: fileinfo.Size(),
Type: kind.MIME.Value,
Name: filepath.Base(f.Name()),
Hash: n.Cid(),
Timestamp: time.Now().Unix(),
Owner: comm.LitePeer.Host.ID(),
}
tag, _ := cmd.Flags().GetString("tag")
if tag == "" {
tag = filepath.Base(f.Name())
Tag: tag,
}
err = comm.LitePeer.Manager.Tag(tag, meta)
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions cli/cmd/completion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cmd

import (
"os"

"github.com/datahop/ipfs-lite/cli/common"
"github.com/spf13/cobra"
)

// InitCompletionCmd creates the stop command
func InitCompletionCmd(comm *common.Common) *cobra.Command {
return &cobra.Command{
Use: "completion [bash|zsh|fish|powershell]",
Short: "Generate completion script",
Long: "To load completions",
DisableFlagsInUseLine: true,
ValidArgs: []string{"bash", "zsh", "fish", "powershell"},
Args: cobra.ExactValidArgs(1),
Run: func(cmd *cobra.Command, args []string) {
switch args[0] {
case "bash":
cmd.Root().GenBashCompletion(os.Stdout)
case "zsh":
cmd.Root().GenZshCompletion(os.Stdout)
}
},
}
}
2 changes: 1 addition & 1 deletion cli/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ datahop network by a simple tag
Example:
// To save at default "Download" location
// To save at default "download" location
$ datahop get /test.txt
Expand Down
2 changes: 1 addition & 1 deletion cli/datahop.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func main() {
cmd.InitMatrixCmd(comm),
cmd.InitializeDocCommand(comm),
cmd.InitGetCmd(comm),
cmd.InitCompletionCmd(comm),
)

for _, i := range allCommands {
rootCmd.AddCommand(i)
}

// check help flag
for _, v := range os.Args {
if v == "-h" || v == "--help" {
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.3.2
github.com/multiformats/go-multihash v0.0.15
github.com/plexsysio/taskmanager v0.0.0-00010101000000-000000000000
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
golang.org/x/mobile v0.0.0-20210902104108-5d9a33257ab5 // indirect
golang.org/x/tools v0.1.2 // indirect
google.golang.org/protobuf v1.26.0
)

replace github.com/plexsysio/taskmanager => github.com/asabya/taskmanager v0.1.0
14 changes: 5 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/asabya/go-ipc-uds v0.1.1 h1:xpCn1761D9RXx0nbp5y/REYb0IFjoTe2cRn12YyhfLM=
github.com/asabya/go-ipc-uds v0.1.1/go.mod h1:6fnoNAImh5CdGv6q9katuePtyNIhIfuNscswQJ13Wl4=
github.com/asabya/taskmanager v0.1.0 h1:vc1MxWLaKIqFS8d055QceIXEonfXvw+mi9mkgcGzOMg=
github.com/asabya/taskmanager v0.1.0/go.mod h1:jJPMZyf78X72eyZNrJS5Cz75OFBhQP2+81/FkAEvyOM=
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
Expand Down Expand Up @@ -880,8 +882,9 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down Expand Up @@ -927,7 +930,6 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
Expand All @@ -948,14 +950,12 @@ golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPI
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mobile v0.0.0-20210902104108-5d9a33257ab5 h1:peBP2oZO/xVnGMaWMCyFEI0WENsGj71wx5K12mRELHQ=
golang.org/x/mobile v0.0.0-20210902104108-5d9a33257ab5/go.mod h1:c4YKU3ZylDmvbw+H/PSvm42vhdWbuxCzbonauEAP9B8=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down Expand Up @@ -1049,12 +1049,10 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426080607-c94f62235c83 h1:kHSDPqCtsHZOg0nVylfTo20DDhE9gG4Y0jn7hKQ0QAM=
golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -1106,8 +1104,6 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
4 changes: 3 additions & 1 deletion internal/matrix/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (

// ContentMatrix keeps record for each hash
type ContentMatrix struct {
Tag string
Size int64
AvgSpeed float32
DownloadStartedAt int64
Expand Down Expand Up @@ -308,7 +309,7 @@ func (mKeeper *MatrixKeeper) NodeDisconnected(address string) {
}

// ContentDownloadStarted updates content download start time of a hash
func (mKeeper *MatrixKeeper) ContentDownloadStarted(hash string, size int64) {
func (mKeeper *MatrixKeeper) ContentDownloadStarted(tag, hash string, size int64) {
mKeeper.mtx.Lock()
defer mKeeper.mtx.Unlock()

Expand All @@ -318,6 +319,7 @@ func (mKeeper *MatrixKeeper) ContentDownloadStarted(hash string, size int64) {
}
}
contentMatrix := mKeeper.ContentMatrix[hash]
contentMatrix.Tag = tag
contentMatrix.DownloadStartedAt = time.Now().Unix()
contentMatrix.Size = size
log.Debug("ContentDownloadStarted : ", contentMatrix)
Expand Down
130 changes: 103 additions & 27 deletions internal/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package replication
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/datahop/ipfs-lite/internal/repo"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/plexsysio/taskmanager"
)

var (
Expand All @@ -23,6 +25,7 @@ var (

// Metatag keeps meta information of a content in the crdt store
type Metatag struct {
Tag string
Size int64
Type string
Name string
Expand All @@ -33,12 +36,15 @@ type Metatag struct {

// Manager handles replication
type Manager struct {
ctx context.Context
cancel context.CancelFunc
crdt *crdt.Datastore
contentChan chan cid.Cid
syncer Syncer
repo repo.Repo
ctx context.Context
cancel context.CancelFunc
crdt *crdt.Datastore
syncer Syncer
repo repo.Repo
syncMtx sync.Mutex

dlManager *taskmanager.TaskManager
download chan Metatag
}

// Syncer gets the file and finds file provider from the network
Expand Down Expand Up @@ -69,29 +75,22 @@ func New(
if err != nil {
return nil, err
}
contentChan := make(chan cid.Cid)
contentChan := make(chan Metatag)
crdtOpts := crdt.DefaultOptions()
crdtOpts.Logger = log
crdtOpts.RebroadcastInterval = broadcastInterval
crdtOpts.PutHook = func(k datastore.Key, v []byte) {
log.Debugf("Added: [%s] -> %s\n", k, string(v))
log.Debugf("CRDT Replication :: Added Key: [%s] -> Value: %s\n", k, string(v))
m := &Metatag{}
err := json.Unmarshal(v, m)
if err != nil {
log.Error(err.Error())
return
}
contentChan <- m.Hash
state := r.State().Add([]byte(k.Name()))
log.Debugf("New State: %d\n", state)
err = r.SetState()
if err != nil {
log.Errorf("SetState failed %s\n", err.Error())
}
r.Matrix().ContentDownloadStarted(m.Hash.String(), m.Size)
contentChan <- *m
}
crdtOpts.DeleteHook = func(k datastore.Key) {
log.Debugf("Removed: [%s]\n", k)
log.Debugf("CRDT Replication :: Removed: [%s]\n", k)
state := r.State().Add([]byte("removed " + k.Name()))
log.Debugf("New State: %d\n", state)
err = r.SetState()
Expand All @@ -105,17 +104,19 @@ func New(
}

return &Manager{
ctx: ctx,
crdt: crdtStore,
contentChan: contentChan,
syncer: syncer,
cancel: cancel,
repo: r,
ctx: ctx,
crdt: crdtStore,
download: contentChan,
syncer: syncer,
cancel: cancel,
repo: r,
dlManager: taskmanager.New(1, 100, time.Second*15),
}, nil
}

// Close the crdt store
func (m *Manager) Close() error {
m.dlManager.Stop(m.ctx)
m.cancel()
return m.crdt.Close()
}
Expand Down Expand Up @@ -199,6 +200,29 @@ func (m *Manager) GetAllCids() ([]cid.Cid, error) {
return cids, nil
}

// DownloadManagerStatus returns taskmanager status for handling downloads
func (m *Manager) DownloadManagerStatus() (res map[string]taskmanager.TaskStatus) {
return m.dlManager.TaskStatus()
}

// StartUnfinishedDownload starts unfinished downloads once peer comes back
func (m *Manager) StartUnfinishedDownload(pid peer.ID) {
cm := m.repo.Matrix().ContentMatrix
for _, v := range cm {
if v.DownloadFinishedAt == 0 {
for _, provider := range v.ProvidedBy {
if provider == pid {
meta, err := m.FindTag(v.Tag)
if err != nil {
continue
}
m.download <- *meta
}
}
}
}
}

// Put stores the object `value` named by `key`.
func (m *Manager) Put(key datastore.Key, v []byte) error {
return m.crdt.Put(key, v)
Expand Down Expand Up @@ -230,21 +254,73 @@ func (m *Manager) StartContentWatcher() {
select {
case <-m.ctx.Done():
return
case id := <-m.contentChan:
case meta := <-m.download:
id := meta.Hash
log.Debugf("got %s\n", id.String())
go func() {
providers := m.syncer.FindProviders(m.ctx, id)
for _, provider := range providers {
m.repo.Matrix().ContentAddProvider(id.String(), provider)
}
_, err := m.syncer.GetFile(m.ctx, id)
//_, err := m.syncer.GetFile(m.ctx, id)
//if err != nil {
// log.Errorf("replication sync failed for %s, Err : %s", id.String(), err.Error())
// return
//}
ctx, cancel := context.WithCancel(m.ctx)
t := newDownloaderTask(ctx, cancel, meta, m.syncer)
done, err := m.dlManager.Go(t)
if err != nil {
log.Errorf("replication sync failed for %s, Err : %s", id.String(), err.Error())
log.Errorf("content watcher: unable to start downloader task for %s : %s", meta.Name, err.Error())
return
}
m.repo.Matrix().ContentDownloadFinished(id.String())
m.repo.Matrix().ContentDownloadStarted(meta.Tag, id.String(), meta.Size)
select {
case <-ctx.Done():
return
case <-done:
m.repo.Matrix().ContentDownloadFinished(id.String())
m.syncMtx.Lock()
state := m.repo.State().Add([]byte(meta.Tag))
log.Debugf("New State: %d\n", state)
err = m.repo.SetState()
if err != nil {
log.Errorf("SetState failed %s\n", err.Error())
}
m.syncMtx.Unlock()
}
}()
}
}
}()
}

type DownloaderTask struct {
ctx context.Context
cancel context.CancelFunc
metatag Metatag
syncer Syncer
}

func newDownloaderTask(ctx context.Context, cancel context.CancelFunc, metatag Metatag, syncer Syncer) *DownloaderTask {
return &DownloaderTask{
ctx: ctx,
cancel: cancel,
metatag: metatag,
syncer: syncer,
}
}

func (d *DownloaderTask) Execute(ctx context.Context) error {
<-time.After(time.Second)
_, err := d.syncer.GetFile(d.ctx, d.metatag.Hash)
if err != nil {
log.Errorf("replication sync failed for %s, Err : %s", d.metatag.Hash.String(), err.Error())
return err
}
return nil
}

func (d *DownloaderTask) Name() string {
return d.metatag.Name
}
Loading

0 comments on commit 655b263

Please sign in to comment.