Skip to content

Commit

Permalink
Merge pull request #164 from jkawamoto/acquireCtx
Browse files Browse the repository at this point in the history
Context based cancellation of acquiring a session
  • Loading branch information
lukechampine authored Apr 13, 2021
2 parents dc4b3df + 1060fe0 commit 574e553
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions renter/renterutil/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,33 @@ import (
"lukechampine.com/us/renterhost"
)

func acquireCtx(ctx context.Context, hosts *HostSet, hostKey hostdb.HostPublicKey, block bool) (*proto.Session, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
var (
sess *proto.Session
err error
)
done := make(chan struct{})
go func() {
sess, err = hosts.tryAcquire(hostKey)
if err == errHostAcquired && block {
sess, err = hosts.acquire(hostKey)
}
if sess != nil && ctx.Err() != nil {
hosts.release(hostKey)
}
close(done)
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-done:
return sess, err
}
}

func uploadCtx(ctx context.Context, sess *proto.Session, shard *[renterhost.SectorSize]byte) (root crypto.Hash, err error) {
if ctx.Err() != nil {
return crypto.Hash{}, ctx.Err()
Expand Down Expand Up @@ -204,10 +231,7 @@ func (pcu ParallelChunkUploader) UploadChunk(ctx context.Context, db MetaDB, c D
go func() {
defer wg.Done()
for req := range reqChan {
sess, err := pcu.Hosts.tryAcquire(req.hostKey)
if err == errHostAcquired && req.block {
sess, err = pcu.Hosts.acquire(req.hostKey)
}
sess, err := acquireCtx(ctx, pcu.Hosts, req.hostKey, req.block)
if err != nil {
respChan <- resp{req, 0, err}
continue
Expand Down Expand Up @@ -451,10 +475,7 @@ func (ocu OverdriveChunkUploader) UploadChunk(ctx context.Context, db MetaDB, c
go func() {
defer wg.Done()
for req := range reqChan {
sess, err := ocu.Hosts.tryAcquire(req.hostKey)
if err == errHostAcquired && req.block {
sess, err = ocu.Hosts.acquire(req.hostKey)
}
sess, err := acquireCtx(ctx, ocu.Hosts, req.hostKey, req.block)
if err != nil {
respChan <- resp{req, crypto.Hash{}, err}
continue
Expand Down Expand Up @@ -656,10 +677,7 @@ func (pcd ParallelChunkDownloader) DownloadChunk(ctx context.Context, db MetaDB,
continue
}

sess, err := pcd.Hosts.tryAcquire(shard.HostKey)
if err == errHostAcquired && req.block {
sess, err = pcd.Hosts.acquire(shard.HostKey)
}
sess, err := acquireCtx(ctx, pcd.Hosts, shard.HostKey, req.block)
if err != nil {
respChan <- resp{req.shardIndex, &HostError{shard.HostKey, err}}
continue
Expand Down Expand Up @@ -764,10 +782,7 @@ func (ocd OverdriveChunkDownloader) DownloadChunk(ctx context.Context, db MetaDB
respChan <- resp{req, nil, &HostError{shard.HostKey, err}}
continue
}
sess, err := ocd.Hosts.tryAcquire(shard.HostKey)
if err == errHostAcquired && req.block {
sess, err = ocd.Hosts.acquire(shard.HostKey)
}
sess, err := acquireCtx(ctx, ocd.Hosts, shard.HostKey, req.block)
if err != nil {
respChan <- resp{req, nil, &HostError{shard.HostKey, err}}
continue
Expand Down

0 comments on commit 574e553

Please sign in to comment.