import orjson
import asyncio
from typing import Optional, List
from dataclasses import dataclass
from argparse import ArgumentParser
from pathlib import Path
from datetime import datetime

from bs4 import Tag

from utils.typings import BoardPath, SearchType
from utils.middlewares import Semaphore
from utils.scraper import Scraper, LIST_MAX_POSTS
from models.post import Post


@dataclass
class ArgumentInterface:
    board_id: str
    board_path: BoardPath
    page: int
    json_dir: Path
    json_format: str
    attachment_dir: Path
    concurrency: int
    search_type: Optional[SearchType]
    search_position: Optional[int]
    search_date: datetime
    search_value: Optional[str]


parser = ArgumentParser()
parser.add_argument('board_id', type=str)
parser.add_argument('--board-path', type=str, default='board')
parser.add_argument('--page', type=int, default=1)
parser.add_argument('--json-dir', type=Path, default=Path('archives'))
parser.add_argument('--json-format', type=str, default='{board_id}_{id}.json')
parser.add_argument('--attachment-dir', type=Path, default=Path('archives/attachments'))
parser.add_argument('--concurrency', type=int, default=5)
parser.add_argument('--search-type', type=str, default=None)
parser.add_argument('--search-position', type=int, default=None)
parser.add_argument('--search-date', type=datetime.fromisoformat, default=datetime.now())
parser.add_argument('--search-value', type=str, default=None)

args = ArgumentInterface(**vars(parser.parse_args()))


def default(obj):
    # orjson fallback: serialize BeautifulSoup tags as their HTML string.
    if isinstance(obj, Tag):
        return str(obj)
    raise TypeError


async def main():
    args.json_dir.mkdir(parents=True, exist_ok=True)
    args.attachment_dir.mkdir(parents=True, exist_ok=True)

    middlewares = (
        Semaphore(args.concurrency),
    )

    async with Scraper(middlewares=middlewares) as scraper:
        base_kwargs = {
            'board_id': args.board_id,
            'board_path': args.board_path,
            'search_type': args.search_type,
            'search_position': args.search_position,
            'search_value': args.search_value
        }

        # TODO: verify that the board actually exists

        if args.search_type:
            # Resolve the search start position from the given post date
            if not args.search_position:
                url = 'https://m.dcinside.com/ajax/searchPrevDay'
                data = {
                    'id': args.board_id,  # TODO: board prefix (mi$, person$)
                    's_type': args.search_type,
                    'serval': args.search_value,
                    'date': args.search_date.strftime('%Y-%m-%d'),
                }

                async with await scraper.post(url, data=data) as response:
                    payload = await response.json()
                    print(f'received search position for {args.search_date}: {payload}')

                    # TODO: error handling (`payload['result'] != true`, or an empty page is returned)
                    base_kwargs['search_position'] = payload['s_pos']

        search_looped_count = 0

        while True:
            search_kwargs = {
                # While searching, advance the search position window on every pass
                'search_position': (
                    base_kwargs['search_position'] + (10000 * search_looped_count)
                    if base_kwargs['search_position'] else None
                )
            }

            search_looped_count += 1

            # Stop once the whole board has been searched
            if (
                search_kwargs['search_position'] is not None
                and search_kwargs['search_position'] >= -10000
            ):
                break

            pages: List[List[Post]] = []

            # Fetch every page until one comes back with fewer than LIST_MAX_POSTS posts
            # FIXME: --page is ignored; listing always starts from page 1
            while (
                len(pages) < 1 or
                len(pages[-1]) >= LIST_MAX_POSTS
            ):
                kwargs = {
                    **base_kwargs,
                    **search_kwargs,
                    'page': len(pages) + 1
                }

                posts = await scraper.list(**kwargs)
                pages.append(posts)

                print(f'found {len(posts)} post(s) from page {kwargs["page"]}: {kwargs}')

            for future in asyncio.as_completed([
                scraper.view(post)
                for page in pages
                for post in page
            ]):
                try:
                    post = await future
                    await scraper.download_attachments(post, args.attachment_dir)

                    (args.json_dir / args.json_format.format(**kwargs, id=post.id)).write_bytes(
                        orjson.dumps(post, default=default)
                    )

                    print(f'{post.board_id}/{post.id}: {post.title} by {post.author_name} ({post.author_id})')
                except Exception as e:
                    # The failed post cannot be identified here: futures complete out of order.
                    print('[Exception]', e)

            # Not a search: a single pass over the listing is enough
            if search_kwargs['search_position'] is None:
                break


if __name__ == '__main__':
    asyncio.run(main())