feat: add simple scraping script

2025-08-04 14:50:13 +09:00
parent 4efd92bcb0
commit 65ba2f7ee2
2 changed files with 125 additions and 13 deletions


@@ -9,6 +9,7 @@ frozenlist==1.7.0
 idna==3.10
 lxml==6.0.0
 multidict==6.6.3
+orjson==3.11.1
 propcache==0.3.2
 soupsieve==2.7
 typing_extensions==4.14.1
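
orjson is the only new dependency. The rewritten script (the second file in this commit) serializes Post dataclasses with it, passing a default hook so that bs4 Tag values degrade to their HTML string. A minimal standalone sketch of that pattern, with a hypothetical Item dataclass standing in for models.post.Post:

import orjson
from dataclasses import dataclass
from bs4 import BeautifulSoup, Tag

@dataclass
class Item:  # hypothetical stand-in for models.post.Post
    id: int
    body: Tag  # a bs4 node, which orjson cannot serialize natively

def default(obj):
    # orjson calls this hook for unsupported types; anything else still raises TypeError
    if isinstance(obj, Tag):
        return str(obj)
    raise TypeError

body = BeautifulSoup('<p>hello</p>', 'html.parser').p
print(orjson.dumps(Item(id=1, body=body), default=default))
# b'{"id":1,"body":"<p>hello</p>"}'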


@@ -1,31 +1,142 @@
import orjson
import asyncio
from typing import Optional, List
from dataclasses import dataclass
from argparse import ArgumentParser
from pathlib import Path
from datetime import datetime
from bs4 import Tag
from utils.typings import BoardPath, SearchType
from utils.middlewares import Semaphore
from utils.scraper import Scraper, LIST_MAX_POSTS
from models.post import Post

@dataclass
class ArgumentInterface:
    board_id: str
    board_path: BoardPath
    page: int
    json_dir: Path
    json_format: str
    attachment_dir: Path
    concurrency: int
    search_type: Optional[SearchType]
    search_position: Optional[int]
    search_date: datetime
    search_value: Optional[str]

parser = ArgumentParser()
parser.add_argument('board_id', type=str)
parser.add_argument('--board-path', type=str, default='board')
parser.add_argument('--page', type=int, default=1)
parser.add_argument('--json-dir', type=Path, default=Path('archives'))
parser.add_argument('--json-format', type=str, default='{board_id}_{id}.json')
parser.add_argument('--attachment-dir', type=Path, default=Path('archives/attachments'))
parser.add_argument('--concurrency', type=int, default=5)
parser.add_argument('--search-type', type=str, default=None)
parser.add_argument('--search-position', type=int, default=None)
parser.add_argument('--search-date', type=datetime.fromisoformat, default=datetime.now())
parser.add_argument('--search-value', type=str, default=None)
args = ArgumentInterface(**vars(parser.parse_args()))

def default(obj):
    # orjson falls back to this hook for types it cannot serialize natively
    if isinstance(obj, Tag):
        return str(obj)
    raise TypeError

async def main():
    args.json_dir.mkdir(parents=True, exist_ok=True)
    args.attachment_dir.mkdir(parents=True, exist_ok=True)
    middlewares = (
        Semaphore(args.concurrency),
    )
    async with Scraper(middlewares=middlewares) as scraper:
        base_kwargs = {
            'board_id': args.board_id,
            'board_path': args.board_path,
            'search_type': args.search_type,
            'search_position': args.search_position,
            'search_value': args.search_value
        }
        # TODO: check that the board actually exists
        if args.search_type:
            # Resolve the search start position from the requested date
            if not args.search_position:
                url = 'https://m.dcinside.com/ajax/searchPrevDay'
                data = {
                    'id': args.board_id,  # TODO: board prefix (mi$, person$)
                    's_type': args.search_type,
                    'serval': args.search_value,
                    'date': args.search_date.strftime('%Y-%m-%d'),
                }
                async with await scraper.post(url, data=data) as response:
                    payload = await response.json()
                    print(f'received search position for {args.search_date}: {payload}')
                    # TODO: error handling (`payload['result'] != true` or an empty page returned)
                    base_kwargs['search_position'] = payload['s_pos']
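                    # NOTE (assumption, not confirmed by this diff): s_pos appears to be
                    # a negative offset into the board's search index, e.g. s_pos = -45000
                    # leaves roughly 45000 posts to scan; the loop below walks it toward
                    # zero in 10000-post blocks, apparently the granularity dcinside uses
                    # for paged search, and stops once it passes -10000.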
        search_looped_count = 0
        while True:
            search_kwargs = {
                # While searching, advance the position by one block per pass
                'search_position': (
                    base_kwargs['search_position'] + (10000 * search_looped_count)
                    if base_kwargs['search_position'] else
                    None
                )
            }
            search_looped_count += 1
            # Stop once the entire board has been searched
            if (
                search_kwargs['search_position'] is not None and
                search_kwargs['search_position'] >= -10000
            ):
                break
            pages: List[List[Post]] = []
            # Fetch every page of the current block
            # FIXME: written this way, simply browsing pages (rather than searching) is impossible
            while (
                len(pages) < 1 or
                len(pages[-1]) >= LIST_MAX_POSTS
            ):
                kwargs = {
                    **base_kwargs,
                    **search_kwargs,
                    'page': len(pages) + 1
                }
                posts = await scraper.list(**kwargs)
                pages.append(posts)
                print(f'found {len(posts)} post(s) from page {kwargs["page"]}: {kwargs}')
            for future in asyncio.as_completed([
                scraper.view(post)
                for page in pages for post in page
            ]):
                try:
                    post = await future
                    await scraper.download_attachments(post, args.attachment_dir)
                    (args.json_dir / args.json_format.format(**kwargs, id=post.id)).write_bytes(
                        orjson.dumps(post, default=default)
                    )
                    print(f'{post.board_id}/{post.id}: {post.title} by {post.author_name} ({post.author_id})')
                except Exception as e:
                    # as_completed yields bare futures, so the failing post is unknown here
                    print('[Exception]', e)
            # Without an active search there is only a single block to scrape
            if not args.search_type:
                break

if __name__ == '__main__':
    asyncio.run(main())
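
A plausible invocation, assuming the script is saved as scrape.py (the filename is not shown in this view) and that utils.typings.SearchType accepts dcinside's usual search_subject_memo literal; both names are assumptions, not confirmed by this diff:

    python scrape.py event_voicere --search-type search_subject_memo --search-value keyword --search-date 2025-08-01 --concurrency 10

With the defaults above, post JSON lands in archives/ as {board_id}_{id}.json and attachments under archives/attachments/.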