
Compare commits

...

18 Commits

12 changed files with 476 additions and 212 deletions

12
.vscode/launch.json vendored Normal file

@@ -0,0 +1,12 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Run scraper",
"type": "debugpy",
"request": "launch",
"module": "scraper",
"justMyCode": true
}
]
}

13
COPYING Normal file

@@ -0,0 +1,13 @@
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. You just DO WHAT THE FUCK YOU WANT TO.


@@ -1,14 +0,0 @@
from requests.adapters import HTTPAdapter
class WhyTheFuckRequestsHasNoTimeoutInAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
if "timeout" in kwargs:
self.timeout = kwargs["timeout"]
del kwargs["timeout"]
super().__init__(*args, **kwargs)
def send(self, request, **kwargs):
timeout = kwargs.get("timeout")
if timeout is None and hasattr(self, 'timeout'):
kwargs["timeout"] = self.timeout
return super().send(request, **kwargs)
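For reference, the deleted adapter existed only to give every request made through a requests.Session a default timeout. A minimal usage sketch, not part of this diff, mirroring how the removed synchronous entry point mounted it (board URL is just an example):

from adapter import WhyTheFuckRequestsHasNoTimeoutInAdapter
from requests import Session
from requests.adapters import Retry

session = Session()
# retry transient failures and time out after 1 second unless a call overrides it
retries = Retry(total=5, backoff_factor=0.1)
session.mount('https://', WhyTheFuckRequestsHasNoTimeoutInAdapter(timeout=1, max_retries=retries))
response = session.get('https://m.dcinside.com/board/aoegame')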

33
cgi.py

@@ -1,33 +0,0 @@
# https://github.com/python/cpython/blob/3511c2e546aaacda5880eb89a94f4e8514b3ce76/Lib/cgi.py#L226-L256
def _parseparam(s):
while s[:1] == ';':
s = s[1:]
end = s.find(';')
while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
end = s.find(';', end + 1)
if end < 0:
end = len(s)
f = s[:end]
yield f.strip()
s = s[end:]
def parse_header(line):
"""Parse a Content-type like header.
Return the main content-type and a dictionary of options.
"""
parts = _parseparam(';' + line)
key = parts.__next__()
pdict = {}
for p in parts:
i = p.find('=')
if i >= 0:
name = p[:i].strip().lower()
value = p[i+1:].strip()
if len(value) >= 2 and value[0] == value[-1] == '"':
value = value[1:-1]
value = value.replace('\\\\', '\\').replace('\\"', '"')
pdict[name] = value
return key, pdict
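For reference, the vendored helper (copied from the stdlib cgi module linked above) split a Content-Disposition style header into its main value and parameters; it is dropped here presumably because the async rewrite reads response.content_disposition straight from aiohttp. A quick example of its behaviour, with a placeholder filename:

# example behaviour of the removed helper
key, params = parse_header('attachment; filename="cat.png"')
assert key == 'attachment'
assert params == {'filename': 'cat.png'}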

1
models/__init__.py Normal file

@@ -0,0 +1 @@
from .post import Attachment, Post

31
models/post.py Normal file

@@ -0,0 +1,31 @@
from typing import Optional, Dict
from dataclasses import dataclass, field
from datetime import datetime
from bs4 import Tag
from utils.typings import BoardPath
@dataclass
class Attachment:
url: str
source_url: str
source_filename: Optional[str] = None
source_suffix: Optional[str] = None
hash: Optional[str] = None
error: Optional[str] = None
@dataclass
class Post:
id: int
board_id: str
board_path: BoardPath
author_id: Optional[str] = None
author_name: Optional[str] = None
category: Optional[str] = None
title: Optional[str] = None
body: Optional[Tag] = None
attachments: Dict[str, Attachment] = field(default_factory=lambda: {})
created_at: Optional[datetime] = None


@@ -1,8 +1,18 @@
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiosignal==1.4.0
attrs==25.3.0
beautifulsoup4==4.13.4
certifi==2025.7.14
charset-normalizer==3.4.2
frozenlist==1.7.0
idna==3.10
requests==2.32.4
lxml==6.0.0
multidict==6.6.3
orjson==3.11.1
propcache==0.3.2
soupsieve==2.7
typing_extensions==4.14.1
tzdata==2025.2
urllib3==2.5.0
yarl==1.20.1


@@ -1,187 +1,143 @@
import re
import hashlib
import orjson
import asyncio
from typing import Optional, Tuple, List
from typing import Optional, List
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
from argparse import ArgumentParser
from pathlib import Path
from tempfile import NamedTemporaryFile
from datetime import datetime
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import Tag
from requests import Session
from requests.adapters import Retry
from requests.cookies import create_cookie
from bs4 import BeautifulSoup, Tag
from cgi import parse_header
from adapter import WhyTheFuckRequestsHasNoTimeoutInAdapter
from utils.typings import BoardPath, SearchType
from utils.middlewares import Semaphore
from utils.scraper import Scraper, LIST_MAX_POSTS
from models.post import Post
@dataclass
class Post:
boardId: str
postId: int
authorId: Optional[str] = None
authorName: Optional[str] = None
category: Optional[str] = None
title: Optional[str] = None
body: Optional[Tag] = None
created_at: Optional[datetime] = None
class ArgumentInterface:
board_id: str
board_path: BoardPath
page: int
json_dir: Path
json_format: str
attachment_dir: Path
concurrency: int
search_type: Optional[SearchType]
search_position: Optional[int]
search_date: datetime
search_value: Optional[str]
parser = ArgumentParser()
parser.add_argument('board_id', type=str)
parser.add_argument('--board-path', type=str, default='board')
parser.add_argument('--page', type=int, default=1)
parser.add_argument('--json-dir', type=Path, default=Path('archives'))
parser.add_argument('--json-format', type=str, default='{board_id}_{id}.json')
parser.add_argument('--attachment-dir', type=Path, default=Path('archives/attachments'))
parser.add_argument('--concurrency', type=int, default=5)
parser.add_argument('--search-type', type=str, default=None)
parser.add_argument('--search-position', type=int, default=None)
parser.add_argument('--search-date', type=datetime.fromisoformat, default=datetime.now())
parser.add_argument('--search-value', type=str, default=None)
args = ArgumentInterface(**vars(parser.parse_args()))
class Scraper(Session):
def default(obj):
if isinstance(obj, Tag):
return str(obj)
raise TypeError
def __init__(self, concurrency = 5):
super().__init__()
async def main():
args.json_dir.mkdir(parents=True, exist_ok=True)
args.attachment_dir.mkdir(parents=True, exist_ok=True)
self.semaphore = Semaphore(concurrency)
middlewares = (
Semaphore(args.concurrency),
)
self.headers['User-Agent'] = '(Android)'
self.headers['Referer'] = 'https://m.dcinside.com/board/6974gay'
self.cookies.set_cookie(create_cookie(
name='list_count',
value='200',
domain='.dcinside.com'
))
def download_attachment(self, url: str, save_dir: Path) -> Tuple[str, Path]:
with self.semaphore:
res = self.get(url, stream=True)
res.raise_for_status()
hash = hashlib.sha1()
# fuck this shit
_, parts = parse_header(res.headers.get('Content-Disposition'))
fname = parts.get('filename', '')
fext = fname.split('.').pop()
with NamedTemporaryFile('wb', dir=save_dir) as file:
for chunk in res.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
hash.update(chunk)
return url, Path(file.name).rename(save_dir / f'{hash.hexdigest()}.{fext}')
def replace_attachment(self, post: Post, save_dir: Path):
src_to_tags = {
img.attrs['data-original'].strip(): img
for img in post.body.select('img[data-original]')
async with Scraper(middlewares=middlewares) as scraper:
base_kwargs = {
'board_id': args.board_id,
'board_path': args.board_path,
'search_type': args.search_type,
'search_position': args.search_position,
'search_value': args.search_value
}
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.download_attachment, src, save_dir)
for src in src_to_tags.keys()
]
# TODO: check that the board actually exists
for future in as_completed(futures):
src, path = future.result()
src_to_tags[src]['src'] = path
if args.search_type:
# resolve the search start position from the given post date
if not args.search_position:
url = 'https://m.dcinside.com/ajax/searchPrevDay'
data = {
'id': args.board_id, # TODO: board prefix (mi$, person$)
's_type': args.search_type,
'serval': args.search_value,
'date': args.search_date.strftime('%Y-%m-%d'),
}
def search(self, boardId: str, type: str, offset: int, value: str) -> List[Post]:
with self.semaphore:
res = self.get(
f'https://m.dcinside.com/board/{boardId}',
params={
's_type': type,
's_pos': offset,
'serval': value
})
res.raise_for_status()
async with await scraper.post(url, data=data) as response:
payload = await response.json()
print(f'received search position for {args.search_date}: {payload}')
document = BeautifulSoup(res.text, 'html.parser')
return [
Post(
boardId=boardId,
postId=int(re.findall(r'/\d+', tag.attrs.get('href'))[0][1:])
# TODO: error handling (`payload['result'] != true` or an empty page returned)
base_kwargs['search_position'] = payload['s_pos']
search_looped_count = 0
while True:
search_kwargs = {
# when searching, compute the position to search from
'search_position': (
base_kwargs['search_position'] + (10000 * search_looped_count) if
base_kwargs['search_position'] else
None
)
for tag in document.select('.gall-detail-lnktb a[href]:first-child')
]
}
def view(self, post: Post, document_path: Path = None, attachment_path: Path = None) -> Post:
with self.semaphore:
res = self.get(f'https://m.dcinside.com/board/{post.boardId}/{post.postId}')
res.raise_for_status()
search_looped_count += 1
document = BeautifulSoup(res.text, 'html.parser')
# stop once the whole board has been searched
if search_kwargs['search_position'] >= -10000:
break
titleWrapTags = document.select('.gallview-tit-box .ginfo2 > li')
pages: List[List[Post]] = []
timeTag = titleWrapTags.pop()
authorTag = titleWrapTags.pop()
authorAnchorTag = authorTag.select_one('a')
# fetch every page
# FIXME: structured like this, plain page browsing without a search is impossible
while (
len(pages) < 1 or
len(pages[-1]) >= LIST_MAX_POSTS
):
kwargs = {
**base_kwargs,
**search_kwargs,
'page': len(pages) + 1
}
posts = await scraper.list(**kwargs)
pages.append(posts)
print(f'found {len(posts)} post(s) on page {kwargs["page"]}: {kwargs}')
titleParts = (
document
.select_one('.gallview-tit-box .tit')
.get_text(strip=True)
.split('\r\n')
for future in asyncio.as_completed([
scraper.view(post)
for page in pages for post in page
]):
try:
post = await future
await scraper.download_attachments(post, args.attachment_dir)
(args.json_dir / args.json_format.format(**kwargs, id=post.id)).write_bytes(
orjson.dumps(post, default=default)
)
print(f'{post.board_id}/{post.id}: {post.title} by {post.author_name} ({post.author_id})')
post.title = titleParts.pop().strip()
post.category = titleParts.pop()[1:-1].strip()
if authorAnchorTag:
post.authorId = re.findall(r'\/\w+$', authorAnchorTag.attrs.get('href'))[0][1:]
post.authorName = authorAnchorTag.get_text(strip=True)
else:
authorParts = authorTag.get_text(strip=True).split('(')
post.authorId = authorParts[1][:-1].strip()
post.authorName = authorParts[0].strip()
post.created_at = (
datetime
.strptime(timeTag.get_text(strip=True), '%Y.%m.%d %H:%M')
.replace(tzinfo=ZoneInfo('Asia/Seoul'))
)
post.body = document.select_one('.thum-txtin')
# lol
if post.authorName != 'ori':
return
print(f'yoinked {post.boardId}/{post.postId}, written by {post.authorName}: {post.title}')
if attachment_path:
self.replace_attachment(post, attachment_path)
if document_path:
(document_path / f'{post.boardId}_{post.postId}.html').write_text(str(post.body))
except Exception as e:
print(f'[Exception] {post.board_id}/{post.id}:', e)
scraper = Scraper(concurrency=5)
retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[404])
scraper.mount('https://', WhyTheFuckRequestsHasNoTimeoutInAdapter(timeout=1, max_retries=retries))
document_path = Path('archives')
attachment_path = document_path / 'attachments'
attachment_path.mkdir(parents=True, exist_ok=True)
try:
for offset in range(22760681, 22760681 + (10000 * 100), 10000):
with ThreadPoolExecutor() as executor:
posts = scraper.search('aoegame', 'name', -offset, 'ori')
print(f'searching offset {-offset}, found {len(posts)} post(s)')
as_completed([
executor.submit(
scraper.view,
post,
document_path=document_path,
attachment_path=attachment_path
)
for post in scraper.search('aoegame', 'name', -offset, 'ori')
])
except KeyboardInterrupt:
print(':-)')
if __name__ == '__main__':
asyncio.run(main())

0
utils/__init__.py Normal file

8
utils/middlewares.py Normal file

@@ -0,0 +1,8 @@
import asyncio
from aiohttp import ClientRequest, ClientResponse, ClientHandlerType
class Semaphore(asyncio.Semaphore):
async def __call__(self, req: ClientRequest, handler: ClientHandlerType) -> ClientResponse:
async with self:
return await handler(req)
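This subclass doubles as an aiohttp client middleware: every request made through the session first acquires the semaphore, so at most N requests are in flight at once. A rough usage sketch, assuming the client middleware support added in aiohttp 3.12 (the version pinned in requirements above); the URL is a placeholder:

import asyncio
from aiohttp import ClientSession

async def demo():
    # at most two requests run concurrently through this session
    async with ClientSession(middlewares=(Semaphore(2),)) as session:
        async with session.get('https://example.com') as response:
            print(response.status)

asyncio.run(demo())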

274
utils/scraper.py Normal file

@@ -0,0 +1,274 @@
import re
import shutil
import hashlib
import mimetypes
import asyncio
import urllib.parse as urlparse
from typing import Optional, List
from pathlib import Path
from tempfile import NamedTemporaryFile
from datetime import datetime
from zoneinfo import ZoneInfo
from aiohttp import ClientSession, ClientError
from bs4 import BeautifulSoup
from .typings import BoardPath, SearchType
from models import Attachment, Post
LIST_MAX_POSTS = 200
class Scraper(ClientSession):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# default headers required when requesting mobile pages and attachments
self.headers['Accept'] = 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*'
self.headers['User-Agent'] = '(Android)'
self.headers['Referer'] = 'https://m.dcinside.com/board/aoegame'
# number of posts to fetch per list request
self.cookie_jar.update_cookies({
'list_count': LIST_MAX_POSTS
})
async def __aenter__(self) -> 'Scraper':
return self
async def list(
self,
board_id: str,
board_path: BoardPath = 'board',
page: int = 1,
category_id: int = 0,
only_recommended: bool = False,
only_notice: bool = False,
search_type: Optional[SearchType] = None,
search_position: Optional[int] = None,
search_value: Optional[str] = None
) -> List[Post]:
"""
Fetches a list of posts matching the given conditions from a board
:param board_id: board ID
:param board_path: board path (type)
:param page: page number
:param category_id: category (head) ID
:param only_recommended: fetch only recommended posts?
:param only_notice: fetch only notice posts?
:param search_type: search type
:param search_position: search position
:param search_value: search keyword
"""
url = f'https://m.dcinside.com/{board_path}/{board_id}'
params = {
'page': page,
'headid': category_id,
'recommend': only_recommended and '1' or '0',
'notice': only_notice and '1' or '0',
's_type': search_type or '',
's_pos': search_position or '',
'serval': search_value or ''
}
async with self.get(url, params=params) as response:
html = await response.text()
document = BeautifulSoup(html, 'lxml')
return [
Post(
id=int(re.findall(r'/\d+', tag.select_one('a[href]')['href'])[0][1:]),
board_id=board_id,
board_path=board_path
)
for tag in document.select('.gall-detail-lnktb')
]
async def view(self, post: Post) -> Post:
"""
Fetches the content of a post
:param post: the post instance to fetch
"""
async with self.get(f'https://m.dcinside.com/{post.board_path}/{post.board_id}/{post.id}') as response:
html = await response.text()
document = BeautifulSoup(html, 'lxml')
# the title header is split across `li` elements and there are always exactly two of them;
# if not, the whole parse is broken anyway, so no special error handling here
author_tag, timestamp_tag, *_ = document.select('.gallview-tit-box .ginfo2 > li')
author_anchor_tag = author_tag.select_one('a')
# parse the written timestamp
post.created_at = (
datetime
.strptime(timestamp_tag.get_text(strip=True), '%Y.%m.%d %H:%M')
.replace(tzinfo=ZoneInfo('Asia/Seoul'))
)
# parse author information
if author_anchor_tag:
# an anchor tag in the author element means the author has a gallog (profile) page
post.author_id = re.findall(r'\/\w+$', author_anchor_tag['href'])[0][1:]
post.author_name = author_anchor_tag.get_text(strip=True)
else:
author_parts = author_tag.get_text(strip=True).split('(')
post.author_id = author_parts.pop()[:-1].strip() # 123.123) -> 123.123
post.author_name = author_parts.pop().strip()
# on the mobile web the category and title are separated by a newline
title_texts = (
document
.select_one('.gallview-tit-box .tit')
.get_text(strip=True)
.split('\n')
)
# parse the title and category
post.title = title_texts.pop().strip()
if title_texts:
post.category = title_texts.pop()[1:-1].strip() # [XX] -> XX
# parse the body
post.body = document.select_one('.thum-txtin')
# strip unnecessary elements from the body
for tag in post.body.select('script, style'):
tag.extract()
return post
async def fetch_voice(self, id: str):
"""
Resolves the actual file URL from the mobile web voice-reply iframe page
:param id: voice reply ID (the `vr` parameter)
"""
params = {
'vr': id,
'vr_open': 1
}
async with await self.get('https://m.dcinside.com/voice/player', params=params) as response:
html = await response.text()
document = BeautifulSoup(html, 'lxml')
return document.select_one('input')['value']
async def fetch_video(self, id: str):
"""
Resolves the actual file URL from the mobile web video iframe page
:param id: video ID (the `no` parameter)
"""
params = {
'no': id
}
async with await self.get('https://m.dcinside.com/movie/player', params=params) as response:
html = await response.text()
document = BeautifulSoup(html, 'lxml')
return document.select_one('source')['src']
async def download_attachment(self, url: str, save_dir: Path) -> Attachment:
"""
Downloads an attachment
:param url: URL of the attachment to download
:param save_dir: local directory to save it into
"""
url_parsed = urlparse.urlparse(url)
url_params = urlparse.parse_qs(url_parsed.query)
hash = hashlib.sha1()
attachment = Attachment(
url=url,
source_url=url
)
if url.startswith('https://m.dcinside.com/voice/player'):
# voice reply; parse_qs returns a list per key, so take the first value
attachment.source_url = await self.fetch_voice(url_params['vr'][0])
elif url.startswith('https://m.dcinside.com/movie/player'):
# video
attachment.source_url = await self.fetch_video(url_params['no'][0])
with NamedTemporaryFile('wb') as temp_file:
async with await self.get(attachment.source_url) as response:
async for chunk, _ in response.content.iter_chunks():
temp_file.write(chunk)
hash.update(chunk)
temp_file.flush()
# derive the file extension from the Content-Type header
attachment.source_suffix = mimetypes.guess_extension(response.content_type)
# derive the original filename and extension from the Content-Disposition header
# FIXME: dcinside manages to send this header with broken encoding (latin-1?)
if response.content_disposition and response.content_disposition.filename:
attachment.source_filename = response.content_disposition.filename
attachment.source_suffix = Path(attachment.source_filename).suffix
attachment.hash = hash.hexdigest()
saved_path = save_dir / f'{attachment.hash}{attachment.source_suffix}'
# move the temporarily downloaded file into place
if not saved_path.exists():
shutil.copy2(temp_file.name, saved_path)
return attachment
async def download_attachments(
self,
post: Post,
save_dir: Path
):
"""
Downloads a post's attachments (images, videos, voice replies, ...)
:param post: the post instance
:param save_dir: local directory to save them into
"""
urls = [
# images
*[
# animated images are converted automatically and exposed via `data-gif` and `data-mp4`
# TODO: bad code, clean shit up
tag.attrs.get('data-mp4', tag['data-original']).strip()
for tag in post.body.select('img[data-original]')
],
# voice replies and videos
*filter(
lambda url: (
url.startswith('https://m.dcinside.com/voice/player') or
url.startswith('https://m.dcinside.com/movie/player')
),
[
tag['src'].strip()
for tag in post.body.select('iframe')
]
)
]
futures = [
self.download_attachment(url, save_dir)
for url in filter(
lambda x: x not in post.attachments,
urls
)
]
async for future in asyncio.as_completed(futures):
# TODO: error handling
attachment = await future
post.attachments[attachment.url] = attachment
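Putting the new pieces together, a minimal end-to-end sketch in the spirit of the rewritten entry point: list a page of posts, fetch each one, download its attachments, and dump it as JSON. The board ID, paths, and concurrency below are placeholders, the middlewares keyword again assumes aiohttp 3.12+, and orjson needs a default handler because Post.body is a bs4 Tag:

import asyncio
import orjson
from pathlib import Path
from bs4 import Tag
from utils.middlewares import Semaphore
from utils.scraper import Scraper

def default(obj):
    # orjson cannot serialize bs4 tags, so store the rendered HTML instead
    if isinstance(obj, Tag):
        return str(obj)
    raise TypeError

async def main():
    json_dir = Path('archives')
    attachment_dir = json_dir / 'attachments'
    attachment_dir.mkdir(parents=True, exist_ok=True)
    async with Scraper(middlewares=(Semaphore(5),)) as scraper:
        for post in await scraper.list('aoegame', page=1):
            post = await scraper.view(post)
            await scraper.download_attachments(post, attachment_dir)
            (json_dir / f'{post.board_id}_{post.id}.json').write_bytes(
                orjson.dumps(post, default=default)
            )

asyncio.run(main())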

6
utils/typings.py Normal file

@@ -0,0 +1,6 @@
from typing import Literal
BoardPath = Literal['board', 'mini', 'person']
SearchType = Literal['subject_m', 'subject', 'memo', 'name', 'comment']