Skip to content

Commit

Permalink
fix: bilibili source not working
Browse files Browse the repository at this point in the history
  • Loading branch information
Mantouisyummy committed May 28, 2024
1 parent a1a8611 commit 5cd96ac
Showing 1 changed file with 18 additions and 85 deletions.
103 changes: 18 additions & 85 deletions lava/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from os import getenv
from typing import Union, Tuple, Optional

import requests
from bs4 import BeautifulSoup
from lavalink import Source, Client, LoadResult, LoadType, PlaylistInfo, DeferredAudioTrack
from spotipy import Spotify, SpotifyClientCredentials
from yt_dlp import YoutubeDL
Expand Down Expand Up @@ -67,11 +65,11 @@ class SpotifySource(BaseSource):
def __init__(self):
super().__init__()


self.priority = 5

spotify_client_id = getenv("SPOTIFY_CLIENT_ID")
spotify_client_secret = getenv("SPOTIFY_CLIENT_SECRET")

credentials = SpotifyClientCredentials(
client_id=spotify_client_id,
client_secret=spotify_client_secret
Expand Down Expand Up @@ -134,7 +132,6 @@ def __load_track(self, url: str) -> Union[SpotifyAudioTrack, None]:
)
return None


def __load_playlist(self, url: str) -> Tuple[list[SpotifyAudioTrack], Union[PlaylistInfo, None]]:
"""
Get tracks in a playlist with given url from spotify, None if not found
Expand Down Expand Up @@ -164,7 +161,8 @@ def __load_playlist(self, url: str) -> Tuple[list[SpotifyAudioTrack], Union[Play
'isStream': False,
'title': track['track']['name'],
'uri': f"https://open.spotify.com/track/{track['track']['id']}",
'artworkUrl': track['track']['images'][0]['url']
'artworkUrl': track['track']['album']['images'][0]['url']
if track['track']['album'].get('images') else None
},
requester=0
)
Expand Down Expand Up @@ -202,7 +200,7 @@ def __load_album(self, url: str) -> Tuple[list[SpotifyAudioTrack], Union[Playlis
'isStream': False,
'title': track['name'],
'uri': f"https://open.spotify.com/track/{track['id']}",
'artworkUrl': album['images'][0]['url']
'artworkUrl': album['images'][0]['url'] if album.get('images') else None
},
requester=0
)
Expand Down Expand Up @@ -266,106 +264,46 @@ def __init__(self):
super().__init__()

self.priority = 5
self.ytdl = YoutubeDL(
{"format": "bestaudio"}
)

def check_query(self, query: str) -> bool:
return query.startswith('https://www.bilibili.com/video/') or query.startswith('https://b23.tv/')

async def load_item(self, client: Client, query: str) -> Optional[LoadResult]:
audio_url, title, author = self.get_audio(query)
audio_url, title, author, thumbnail = self.get_audio(query)

track = (await client.get_tracks(audio_url, check_local=False)).tracks[0]

track.title = title
track.author = f'{author} / [Bilibili]({query})'
track.artwork_url = thumbnail

return LoadResult(
load_type=LoadType.TRACK,
tracks=[track],
playlist_info=None
)

@staticmethod
def get_video_info(bvid: str) -> Tuple[str, str]:
"""
Gets video info from a Bilibili video bvid
:param bvid: Bilibili video bvid
:return: Tuple of video cid, video session
"""
headers = {
'referer': 'https://www.bilibili.com/',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36'
}
video_index_url = f"https://www.bilibili.com/video/{bvid}"
resp = requests.get(video_index_url, headers=headers).text
cid = re.findall('"cid":(\\d+),', resp)[0]
session = re.findall('"session":"(.*?)"', resp)[0]
return cid, session

def get_audio_url(self, url: str):
"""
Gets audio URL from a Bilibili video URL
:param url: Bilibili video URL
:return: audio URL
"""
headers = {
'referer': 'https://www.bilibili.com/',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36'
}

bvid = re.search(r"/video/([^/?]+)", url).group(1)

cid, session = self.get_video_info(bvid)

play_url = 'https://api.bilibili.com/x/player/playurl'

params = {
'cid': cid,
'qn': '2',
'type': '',
'otype': 'json',
'fourk': '1',
'bvid': bvid,
'fnver': '0',
'fnval': '976',
'session': session,
}

for _ in range(20):
json_data = requests.get(url=play_url, params=params, headers=headers).json()
audio_url = json_data['data']['dash']['audio'][0]['baseUrl']
if audio_url.startswith("https://upos-hz-mirrorakam.akamaized.net/"):
return audio_url

def get_audio(self, url: str) -> Tuple[str, str, str]:
def get_audio(self, url: str) -> Tuple[str, str, str, str]:
"""
Gets audio from a Bilibili video URL
:param url: Bilibili video URL
:return: Tuple of audio URL, video title, video author.
:return: Tuple of audio URL, video title, video author, video thi,
"""
headers = {
'Connection': 'Keep-Alive',
'Accept-Language': 'en-US,en;q=0.8,zh-Hans-CN;q=0.5,zh-Hans;q=0.3',
'Accept': 'text/html, application/xhtml+xml, */*',
'referer': 'https://www.bilibili.com',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:99.0) Gecko/20100101 Firefox/99.0',
}
info = self.ytdl.extract_info(url, download=False)

video_html = requests.get(url, headers=headers)
audio_url = info['formats'][1]['url']

values = video_html.text
author = info.get('uploader', None)

text = BeautifulSoup(values, features='html.parser')
thumbnail = info.get('thumbnail', None)

title = text.find('title').contents[0].replace(' ', ',').replace('/', ',')
title = info.get('fulltitle', None)

author = text.select_one('div.up-detail-top a').text.replace("\n", "")

audio_url = self.get_audio_url(url)

return audio_url, title, author
return audio_url, title, author, thumbnail


class YTDLSource(BaseSource):
Expand Down Expand Up @@ -417,15 +355,12 @@ async def load_item(self, client: Client, query: str) -> Optional[LoadResult]:
)



class SourceManager(Source):
def __init__(self):
super().__init__(name='LavaSourceManager')
super().__init__(name='LavaSourceManager')

self.sources: list[BaseSource] = []

self.logger = getLogger('lava.sources')
self.logger = getLogger('lava.sources')

self.initial_sources()
Expand All @@ -448,7 +383,6 @@ async def load_item(self, client: Client, query: str) -> Optional[LoadResult]:

if not source.check_query(query):
self.logger.debug("Source %s does not match query %s, skipping...", source.__class__.__name__, query)
self.logger.debug("Source %s does not match query %s, skipping...", source.__class__.__name__, query)

continue

Expand All @@ -457,5 +391,4 @@ async def load_item(self, client: Client, query: str) -> Optional[LoadResult]:
return await source.load_item(client, query)

self.logger.info("No sources matched query %s, returning None", query)
return None

return None

0 comments on commit 5cd96ac

Please sign in to comment.