
Compare commits


15 Commits

9 changed files with 346 additions and 80 deletions

.vscode/launch.json vendored Normal file

@@ -0,0 +1,12 @@
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Run scraper",
            "type": "debugpy",
            "request": "launch",
            "module": "scraper",
            "justMyCode": true
        }
    ]
}

COPYING Normal file

@@ -0,0 +1,13 @@
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. You just DO WHAT THE FUCK YOU WANT TO.

cgi.py

@@ -1,33 +0,0 @@
# https://github.com/python/cpython/blob/3511c2e546aaacda5880eb89a94f4e8514b3ce76/Lib/cgi.py#L226-L256
def _parseparam(s):
    while s[:1] == ';':
        s = s[1:]
        end = s.find(';')
        while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
            end = s.find(';', end + 1)
        if end < 0:
            end = len(s)
        f = s[:end]
        yield f.strip()
        s = s[end:]


def parse_header(line):
    """Parse a Content-type like header.

    Return the main content-type and a dictionary of options.
    """
    parts = _parseparam(';' + line)
    key = parts.__next__()
    pdict = {}
    for p in parts:
        i = p.find('=')
        if i >= 0:
            name = p[:i].strip().lower()
            value = p[i+1:].strip()
            if len(value) >= 2 and value[0] == value[-1] == '"':
                value = value[1:-1]
                value = value.replace('\\\\', '\\').replace('\\"', '"')
            pdict[name] = value
    return key, pdict

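Note (not part of the diff): for reference, the removed vendored helper parsed Content-Type / Content-Disposition style headers; a quick sketch of what it returned:

key, pdict = parse_header('form-data; name="field1"; filename="photo.png"')
print(key)    # form-data
print(pdict)  # {'name': 'field1', 'filename': 'photo.png'}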
models/__init__.py

@@ -1 +1 @@
-from .post import Post
+from .post import Attachment, Post

models/post.py

@@ -1,5 +1,5 @@
-from typing import Optional
-from dataclasses import dataclass
+from typing import Optional, Dict
+from dataclasses import dataclass, field
 from datetime import datetime
 from bs4 import Tag
@@ -7,14 +7,25 @@ from bs4 import Tag
 from utils.typings import BoardPath
+
+@dataclass
+class Attachment:
+    url: str
+    source_url: str
+    source_filename: Optional[str] = None
+    source_suffix: Optional[str] = None
+    hash: Optional[str] = None
+    error: Optional[str] = None
+
 @dataclass
 class Post:
     id: int
-    boardId: str
-    boardPath: BoardPath
-    authorId: Optional[str] = None
-    authorName: Optional[str] = None
+    board_id: str
+    board_path: BoardPath
+    author_id: Optional[str] = None
+    author_name: Optional[str] = None
     category: Optional[str] = None
     title: Optional[str] = None
     body: Optional[Tag] = None
+    attachments: Dict[str, Attachment] = field(default_factory=lambda: {})
     created_at: Optional[datetime] = None

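Note (not part of the diff): a minimal sketch of serializing the reworked Post model with orjson, mirroring the `default` hook added in the entry point; the ID, URL, and HTML below are placeholders.

import orjson
from bs4 import BeautifulSoup, Tag
from models.post import Attachment, Post

def default(obj):
    # orjson serializes dataclasses natively; only bs4 Tags need converting
    if isinstance(obj, Tag):
        return str(obj)
    raise TypeError

post = Post(id=1, board_id='example_board', board_path='board')  # placeholder values
post.body = BeautifulSoup('<p>hello</p>', 'lxml').p
post.attachments['https://example.com/a.png'] = Attachment(
    url='https://example.com/a.png',
    source_url='https://example.com/a.png',
)
print(orjson.dumps(post, default=default).decode())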
requirements.txt

@@ -9,8 +9,10 @@ frozenlist==1.7.0
 idna==3.10
 lxml==6.0.0
 multidict==6.6.3
+orjson==3.11.1
 propcache==0.3.2
 soupsieve==2.7
 typing_extensions==4.14.1
+tzdata==2025.2
 urllib3==2.5.0
 yarl==1.20.1

scraper (entry-point module)

@@ -1,21 +1,142 @@
+import orjson
 import asyncio
+from typing import Optional, List
+from dataclasses import dataclass
+from argparse import ArgumentParser
 from pathlib import Path
+from datetime import datetime
-from utils.middlewares import SemaphoreMiddleware
-from utils.scraper import Scraper
+from bs4 import Tag
+from utils.typings import BoardPath, SearchType
+from utils.middlewares import Semaphore
+from utils.scraper import Scraper, LIST_MAX_POSTS
+from models.post import Post
+
+@dataclass
+class ArgumentInterface:
+    board_id: str
+    board_path: BoardPath
+    page: int
+    json_dir: Path
+    json_format: str
+    attachment_dir: Path
+    concurrency: int
+    search_type: Optional[SearchType]
+    search_position: Optional[int]
+    search_date: datetime
+    search_value: Optional[str]
+
+parser = ArgumentParser()
+parser.add_argument('board_id', type=str)
+parser.add_argument('--board-path', type=str, default='board')
+parser.add_argument('--page', type=int, default=1)
+parser.add_argument('--json-dir', type=Path, default=Path('archives'))
+parser.add_argument('--json-format', type=str, default='{board_id}_{id}.json')
+parser.add_argument('--attachment-dir', type=Path, default=Path('archives/attachments'))
+parser.add_argument('--concurrency', type=int, default=5)
+parser.add_argument('--search-type', type=str, default=None)
+parser.add_argument('--search-position', type=int, default=None)
+parser.add_argument('--search-date', type=datetime.fromisoformat, default=datetime.now())
+parser.add_argument('--search-value', type=str, default=None)
+
+args = ArgumentInterface(**vars(parser.parse_args()))
+
+def default(obj):
+    if isinstance(obj, Tag):
+        return str(obj)
+    raise TypeError
+
 async def main():
+    args.json_dir.mkdir(parents=True, exist_ok=True)
+    args.attachment_dir.mkdir(parents=True, exist_ok=True)
     middlewares = (
-        SemaphoreMiddleware(5),
+        Semaphore(args.concurrency),
     )
     async with Scraper(middlewares=middlewares) as scraper:
-        posts = await scraper.list('roh', 'person')
+        base_kwargs = {
+            'board_id': args.board_id,
+            'board_path': args.board_path,
+            'search_type': args.search_type,
+            'search_position': args.search_position,
+            'search_value': args.search_value
+        }
+        # TODO: check that the board actually exists
-        for future in asyncio.as_completed([scraper.view(p) for p in posts]):
-            await future
+        if args.search_type:
+            # Fetch the search start position based on the post date
+            if not args.search_position:
+                url = 'https://m.dcinside.com/ajax/searchPrevDay'
+                data = {
+                    'id': args.board_id,  # TODO: board prefix (mi$, person$)
+                    's_type': args.search_type,
+                    'serval': args.search_value,
+                    'date': args.search_date.strftime('%Y-%m-%d'),
+                }
+                async with await scraper.post(url, data=data) as response:
+                    payload = await response.json()
+                    print(f'received search position for {args.search_date}: {payload}')
+                    # TODO: error handling (`payload['result'] != true` or an empty page is returned)
+                    base_kwargs['search_position'] = payload['s_pos']
+        search_looped_count = 0
+        while True:
+            search_kwargs = {
+                # While searching, work out the next position to search from
+                'search_position': (
+                    base_kwargs['search_position'] + (10000 * search_looped_count) if
+                    base_kwargs['search_position'] else
+                    None
+                )
+            }
+            search_looped_count += 1
+            # Stop once the whole board has been searched
+            if search_kwargs['search_position'] >= -10000:
+                break
+            pages: List[List[Post]] = []
+            # Fetch every page
+            # FIXME: written like this, plain page browsing (without a search) is impossible
+            while (
+                len(pages) < 1 or
+                len(pages[-1]) >= LIST_MAX_POSTS
+            ):
+                kwargs = {
+                    **base_kwargs,
+                    **search_kwargs,
+                    'page': len(pages) + 1
+                }
+                posts = await scraper.list(**kwargs)
+                pages.append(posts)
+                print(f'found {len(posts)} post(s) from {kwargs['page']} page: {kwargs}')
+            for future in asyncio.as_completed([
+                scraper.view(post)
+                for page in pages for post in page
+            ]):
+                try:
+                    post = await future
+                    await scraper.download_attachments(post, args.attachment_dir)
+                    (args.json_dir / args.json_format.format(**kwargs, id=post.id)).write_bytes(
+                        orjson.dumps(post, default=default)
+                    )
+                    print(f'{post.board_id}/{post.id}: {post.title} by {post.author_name} ({post.author_id})')
+                except Exception as e:
+                    print(f'[Exception] {post.board_id}/{post.id}:', e)
 if __name__ == '__main__':

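Note (not part of the diff): the search loop above advances the position returned by `/ajax/searchPrevDay` by 10000 per pass and stops once it reaches -10000. A minimal sketch of the positions it visits, with a placeholder start value:

start = -123456  # placeholder s_pos as returned by /ajax/searchPrevDay
positions = []
position = start
while position < -10000:  # mirrors the `>= -10000: break` check above
    positions.append(position)
    position = start + 10000 * len(positions)
print(positions[0], positions[1], '...', positions[-1])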
utils/middlewares.py

@@ -2,7 +2,7 @@ import asyncio
 from aiohttp import ClientRequest, ClientResponse, ClientHandlerType

-class SemaphoreMiddleware(asyncio.Semaphore):
+class Semaphore(asyncio.Semaphore):
     async def __call__(self, req: ClientRequest, handler: ClientHandlerType) -> ClientResponse:
         async with self:
             return await handler(req)

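Note (not part of the diff): a minimal sketch of how this middleware caps request concurrency, assuming aiohttp >= 3.12 (which accepts a `middlewares` tuple on ClientSession); the URLs are placeholders.

import asyncio
from aiohttp import ClientSession
from utils.middlewares import Semaphore

async def demo():
    # At most 3 requests are in flight at once; the rest wait on the semaphore
    async with ClientSession(middlewares=(Semaphore(3),)) as session:
        async def fetch(url):
            async with session.get(url) as response:
                return response.status
        urls = ['https://example.com/'] * 10  # placeholder URLs
        print(await asyncio.gather(*(fetch(url) for url in urls)))

asyncio.run(demo())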
utils/scraper.py

@@ -1,13 +1,24 @@
 import re
+import shutil
+import hashlib
+import mimetypes
+import asyncio
+import urllib.parse as urlparse
 from typing import Optional, List
+from pathlib import Path
+from tempfile import NamedTemporaryFile
 from datetime import datetime
 from zoneinfo import ZoneInfo
-from aiohttp import ClientSession
+from aiohttp import ClientSession, ClientError
 from bs4 import BeautifulSoup
 from .typings import BoardPath, SearchType
-from models import Post
+from models import Attachment, Post
+
+LIST_MAX_POSTS = 200
+
 class Scraper(ClientSession):
@@ -16,48 +27,47 @@ class Scraper(ClientSession):
         super().__init__(*args, **kwargs)

         # Default header values required for mobile page and attachment requests
+        self.headers['Accept'] = 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*'
         self.headers['User-Agent'] = '(Android)'
         self.headers['Referer'] = 'https://m.dcinside.com/board/aoegame'

         # Number of items to load per post-list request
         self.cookie_jar.update_cookies({
-            'list_count': '200'
+            'list_count': LIST_MAX_POSTS
         })

     async def __aenter__(self) -> 'Scraper':
         return self

     async def list(
         self,
-        boardId: str,
-        boardPath: BoardPath = 'board',
+        board_id: str,
+        board_path: BoardPath = 'board',
         page: int = 1,
-        categoryId: int = 0,
+        category_id: int = 0,
         only_recommended: bool = False,
         only_notice: bool = False,
         search_type: Optional[SearchType] = None,
         search_position: Optional[int] = None,
         search_value: Optional[str] = None
     ) -> List[Post]:
         """
         Fetches the list of posts matching the given conditions from a board

-        :param boardId: board ID
-        :param boardPath: board path (type)
+        :param board_id: board ID
+        :param board_path: board path (type)
         :param page: page number
-        :param categoryId: category (head) ID
+        :param category_id: category (head) ID
         :param only_recommended: only fetch "recommended" posts?
         :param only_notice: only fetch notice posts?
         :param search_type: search type
         :param search_position: search position
         :param search_value: search keyword
         """
-        url = f'https://m.dcinside.com/{boardPath}/{boardId}'
+        url = f'https://m.dcinside.com/{board_path}/{board_id}'
         params = {
             'page': page,
-            'headid': categoryId,
+            'headid': category_id,
             'recommend': only_recommended and '1' or '0',
             'notice': only_notice and '1' or '0',
             's_type': search_type or '',
@@ -71,49 +81,48 @@ class Scraper(ClientSession):
         return [
             Post(
-                id=int(re.findall(r'/\d+', tag.select_one('a[href]:first-child')['href'])[0][1:]),
-                boardId=boardId,
-                boardPath=boardPath
+                id=int(re.findall(r'/\d+', tag.select_one('a[href]')['href'])[0][1:]),
+                board_id=board_id,
+                board_path=board_path
             )
             for tag in document.select('.gall-detail-lnktb')
         ]

-    async def view(self, post: Post):
+    async def view(self, post: Post) -> Post:
         """
         Fetches the contents of a post

         :param post: post instance to fetch
         """
-        async with self.get(f'https://m.dcinside.com/{post.boardPath}/{post.boardId}/{post.id}') as response:
+        async with self.get(f'https://m.dcinside.com/{post.board_path}/{post.board_id}/{post.id}') as response:
             html = await response.text()
             document = BeautifulSoup(html, 'lxml')

         # The title header is split into `li` elements and, no matter what, there are exactly two of them
         # If that ever breaks, the whole parse is broken anyway, so there is no dedicated error handling here
-        authorTag, timestampTag, *_ = document.select('.gallview-tit-box .ginfo2 > li')
-        authorAnchorTag = authorTag.select_one('a')
+        author_tag, timestamp_tag, *_ = document.select('.gallview-tit-box .ginfo2 > li')
+        author_anchor_tag = author_tag.select_one('a')

         # Parse the created-at timestamp
         post.created_at = (
             datetime
-            .strptime(timestampTag.get_text(strip=True), '%Y.%m.%d %H:%M')
+            .strptime(timestamp_tag.get_text(strip=True), '%Y.%m.%d %H:%M')
             .replace(tzinfo=ZoneInfo('Asia/Seoul'))
         )

         # Parse author info
-        if authorAnchorTag:
+        if author_anchor_tag:
             # An anchor tag in the author element means the author has a gallog
-            post.authorId = re.findall(r'\/\w+$', authorAnchorTag['href'])[0][1:]
-            post.authorName = authorAnchorTag.get_text(strip=True)
+            post.author_id = re.findall(r'\/\w+$', author_anchor_tag['href'])[0][1:]
+            post.author_name = author_anchor_tag.get_text(strip=True)
         else:
-            authorParts = authorTag.get_text(strip=True).split('(')
-            post.authorId = authorParts.pop()[:-1].strip()  # 123.123) -> 123.123
-            post.authorName = authorParts.pop().strip()
+            author_parts = author_tag.get_text(strip=True).split('(')
+            post.author_id = author_parts.pop()[:-1].strip()  # 123.123) -> 123.123
+            post.author_name = author_parts.pop().strip()

         # On the mobile web the category and the title are separated by `\n`
-        titleTexts = (
+        title_texts = (
             document
             .select_one('.gallview-tit-box .tit')
             .get_text(strip=True)
@@ -121,8 +130,10 @@ class Scraper(ClientSession):
         )

         # Parse the title and the category
-        post.title = titleTexts.pop().strip()
-        post.category = titleTexts.pop()[1:~1].strip()  # [XX] -> XX
+        post.title = title_texts.pop().strip()
+        if title_texts:
+            post.category = title_texts.pop()[1:~1].strip()  # [XX] -> XX

         # Parse the body
         post.body = document.select_one('.thum-txtin')
@@ -130,5 +141,134 @@ class Scraper(ClientSession):
         # Remove unnecessary elements from the body
         for tag in post.body.select('script, style'):
             tag.extract()

-        print(f'{post.boardId}/{post.id}: {post.title}')
+        return post
+
+    async def fetch_voice(self, id: str):
+        """
+        Fetches the actual file path from the mobile web voice-reply iframe page
+
+        :param id: voice reply ID (the `vr` parameter)
+        """
+        params = {
+            'vr': id,
+            'vr_open': 1
+        }
+        async with await self.get('https://m.dcinside.com/voice/player', params=params) as response:
+            html = await response.text()
+            document = BeautifulSoup(html, 'lxml')
+            return document.select_one('input')['value']
+
+    async def fetch_video(self, id: str):
+        """
+        Fetches the actual file path from the mobile web video iframe page
+
+        :param id: video ID (the `no` parameter)
+        """
+        params = {
+            'no': id
+        }
+        async with await self.get('https://m.dcinside.com/movie/player', params=params) as response:
+            html = await response.text()
+            document = BeautifulSoup(html, 'lxml')
+            return document.select_one('source')['src']
+
+    async def download_attachment(self, url: str, save_dir: Path) -> Attachment:
+        """
+        Downloads a single attachment
+
+        :param url: URL of the attachment to download
+        :param save_dir: local directory to save it into
+        """
+        url_parsed = urlparse.urlparse(url)
+        url_params = urlparse.parse_qs(url_parsed.query)
+        hash = hashlib.sha1()
+        attachment = Attachment(
+            url=url,
+            source_url=url
+        )
+        if url.startswith('https://m.dcinside.com/voice/player'):
+            # Voice reply
+            attachment.source_url = await self.fetch_voice(url_params.get('vr'))
+        elif url.startswith('https://m.dcinside.com/movie/player'):
+            # Video
+            attachment.source_url = await self.fetch_video(url_params.get('no'))
+        with NamedTemporaryFile('wb') as temp_file:
+            async with await self.get(attachment.source_url) as response:
+                async for chunk, _ in response.content.iter_chunks():
+                    temp_file.write(chunk)
+                    hash.update(chunk)
+                temp_file.flush()
+                # Work out the file extension from the Content-Type header
+                attachment.source_suffix = mimetypes.guess_extension(response.content_type)
+                # Work out the real file name and extension from the Content-Disposition header
+                # FIXME: dcinside sends this header with a broken encoding (latin-1?)
+                if response.content_disposition and response.content_disposition.filename:
+                    attachment.source_filename = response.content_disposition.filename
+                    attachment.source_suffix = Path(attachment.source_filename).suffix
+            attachment.hash = hash.hexdigest()
+            saved_path = save_dir / f'{attachment.hash}{attachment.source_suffix}'
+            # Move the temporarily downloaded file into place
+            if not saved_path.exists():
+                shutil.copy2(temp_file.name, saved_path)
+        return attachment
+
+    async def download_attachments(
+        self,
+        post: Post,
+        save_dir: Path
+    ):
+        """
+        Downloads a post's attachments (images, videos, voice replies, ...)
+
+        :param post: post instance
+        :param save_dir: local directory to save them into
+        """
+        urls = [
+            # Images
+            *[
+                # Animated images are auto-converted and exposed via `data-gif` and `data-mp4`
+                # TODO: bad code, clean shit up
+                tag.attrs.get('data-mp4', tag['data-original']).strip()
+                for tag in post.body.select('img[data-original]')
+            ],
+            # Voice replies and videos
+            *filter(
+                lambda url: (
+                    url.startswith('https://m.dcinside.com/voice/player') or
+                    url.startswith('https://m.dcinside.com/movie/player')
+                ),
+                [
+                    tag['src'].strip()
+                    for tag in post.body.select('iframe')
+                ]
+            )
+        ]
+        futures = [
+            self.download_attachment(url, save_dir)
+            for url in filter(
+                lambda x: x not in post.attachments,
+                urls
+            )
+        ]
+        async for future in asyncio.as_completed(futures):
+            # TODO: error handling
+            attachment = await future
+            post.attachments[attachment.url] = attachment
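
Note (not part of the diff): a minimal end-to-end sketch of the updated Scraper API — list a page, view each post, then pull its attachments. The board ID and output directory are placeholders.

import asyncio
from pathlib import Path
from utils.middlewares import Semaphore
from utils.scraper import Scraper

async def demo():
    save_dir = Path('archives/attachments')  # placeholder output directory
    save_dir.mkdir(parents=True, exist_ok=True)
    async with Scraper(middlewares=(Semaphore(2),)) as scraper:
        posts = await scraper.list('example_board', 'board', page=1)  # placeholder board ID
        for post in posts[:3]:
            post = await scraper.view(post)
            await scraper.download_attachments(post, save_dir)
            print(post.id, post.title, list(post.attachments))

asyncio.run(demo())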