import re
import hashlib
from typing import Optional, Tuple, List
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from tempfile import NamedTemporaryFile
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed

from requests import Session
from requests.adapters import Retry
from requests.cookies import create_cookie
from bs4 import BeautifulSoup, Tag
# NOTE: cgi is deprecated since Python 3.11 and removed in 3.13;
# parse_header still works on 3.12 and below.
from cgi import parse_header

from adapter import TimeoutHTTPAdapter


@dataclass
class Post:
    boardId: str
    postId: int
    authorId: Optional[str] = None
    authorName: Optional[str] = None
    category: Optional[str] = None
    title: Optional[str] = None
    body: Optional[Tag] = None
    created_at: Optional[datetime] = None


class Scraper(Session):
    # A requests.Session that scrapes the dcinside mobile site with a
    # bounded number of concurrent requests.
    def __init__(self, concurrency: int = 5):
        super().__init__()
        self.semaphore = Semaphore(concurrency)
        # The mobile site wants a mobile-looking User-Agent and a Referer.
        self.headers['User-Agent'] = '(Android)'
        self.headers['Referer'] = 'https://m.dcinside.com/board/6974gay'
        # Ask for 200 posts per list page.
        self.cookies.set_cookie(create_cookie(
            name='list_count', value='200', domain='.dcinside.com'
        ))

    def download_attachment(self, url: str, save_dir: Path) -> Tuple[str, Path]:
        # Download one attachment, hashing it while streaming, and store it
        # under save_dir as <sha1>.<ext>.
        with self.semaphore:
            res = self.get(url, stream=True)
            res.raise_for_status()
            digest = hashlib.sha1()
            # Content-Disposition carries the original filename; only its
            # extension is kept, since the file is renamed to its hash.
            _, parts = parse_header(res.headers.get('Content-Disposition', ''))
            fname = parts.get('filename', '')
            fext = fname.split('.').pop()
            # delete=False so the file survives the rename below; otherwise
            # the context manager would try to unlink the already-moved path.
            with NamedTemporaryFile('wb', dir=save_dir, delete=False) as file:
                for chunk in res.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
                        digest.update(chunk)
            return url, Path(file.name).rename(save_dir / f'{digest.hexdigest()}.{fext}')

    def replace_attachment(self, post: Post, save_dir: Path):
        # Download every lazy-loaded image in the post body concurrently and
        # point its src at the local copy.
        src_to_tags = {
            img.attrs['data-original'].strip(): img
            for img in post.body.select('img[data-original]')
        }
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.download_attachment, src, save_dir)
                for src in src_to_tags
            ]
            for future in as_completed(futures):
                src, path = future.result()
                src_to_tags[src]['src'] = str(path)

    def search(self, boardId: str, s_type: str, offset: int, value: str) -> List[Post]:
        # Run a board search and return the matching posts (IDs only).
        with self.semaphore:
            res = self.get(
                f'https://m.dcinside.com/board/{boardId}',
                params={'s_type': s_type, 's_pos': offset, 'serval': value})
            res.raise_for_status()
            document = BeautifulSoup(res.text, 'html.parser')
            return [
                Post(
                    boardId=boardId,
                    # The post ID is the trailing /<digits> of the link.
                    postId=int(re.findall(r'/\d+', tag.attrs.get('href'))[0][1:])
                )
                for tag in document.select('.gall-detail-lnktb a[href]:first-child')
            ]

    def view(self, post: Post, document_path: Optional[Path] = None,
             attachment_path: Optional[Path] = None) -> Optional[Post]:
        # Fetch a single post page and fill in the Post's metadata and body.
        with self.semaphore:
            res = self.get(f'https://m.dcinside.com/board/{post.boardId}/{post.postId}')
            res.raise_for_status()
            document = BeautifulSoup(res.text, 'html.parser')
            # The info list ends with <author>, <timestamp>.
            titleWrapTags = document.select('.gallview-tit-box .ginfo2 > li')
            timeTag = titleWrapTags.pop()
            authorTag = titleWrapTags.pop()
            authorAnchorTag = authorTag.select_one('a')
            # The title element holds '[category]\r\ntitle'.
            titleParts = (
                document
                .select_one('.gallview-tit-box .tit')
                .get_text(strip=True)
                .split('\r\n')
            )
            post.title = titleParts.pop().strip()
            post.category = titleParts.pop()[1:-1].strip()
            if authorAnchorTag:
                # Registered user: the gallog link ends in /<user id>.
                post.authorId = re.findall(r'/\w+$', authorAnchorTag.attrs.get('href'))[0][1:]
                post.authorName = authorAnchorTag.get_text(strip=True)
            else:
                # Guest: rendered as 'name(ip or id)'.
                authorParts = authorTag.get_text(strip=True).split('(')
                post.authorId = authorParts[1][:-1].strip()
                post.authorName = authorParts[0].strip()
            post.created_at = (
                datetime
                .strptime(timeTag.get_text(strip=True), '%Y.%m.%d %H:%M')
                .replace(tzinfo=ZoneInfo('Asia/Seoul'))
            )
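            # Metadata (category, title, author, timestamp) is parsed above;
            # the remainder extracts and archives the post body itself.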
            post.body = document.select_one('.thum-txtin')  # lol
            # The name search also matches partial names; keep only exact hits.
            if post.authorName != 'ori':
                return None
            print(f'yoinked {post.boardId}/{post.postId}, written by {post.authorName}: {post.title}')
            if attachment_path:
                self.replace_attachment(post, attachment_path)
            if document_path:
                (document_path / f'{post.boardId}_{post.postId}.html').write_text(str(post.body))
            return post


scraper = Scraper(concurrency=5)
# Retry transient failures; the mobile site intermittently answers 404.
retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[404])
scraper.mount('https://', TimeoutHTTPAdapter(timeout=1, max_retries=retries))

document_path = Path('archives')
attachment_path = document_path / 'attachments'
attachment_path.mkdir(parents=True, exist_ok=True)

try:
    # Walk the board backwards in windows of 10,000 posts; s_pos takes a
    # negative offset from the newest post.
    for offset in range(22760681, 22760681 + (10000 * 100), 10000):
        posts = scraper.search('aoegame', 'name', -offset, 'ori')
        print(f'searching offset {-offset}, found {len(posts)} post(s)')
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(
                    scraper.view,
                    post,
                    document_path=document_path,
                    attachment_path=attachment_path
                )
                for post in posts
            ]
            # Consume the futures so exceptions propagate instead of being
            # silently dropped.
            for future in as_completed(futures):
                future.result()
except KeyboardInterrupt:
    print(':-)')
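
# Appendix: the local `adapter` module is not shown here. A minimal sketch of
# a compatible TimeoutHTTPAdapter, assuming it simply subclasses
# requests.adapters.HTTPAdapter and injects a default timeout (the class name
# and constructor arguments come from the usage above; the body is an
# assumption, not necessarily the author's implementation):
#
#   from requests.adapters import HTTPAdapter
#
#   class TimeoutHTTPAdapter(HTTPAdapter):
#       def __init__(self, *args, timeout=None, **kwargs):
#           # Default timeout applied to every request routed through
#           # this adapter.
#           self.timeout = timeout
#           super().__init__(*args, **kwargs)
#
#       def send(self, request, **kwargs):
#           # Fill in the timeout only when the caller didn't pass one.
#           if kwargs.get('timeout') is None:
#               kwargs['timeout'] = self.timeout
#           return super().send(request, **kwargs)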