From b7a256702a24bcd15c54616a3a7591cad56749c8 Mon Sep 17 00:00:00 2001 From: Marco Vidonis <31407403+marcovidonis@users.noreply.github.com> Date: Fri, 9 Feb 2024 17:01:03 +0000 Subject: [PATCH 1/4] handle connection to torrent peer status update messages * basic observer framework * connects to a valid tracker * added observer channel for announce status * set up Peer Connection status Observers * add PeerConn test: connection established * added Observers factory method * Added Event to AnnounceStatus, with embedded TrackerStatus * state updates must be non-blocking * add unit tests on PeerConn Observer status reading * add test and debug log on dropped connection * add PeerID check to test --------- Co-authored-by: Parker Whittle --- client-peerconn_test.go | 231 +++++++++++++++++++++++++++++++++++ client-tracker_test.go | 166 +++++++++++++++++++++++++ client.go | 27 +++- config.go | 20 +++ peerconn.go | 19 +++ testing.go | 11 ++ torrent.go | 14 +++ webtorrent/tracker-client.go | 64 ++++++++++ wstracker.go | 4 +- 9 files changed, 554 insertions(+), 2 deletions(-) create mode 100644 client-peerconn_test.go create mode 100644 client-tracker_test.go diff --git a/client-peerconn_test.go b/client-peerconn_test.go new file mode 100644 index 0000000000..9e114b4edc --- /dev/null +++ b/client-peerconn_test.go @@ -0,0 +1,231 @@ +package torrent + +import ( + "io" + "os" + "testing" + "testing/iotest" + "time" + + "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/types" + "github.com/frankban/quicktest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestPeerConnObserverReadStatusOk(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = &Observers{ + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } + + c, _ := NewClient(cfg) + defer c.Close() + + go func() { + cfg.Observers.Peers.PeerStatus <- PeerStatus{ + Ok: true, + } + }() + + status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.True(t, status.Ok) + require.Equal(t, "", status.Err) +} + +func TestPeerConnObserverReadStatusErr(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = &Observers{ + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } + + c, _ := NewClient(cfg) + defer c.Close() + + go func() { + cfg.Observers.Peers.PeerStatus <- PeerStatus{ + Err: "test error", + } + }() + + status := readChannelTimeout(t, cfg.Observers.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.False(t, status.Ok) + require.Equal(t, status.Err, "test error") +} + +func TestPeerConnEstablished(t *testing.T) { + obs := NewClientObservers() + ps := testClientTransferParams{ + ConfigureSeeder: ConfigureClient{ + Config: func(cfg *ClientConfig) { + cfg.PeerID = "12345123451234512345" + }, + }, + ConfigureLeecher: ConfigureClient{ + Config: func(cfg *ClientConfig) { + // TODO one of UTP or TCP is needed for the transfer + // Does this mean we're not doing webtorrent? TBC + // cfg.DisableUTP = true + cfg.DisableTCP = true + cfg.Debug = false + cfg.DisableTrackers = true + cfg.EstablishedConnsPerTorrent = 1 + cfg.Observers = obs + }, + }, + } + + go testClientTransfer(t, ps) + + status := readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + var expectedPeerId types.PeerID + missinggo.CopyExact(&expectedPeerId, "12345123451234512345") + require.Equal(t, expectedPeerId, status.Id) + require.True(t, status.Ok) + require.Equal(t, "", status.Err) + + // Peer conn is dropped after transfer is finished. This is the next update we receive. + status = readChannelTimeout(t, obs.Peers.PeerStatus, 500*time.Millisecond).(PeerStatus) + require.Equal(t, expectedPeerId, status.Id) + require.False(t, status.Ok) + require.Equal(t, "", status.Err) +} + +type ConfigureClient struct { + Config func(cfg *ClientConfig) + Client func(cl *Client) +} + +type testClientTransferParams struct { + SeederUploadRateLimiter *rate.Limiter + LeecherDownloadRateLimiter *rate.Limiter + ConfigureSeeder ConfigureClient + ConfigureLeecher ConfigureClient + + LeecherStartsWithoutMetadata bool +} + +// Simplified version of testClientTransfer found in test/leecher-storage.go. +// Could not import and reuse that function due to circular dependencies between modules. +func testClientTransfer(t *testing.T, ps testClientTransferParams) { + greetingTempDir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(greetingTempDir) + // Create seeder and a Torrent. + cfg := TestingConfig(t) + cfg.Seed = true + // Some test instances don't like this being on, even when there's no cache involved. + cfg.DropMutuallyCompletePeers = false + if ps.SeederUploadRateLimiter != nil { + cfg.UploadRateLimiter = ps.SeederUploadRateLimiter + } + cfg.DataDir = greetingTempDir + if ps.ConfigureSeeder.Config != nil { + ps.ConfigureSeeder.Config(cfg) + } + seeder, err := NewClient(cfg) + require.NoError(t, err) + if ps.ConfigureSeeder.Client != nil { + ps.ConfigureSeeder.Client(seeder) + } + seederTorrent, _, _ := seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + defer seeder.Close() + <-seederTorrent.Complete().On() + + // Create leecher and a Torrent. + leecherDataDir := t.TempDir() + cfg = TestingConfig(t) + // See the seeder client config comment. + cfg.DropMutuallyCompletePeers = false + cfg.DataDir = leecherDataDir + if ps.LeecherDownloadRateLimiter != nil { + cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter + } + cfg.Seed = false + if ps.ConfigureLeecher.Config != nil { + ps.ConfigureLeecher.Config(cfg) + } + leecher, err := NewClient(cfg) + require.NoError(t, err) + defer leecher.Close() + if ps.ConfigureLeecher.Client != nil { + ps.ConfigureLeecher.Client(leecher) + } + leecherTorrent, new, err := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + if ps.LeecherStartsWithoutMetadata { + ret.InfoBytes = nil + } + return + }()) + require.NoError(t, err) + assert.False(t, leecherTorrent.Complete().Bool()) + assert.True(t, new) + + added := leecherTorrent.AddClientPeer(seeder) + assert.False(t, leecherTorrent.Seeding()) + // The leecher will use peers immediately if it doesn't have the metadata. Otherwise, they + // should be sitting idle until we demand data. + if !ps.LeecherStartsWithoutMetadata { + assert.EqualValues(t, added, leecherTorrent.Stats().PendingPeers) + } + if ps.LeecherStartsWithoutMetadata { + <-leecherTorrent.GotInfo() + } + r := leecherTorrent.NewReader() + defer r.Close() + go leecherTorrent.SetInfoBytes(mi.InfoBytes) + + assertReadAllGreeting(t, r) + <-leecherTorrent.Complete().On() + assert.NotEmpty(t, seederTorrent.PeerConns()) + leecherPeerConns := leecherTorrent.PeerConns() + if cfg.DropMutuallyCompletePeers { + // I don't think we can assume it will be empty already, due to timing. + // assert.Empty(t, leecherPeerConns) + } else { + assert.NotEmpty(t, leecherPeerConns) + } + foundSeeder := false + for _, pc := range leecherPeerConns { + completed := pc.PeerPieces().GetCardinality() + t.Logf("peer conn %v has %v completed pieces", pc, completed) + if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) { + foundSeeder = true + } + } + if !foundSeeder { + t.Errorf("didn't find seeder amongst leecher peer conns") + } + + seederStats := seederTorrent.Stats() + assert.True(t, 13 <= seederStats.BytesWrittenData.Int64()) + assert.True(t, 8 <= seederStats.ChunksWritten.Int64()) + + leecherStats := leecherTorrent.Stats() + assert.True(t, 13 <= leecherStats.BytesReadData.Int64()) + assert.True(t, 8 <= leecherStats.ChunksRead.Int64()) + + // Try reading through again for the cases where the torrent data size + // exceeds the size of the cache. + assertReadAllGreeting(t, r) +} + +func assertReadAllGreeting(t *testing.T, r io.ReadSeeker) { + pos, err := r.Seek(0, io.SeekStart) + assert.NoError(t, err) + assert.EqualValues(t, 0, pos) + quicktest.Check(t, iotest.TestReader(r, []byte(testutil.GreetingFileContents)), quicktest.IsNil) +} diff --git a/client-tracker_test.go b/client-tracker_test.go new file mode 100644 index 0000000000..a75e861a25 --- /dev/null +++ b/client-tracker_test.go @@ -0,0 +1,166 @@ +package torrent + +import ( + "errors" + "github.com/anacrolix/torrent/internal/testutil" + "github.com/anacrolix/torrent/tracker" + "github.com/anacrolix/torrent/webtorrent" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "net" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" +) + +func TestClientInvalidTracker(t *testing.T) { + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {"ws://test.invalid:4242"}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus) + require.Equal(t, "ws://test.invalid:4242", status.Url) + var expected *net.OpError + require.ErrorAs(t, expected, &status.Err) + + to.Drop() +} + +var upgrader = websocket.Upgrader{} + +func testtracker(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer c.Close() + for { + _, _, err := c.ReadMessage() + if err != nil { + break + } + //err = c.WriteMessage(mt, message) + //if err != nil { + // break + //} + } +} + +func TestClientValidTrackerConn(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.ConnStatus, 500*time.Millisecond).(webtorrent.TrackerStatus) + require.Equal(t, trackerUrl, status.Url) + require.True(t, status.Ok) + require.Nil(t, status.Err) + + to.Drop() +} + +func TestClientAnnounceFailure(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + cl.websocketTrackers.GetAnnounceRequest = func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) { + return tracker.AnnounceRequest{}, errors.New("test error") + } + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus) + require.Equal(t, trackerUrl, status.Url) + require.False(t, status.Ok) + require.EqualError(t, status.Err, "test error") + require.Empty(t, status.Event) + + to.Drop() +} + +func TestClientAnnounceSuccess(t *testing.T) { + s, trackerUrl := startTestTracker() + defer s.Close() + + cfg := TestingConfig(t) + cfg.DisableTrackers = false + cfg.Observers = NewClientObservers() + + cl, err := NewClient(cfg) + require.NoError(t, err) + defer cl.Close() + + dir, mi := testutil.GreetingTestTorrent() + defer os.RemoveAll(dir) + + mi.AnnounceList = [][]string{ + {trackerUrl}, + } + + to, err := cl.AddTorrent(mi) + require.NoError(t, err) + + status := readChannelTimeout(t, cfg.Observers.Trackers.AnnounceStatus, 500*time.Millisecond).(webtorrent.AnnounceStatus) + require.Equal(t, trackerUrl, status.Url) + require.True(t, status.Ok) + require.Nil(t, status.Err) + require.Equal(t, "started", status.Event) + + to.Drop() +} + +func startTestTracker() (*httptest.Server, string) { + s := httptest.NewServer(http.HandlerFunc(testtracker)) + trackerUrl := "ws" + strings.TrimPrefix(s.URL, "http") + return s, trackerUrl +} diff --git a/client.go b/client.go index acf5732ec1..1fd4e307ca 100644 --- a/client.go +++ b/client.go @@ -18,6 +18,8 @@ import ( "strconv" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -48,7 +50,6 @@ import ( "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/types/infohash" infohash_v2 "github.com/anacrolix/torrent/types/infohash-v2" - "github.com/anacrolix/torrent/webtorrent" ) // Clients contain zero or more Torrents. A Client manages a blocklist, the @@ -313,7 +314,12 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } } + var obs *webtorrent.TrackerObserver + if cl.config.Observers != nil { + obs = &cl.config.Observers.Trackers + } cl.websocketTrackers = websocketTrackers{ + obs: obs, PeerId: cl.peerID, Logger: cl.logger.WithNames("websocketTrackers"), GetAnnounceRequest: func( @@ -742,6 +748,11 @@ func doProtocolHandshakeOnDialResult( cl := t.cl nc := dr.Conn addrIpPort, _ := tryIpPortFromNetAddr(addr) + + var obs *PeerObserver + if t.cl.config.Observers != nil { + obs = &t.cl.config.Observers.Peers + } c, err = cl.initiateProtocolHandshakes( context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ @@ -751,6 +762,7 @@ func doProtocolHandshakeOnDialResult( localPublicAddr: cl.publicAddr(addrIpPort.IP), network: dr.Dialer.DialerNetwork(), connString: regularNetConnPeerConnConnString(nc), + obs: obs, }) if err != nil { nc.Close() @@ -1129,8 +1141,19 @@ func (t *Torrent) runHandshookConn(pc *PeerConn) error { pc.startMessageWriter() pc.sendInitialMessages() pc.initUpdateRequestsTimer() + + pc.UpdatePeerConnStatus(PeerStatus{ + Id: pc.PeerID, + Ok: true, + }) + err := pc.mainReadLoop() if err != nil { + pc.UpdatePeerConnStatus(PeerStatus{ + Id: pc.PeerID, + Ok: false, + Err: fmt.Sprintf("%s", err), + }) return fmt.Errorf("main read loop: %w", err) } return nil @@ -1639,6 +1662,7 @@ type newConnectionOpts struct { localPublicAddr peerLocalPublicAddr network string connString string + obs *PeerObserver } func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) { @@ -1659,6 +1683,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon }, connString: opts.connString, conn: nc, + Observers: opts.obs, } c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn c.initRequestState() diff --git a/config.go b/config.go index 76979463cc..7fdd89f450 100644 --- a/config.go +++ b/config.go @@ -7,6 +7,8 @@ import ( "net/url" "time" + "github.com/anacrolix/torrent/webtorrent" + "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/log" @@ -20,6 +22,23 @@ import ( "github.com/anacrolix/torrent/version" ) +type Observers struct { + Trackers webtorrent.TrackerObserver + Peers PeerObserver +} + +func NewClientObservers() *Observers { + return &Observers{ + Trackers: webtorrent.TrackerObserver{ + ConnStatus: make(chan webtorrent.TrackerStatus), + AnnounceStatus: make(chan webtorrent.AnnounceStatus), + }, + Peers: PeerObserver{ + PeerStatus: make(chan PeerStatus), + }, + } +} + // Contains config elements that are exclusive to tracker handling. There may be other fields in // ClientConfig that are also relevant. type ClientTrackerConfig struct { @@ -32,6 +51,7 @@ type ClientTrackerConfig struct { // Takes a tracker's hostname and requests DNS A and AAAA records. // Used in case DNS lookups require a special setup (i.e., dns-over-https) LookupTrackerIp func(*url.URL) ([]net.IP, error) + Observers *Observers } type ClientDhtConfig struct { diff --git a/peerconn.go b/peerconn.go index 65800dd695..0c93814d6e 100644 --- a/peerconn.go +++ b/peerconn.go @@ -32,6 +32,16 @@ import ( utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) +type PeerStatus struct { + Id PeerID + Ok bool + Err string // see https://github.com/golang/go/issues/5161 +} + +type PeerObserver struct { + PeerStatus chan PeerStatus +} + // Maintains the state of a BitTorrent-protocol based connection with a peer. type PeerConn struct { Peer @@ -90,6 +100,7 @@ type PeerConn struct { // we can verify all the pieces for a file when they're all arrived before submitting them to // the torrent. receivedHashPieces map[[32]byte][][32]byte + Observers *PeerObserver } func (cn *PeerConn) pexStatus() string { @@ -1432,3 +1443,11 @@ func hashRequestFromMessage(m pp.Message) hashRequest { proofLayers: m.ProofLayers, } } +func (c *PeerConn) UpdatePeerConnStatus(status PeerStatus) { + if c.Observers != nil { + select { + case c.Observers.PeerStatus <- status: + default: + } + } +} diff --git a/testing.go b/testing.go index a490927308..1ab74c7a83 100644 --- a/testing.go +++ b/testing.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/stretchr/testify/require" "testing" "time" @@ -35,3 +36,13 @@ func TestingConfig(t testing.TB) *ClientConfig { //}) return cfg } + +func readChannelTimeout[T any](t *testing.T, channel chan T, duration time.Duration) interface{} { + select { + case s := <-channel: + return s + case <-time.After(duration): + require.Fail(t, "Timeout reading observer channel.") + } + return nil +} diff --git a/torrent.go b/torrent.go index f8115f6562..3cd27e23f0 100644 --- a/torrent.go +++ b/torrent.go @@ -1805,6 +1805,13 @@ func (t *Torrent) assertPendingRequests() { func (t *Torrent) dropConnection(c *PeerConn) { t.cl.event.Broadcast() c.close() + + c.UpdatePeerConnStatus(PeerStatus{ + Id: c.PeerID, + Ok: false, + }) + t.logger.WithDefaultLevel(log.Debug).Printf("dropping connection to %+q, sent peerconn update", c.PeerID) + if t.deletePeerConn(c) { t.openNewConns() } @@ -1865,6 +1872,12 @@ func (t *Torrent) onWebRtcConn( return } localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr()) + + var obs *PeerObserver + if t.cl.config.Observers != nil { + obs = &t.cl.config.Observers.Peers + } + pc, err := t.cl.initiateProtocolHandshakes( context.Background(), netConn, @@ -1876,6 +1889,7 @@ func (t *Torrent) onWebRtcConn( localPublicAddr: localAddrIpPort, network: webrtcNetwork, connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)), + obs: obs, }, ) if err != nil { diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index 945097f30d..70f6d900aa 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/json" "fmt" + "github.com/anacrolix/torrent/types/infohash" "net/http" "sync" "time" @@ -19,6 +20,23 @@ import ( "github.com/anacrolix/torrent/tracker" ) +type TrackerStatus struct { + Url string `json:"url"` + Ok bool `json:"ok"` + Err error `json:"err"` +} + +type AnnounceStatus struct { + TrackerStatus + Event string `json:"event"` + InfoHash string `json:"info_hash"` +} + +type TrackerObserver struct { + ConnStatus chan TrackerStatus + AnnounceStatus chan AnnounceStatus +} + type TrackerClientStats struct { Dials int64 ConvertedInboundConns int64 @@ -33,6 +51,7 @@ type TrackerClient struct { OnConn onDataChannelOpen Logger log.Logger Dialer *websocket.Dialer + Observers *TrackerObserver mu sync.Mutex cond sync.Cond @@ -100,6 +119,7 @@ func (tc *TrackerClient) doWebsocket() error { c, _, err := tc.Dialer.Dial(tc.Url, header) if err != nil { + tc.updateTrackerConnStatus(TrackerStatus{tc.Url, false, err}) return fmt.Errorf("dialing tracker: %w", err) } defer c.Close() @@ -126,6 +146,7 @@ func (tc *TrackerClient) doWebsocket() error { } } }() + tc.updateTrackerConnStatus(TrackerStatus{tc.Url, true, nil}) err = tc.trackerReadLoop(tc.wsConn) close(closeChan) tc.mu.Lock() @@ -134,6 +155,24 @@ func (tc *TrackerClient) doWebsocket() error { return err } +func (tc *TrackerClient) updateTrackerConnStatus(status TrackerStatus) { + if tc.Observers != nil { + select { + case tc.Observers.ConnStatus <- status: + default: + } + } +} + +func (tc *TrackerClient) updateTrackerAnnounceStatus(status AnnounceStatus) { + if tc.Observers != nil { + select { + case tc.Observers.AnnounceStatus <- status: + default: + } + } +} + // Finishes initialization and spawns the run routine, calling onStop when it completes with the // result. We don't let the caller just spawn the runner directly, since then we can race against // .Close to finish initialization. @@ -262,6 +301,17 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error { request, err := tc.GetAnnounceRequest(event, infoHash) + if err != nil { + tc.updateTrackerAnnounceStatus(AnnounceStatus{ + TrackerStatus: TrackerStatus{ + Url: tc.Url, + Ok: false, + Err: err, + }, + Event: "", + InfoHash: infohash.HashBytes(infoHash[:]).HexString(), + }) + } if err != nil { return fmt.Errorf("getting announce parameters: %w", err) } @@ -283,6 +333,16 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte }) } + announceStatus := AnnounceStatus{ + TrackerStatus: TrackerStatus{ + Url: tc.Url, + Ok: true, + Err: nil, + }, + Event: req.Event, + InfoHash: binaryToJsonString(infoHash[:]), + } + data, err := json.Marshal(req) if err != nil { return fmt.Errorf("marshalling request: %w", err) @@ -292,8 +352,12 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte defer tc.mu.Unlock() err = tc.writeMessage(data) if err != nil { + announceStatus.Ok = false + announceStatus.Err = err + tc.updateTrackerAnnounceStatus(announceStatus) return fmt.Errorf("write AnnounceRequest: %w", err) } + tc.updateTrackerAnnounceStatus(announceStatus) for _, offer := range offers { g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue) } diff --git a/wstracker.go b/wstracker.go index 7f53987bc9..cf04dcf8e6 100644 --- a/wstracker.go +++ b/wstracker.go @@ -3,6 +3,7 @@ package torrent import ( "context" "fmt" + "github.com/anacrolix/torrent/webtorrent" "net" netHttp "net/http" "net/url" @@ -15,7 +16,6 @@ import ( "github.com/anacrolix/torrent/tracker" httpTracker "github.com/anacrolix/torrent/tracker/http" - "github.com/anacrolix/torrent/webtorrent" ) type websocketTrackerStatus struct { @@ -46,6 +46,7 @@ type websocketTrackers struct { OnConn func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext) mu sync.Mutex clients map[string]*refCountedWebtorrentTrackerClient + obs *webtorrent.TrackerObserver Proxy httpTracker.ProxyFunc DialContext func(ctx context.Context, network, addr string) (net.Conn, error) WebsocketTrackerHttpHeader func() netHttp.Header @@ -64,6 +65,7 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra } value = &refCountedWebtorrentTrackerClient{ TrackerClient: webtorrent.TrackerClient{ + Observers: me.obs, Dialer: dialer, Url: url, GetAnnounceRequest: me.GetAnnounceRequest, From ca2e0dc3e0c5bccc81932e85ed16f8e914bb7e12 Mon Sep 17 00:00:00 2001 From: marcovidonis Date: Tue, 15 Oct 2024 11:41:16 +0100 Subject: [PATCH 2/4] do not support webtorrent/transport_test on wasm --- webtorrent/transport_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/webtorrent/transport_test.go b/webtorrent/transport_test.go index dcb170a061..753339d303 100644 --- a/webtorrent/transport_test.go +++ b/webtorrent/transport_test.go @@ -1,3 +1,6 @@ +//go:build !js +// +build !js + package webtorrent import ( From 8777cb7f9e19cc739eed46884dcebef49486e1d8 Mon Sep 17 00:00:00 2001 From: Marco Vidonis <31407403+marcovidonis@users.noreply.github.com> Date: Mon, 25 Mar 2024 16:32:34 +0000 Subject: [PATCH 3/4] make AnnounceStatus InfoHash into a HexString --- webtorrent/tracker-client.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index 70f6d900aa..fe218b2c74 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -5,11 +5,12 @@ import ( "crypto/rand" "encoding/json" "fmt" - "github.com/anacrolix/torrent/types/infohash" "net/http" "sync" "time" + "github.com/anacrolix/torrent/types/infohash" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/gorilla/websocket" @@ -309,7 +310,7 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte Err: err, }, Event: "", - InfoHash: infohash.HashBytes(infoHash[:]).HexString(), + InfoHash: infohash.T(infoHash).HexString(), }) } if err != nil { @@ -340,7 +341,7 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte Err: nil, }, Event: req.Event, - InfoHash: binaryToJsonString(infoHash[:]), + InfoHash: infohash.T(infoHash).HexString(), } data, err := json.Marshal(req) From f165ff262137a1dd6ea4fe3a2ca2b1909de61aa2 Mon Sep 17 00:00:00 2001 From: marcovidonis Date: Tue, 15 Oct 2024 11:50:09 +0100 Subject: [PATCH 4/4] replace MakeMapIfNilAndSet --- webtorrent/tracker-client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index fe218b2c74..00886ac00b 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -359,8 +359,9 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte return fmt.Errorf("write AnnounceRequest: %w", err) } tc.updateTrackerAnnounceStatus(announceStatus) + g.MakeMapIfNil(&tc.outboundOffers) for _, offer := range offers { - g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue) + g.MapInsert(tc.outboundOffers, offer.offerId, offer.outboundOfferValue) } return nil }