# Khinsider_Scrapper/Scrapper.py
import mimetypes
import time as _time
from abc import ABC
import re
from urllib.parse import unquote
import requests
from bs4 import BeautifulSoup
from Objects import ALBUM, ENTRY, OjectTypeNotSupported
from pathlib import Path
from os.path import basename, abspath, normpath, join as joinpath
from Tools import progress_bar
'''
Classes used to scrape pages and gather their information.
'''
class __default(object):
def __init__(self):
self._found_entries = []
    _found_entries: list[ENTRY | ALBUM]
source = None
domain_url = None
    supported_entries = [ENTRY, ALBUM]
    def generate_index(self, sleep=0, re_pattern=None, url=None):
        """
        Clears self._found_entries, then rebuilds it from the main index.
        :param url: optional URL to index instead of the full catalogue
        :param sleep: seconds to wait between requests, to avoid hammering the page
        :param re_pattern: regular expression used to filter the entries found
        """
        self._found_entries = []
        self.append_index(sleep=sleep, re_pattern=re_pattern, url=url)
    def _append_index(self, sleep=0, re_pattern=None, url=None):
        """
        Finds entries and stores them in self._found_entries.
        :param url: optional URL to index instead of the full catalogue
        :param sleep: seconds to wait between requests, to avoid hammering the page
        :param re_pattern: regular expression used to filter the entries found
        """
        raise NotImplementedError()

    def append_index(self, sleep=0, re_pattern=None, url=None):
        """
        Appends entries from the main index to self._found_entries.
        :param url: optional URL to index instead of the full catalogue
        :param sleep: seconds to wait between requests, to avoid hammering the page
        :param re_pattern: regular expression used to filter the entries found
        """
        self._append_index(sleep=sleep, re_pattern=re_pattern, url=url)
    # single underscore (not double) so subclasses can actually override this;
    # name mangling would otherwise pin the name to this base class
    def _scrap_index(self, url, re_pattern=None) -> None:
        """
        Scrapes the index page at the given URL and appends the entries found
        to self._found_entries.
        :param url: index page URL to scrape
        :param re_pattern: regular expression used to filter the entries found
        """
        raise NotImplementedError()
def _inspect_entry(self, obj: ENTRY):
raise NotImplementedError()
    def inspect(self, obj: ENTRY) -> None:
        """
        Checks the type of the object given.
        If valid: forwards the object to _inspect_entry.
        If not valid: raises OjectTypeNotSupported.
        """
        if type(obj) not in self.supported_entries:
            raise OjectTypeNotSupported()
        else:
            self._inspect_entry(obj)
    def _get_files(self, obj, get_images=True) -> list[str]:
        """
        Returns a list of file URLs for the entry given.
        :param obj: entry to collect files for
        :param get_images: whether to include image files as well
        :return: list of file URLs
        """
        raise NotImplementedError()

    def get_files(self, obj, images=True) -> list[str]:
        """
        Checks the type of the object given.
        If valid: forwards the object to _get_files.
        If not valid: raises OjectTypeNotSupported.
        """
        if type(obj) not in self.supported_entries:
            raise OjectTypeNotSupported()
        else:
            return self._get_files(obj, images)
    def download_album(self, album: ALBUM | ENTRY, deposit_folder: str):
        print(f"\t> {album.name}")
        # create the album folder (':' is not allowed in Windows paths)
        album_folder = normpath(abspath(joinpath(deposit_folder, album.name.replace(":", "_"))))
        # collect the file URLs and download each one
        # TODO: skip entries marked as not available
        files = self.get_files(album)
        for url in files:
            self.download_file(url=url, destination_folder=album_folder)
        print()
    @staticmethod
    def download_file(url: str, destination_folder: str, file_name: str = None) -> None:
        """Downloads the file and places it in the given destination folder."""
        url = unquote(url)
        file_name = file_name or basename(url)
        mime, encoding = mimetypes.guess_type(url)
        if mime:
            Path(destination_folder).mkdir(parents=True, exist_ok=True)
            _file_path: str = joinpath(destination_folder, file_name)
            with requests.get(url, stream=True) as response:
                with open(abspath(normpath(_file_path)), 'wb') as f:
                    total_length = response.headers.get('content-length')
                    if total_length is None:  # no content-length header
                        f.write(response.content)
                    else:
                        progress = 0
                        total_length = int(total_length)
                        for chunk in response.iter_content(chunk_size=1024):
                            progress += len(chunk)
                            f.write(chunk)
                            progress_bar(filename=file_name, total=total_length, current=progress)
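
    # Usage sketch for download_file; the URL and folder below are
    # hypothetical example values, not taken from the scraper itself:
    #
    #     Khinsider.download_file(
    #         url="https://downloads.khinsider.com/some/track.mp3",
    #         destination_folder="./downloads",
    #     )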
    @property
    def found_entries(self) -> list[ENTRY | ALBUM]:
        return self._found_entries

    @found_entries.setter
    def found_entries(self, x):
        # wrap single entries in a list
        if not isinstance(x, list):
            x = [x]
        self._found_entries = x
    def find_entry(self, keyword) -> list[ENTRY | ALBUM]:
        entry_list: list[ENTRY | ALBUM] = []

        def __string_cleanup(text: str):
            # lowercase and strip non-word characters before comparing
            text = text.lower()
            text = re.sub(r'[^\w]', '', text)
            return text

        for entry in self._found_entries:
            if __string_cleanup(keyword) in __string_cleanup(entry.name):
                entry_list.append(entry)
        return entry_list
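
    # Example sketch: find_entry() does a substring match on normalized
    # (lowercased, punctuation-stripped) names, so partial keywords work.
    # 'scraper' below is a hypothetical instance:
    #
    #     matches = scraper.find_entry("persona 5")
    #     for album in matches:
    #         print(album.name, album.url)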
    # single underscore (not double) so subclasses can actually override this;
    # name mangling would otherwise pin the lookup to this base class
    def _download_found(self, deposit_folder: str):
        raise NotImplementedError

    def download_found(self, deposit_folder: str):
        """
        Downloads all found albums into the given folder.
        """
        self._download_found(deposit_folder)
class Khinsider(__default, ABC):
    """
    This scraper only returns Albums.
    """
source = 'khinsider'
domain = "khinsider.com"
supported_entries = [ENTRY, ALBUM]
__domain_url = f"https://downloads.{domain}"
__base_index_url = f"{__domain_url}/game-soundtracks/browse"
    def _append_index(self, sleep=0, re_pattern=None, url=None):
        if url:
            self._scrap_index(url, re_pattern=re_pattern)
            _time.sleep(sleep)
        else:
            # the browse index is split into one page per leading character
            for char in "#ABCDEFGHIJKLMNOPQRSTUVWXYZ":
                self.append_index(sleep=sleep, re_pattern=re_pattern, url=f'{self.__base_index_url}/{char}')
    def _scrap_index(self, url: str, re_pattern=None) -> None:
        print(f"Scraping index from {url}")
        rq = requests.get(url)
        if rq.status_code != 200:
            raise Exception(f'Could not fetch index page ({rq.status_code}): {url}')
        else:
            soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
                name='p', attrs={'align': "left"})
            for link in soup.find_all('a'):
                # skip links without text content
                if not link.contents:
                    continue
                name = link.contents[0]
                # optionally filter entries by the given regular expression
                if re_pattern and not re.search(re_pattern, name):
                    continue
                _json: dict = {
                    'name': name,
                    'url': f'{self.__domain_url}{link.get("href")}',
                    'source': self.source,
                    'available': True
                }
                self._found_entries.append(ALBUM(**_json))
    def _album_from_url(self, url: str) -> ALBUM:
        # e.g. https://downloads.khinsider.com/game-soundtracks/album/persona-5-royal
        _json: dict = {
            'name': "",
            'url': url,
            'source': self.source,
            'available': False
        }
        rq = requests.get(url)
        if rq.status_code != 200:
            raise Exception('Album not available')
        else:
            soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
                name='p', attrs={'align': "left"})
            # pick the "Album name:" line and strip the label, keeping the
            # original casing of the name itself
            names = [line[len("Album name:"):].strip() for line in soup.text.splitlines()
                     if line.lower().startswith("album name:")]
            _json['name'] = names[0] if names else ""
            _json['available'] = True
        return ALBUM(**_json)
    def _inspect_entry(self, obj) -> None:
        if type(obj) is ALBUM:
            obj: ALBUM
            rq = requests.get(obj.url)
            if rq.status_code != 200:
                obj.available = False
                raise Exception('Album not available')
            else:
                obj.date_added = None
                soup = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent').find(
                    name='p', attrs={'align': "left"})
                lines = soup.text.splitlines()
                obj.number_of_tracks = \
                    [line.lower().replace("number of files: ", "").strip() for line in lines
                     if "number of files:" in line.lower()][0]
                obj.date_added = \
                    [line.lower().replace("date added: ", "").strip() for line in lines
                     if "date added: " in line.lower()][0]
                obj.available = True
    def _get_files(self, obj, get_images=True) -> list[str]:
        file_list: list[str] = []
        if type(obj) is ALBUM:
            obj: ALBUM
            rq = requests.get(obj.url)
            if rq.status_code != 200:
                obj.available = False
                raise Exception(f'Could not fetch album page ({rq.status_code}): {obj.url}')
            else:
                obj.date_added = None
                page_content = BeautifulSoup(rq.text, 'html.parser').find(name='div', id='pageContent')
                music_soup = page_content.find(name='table', id='songlist').find_all('tr')
                if get_images:
                    # album art links live in the first table of the page content
                    image_soup = page_content.find_next(name="table").find_all(name="a")
                    for img in image_soup:
                        file_list.append(img.get('href'))
                for element in music_soup:
                    td = element.find(name='td', attrs={'class': 'clickable-row'})
                    if td:
                        # each track row links to its own page, which holds the actual file links
                        _url = f"{self.__domain_url}{td.find('a').get('href')}"
                        rq2 = requests.get(_url)
                        if rq2.status_code != 200:
                            raise Exception(f'Could not fetch track page ({rq2.status_code}): {_url}')
                        else:
                            soup2 = [p.find('a') for p in
                                     BeautifulSoup(rq2.text, 'html.parser').find(
                                         name='div', id='pageContent').find_all('p')[2:]
                                     if p.find(name='a')]
                            for element2 in soup2:
                                file_list.append(element2.get("href"))
        return file_list
    def _download_found(self, deposit_folder: str):
        for entry in self.found_entries:
            self.download_album(entry, deposit_folder)
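

if __name__ == '__main__':
    # Minimal usage sketch; the index page, keyword, and download folder are
    # example values, not part of the scraper itself.
    scraper = Khinsider()
    # index a single letter page of the browse catalogue to keep the example quick
    scraper.generate_index(sleep=1, url='https://downloads.khinsider.com/game-soundtracks/browse/P')
    for album in scraper.find_entry('persona'):
        scraper.download_album(album, deposit_folder='./downloads')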