Compare commits
18 Commits
920ecae2cd
...
main
Author | SHA1 | Date | |
---|---|---|---|
6a5ac1d7d1
|
|||
dd3315b067
|
|||
9856fbecc4
|
|||
59772371a7
|
|||
26e49009ac
|
|||
65ba2f7ee2
|
|||
4efd92bcb0
|
|||
753da82a48
|
|||
f3344c57c3
|
|||
69a5fd2e97
|
|||
6987c26b11
|
|||
6b5477e1eb
|
|||
5679ce1d6a
|
|||
02fd0ae13e
|
|||
85e1d74a1d
|
|||
8ad93caa90
|
|||
540a84e772
|
|||
6dbc4a4e54
|
12
.vscode/launch.json
vendored
Normal file
12
.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Python: Run scraper",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"module": "scraper",
|
||||
"justMyCode": true
|
||||
}
|
||||
]
|
||||
}
|
13
COPYING
Normal file
13
COPYING
Normal file
@@ -0,0 +1,13 @@
|
||||
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||
Version 2, December 2004
|
||||
|
||||
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
|
||||
|
||||
Everyone is permitted to copy and distribute verbatim or modified
|
||||
copies of this license document, and changing it is allowed as long
|
||||
as the name is changed.
|
||||
|
||||
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
|
||||
|
||||
0. You just DO WHAT THE FUCK YOU WANT TO.
|
14
adapter.py
14
adapter.py
@@ -1,14 +0,0 @@
|
||||
from requests.adapters import HTTPAdapter
|
||||
|
||||
class WhyTheFuckRequestsHasNoTimeoutInAdapter(HTTPAdapter):
|
||||
def __init__(self, *args, **kwargs):
|
||||
if "timeout" in kwargs:
|
||||
self.timeout = kwargs["timeout"]
|
||||
del kwargs["timeout"]
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def send(self, request, **kwargs):
|
||||
timeout = kwargs.get("timeout")
|
||||
if timeout is None and hasattr(self, 'timeout'):
|
||||
kwargs["timeout"] = self.timeout
|
||||
return super().send(request, **kwargs)
|
33
cgi.py
33
cgi.py
@@ -1,33 +0,0 @@
|
||||
# https://github.com/python/cpython/blob/3511c2e546aaacda5880eb89a94f4e8514b3ce76/Lib/cgi.py#L226-L256
|
||||
|
||||
def _parseparam(s):
|
||||
while s[:1] == ';':
|
||||
s = s[1:]
|
||||
end = s.find(';')
|
||||
while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
|
||||
end = s.find(';', end + 1)
|
||||
if end < 0:
|
||||
end = len(s)
|
||||
f = s[:end]
|
||||
yield f.strip()
|
||||
s = s[end:]
|
||||
|
||||
def parse_header(line):
|
||||
"""Parse a Content-type like header.
|
||||
|
||||
Return the main content-type and a dictionary of options.
|
||||
|
||||
"""
|
||||
parts = _parseparam(';' + line)
|
||||
key = parts.__next__()
|
||||
pdict = {}
|
||||
for p in parts:
|
||||
i = p.find('=')
|
||||
if i >= 0:
|
||||
name = p[:i].strip().lower()
|
||||
value = p[i+1:].strip()
|
||||
if len(value) >= 2 and value[0] == value[-1] == '"':
|
||||
value = value[1:-1]
|
||||
value = value.replace('\\\\', '\\').replace('\\"', '"')
|
||||
pdict[name] = value
|
||||
return key, pdict
|
1
models/__init__.py
Normal file
1
models/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .post import Attachment, Post
|
31
models/post.py
Normal file
31
models/post.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from typing import Optional, Dict
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
from bs4 import Tag
|
||||
|
||||
from utils.typings import BoardPath
|
||||
|
||||
|
||||
@dataclass
|
||||
class Attachment:
|
||||
url: str
|
||||
source_url: str
|
||||
source_filename: Optional[str] = None
|
||||
source_suffix: Optional[str] = None
|
||||
hash: Optional[str] = None
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Post:
|
||||
id: int
|
||||
board_id: str
|
||||
board_path: BoardPath
|
||||
author_id: Optional[str] = None
|
||||
author_name: Optional[str] = None
|
||||
category: Optional[str] = None
|
||||
title: Optional[str] = None
|
||||
body: Optional[Tag] = None
|
||||
attachments: Dict[str, Attachment] = field(default_factory=lambda: {})
|
||||
created_at: Optional[datetime] = None
|
@@ -1,8 +1,18 @@
|
||||
aiohappyeyeballs==2.6.1
|
||||
aiohttp==3.12.15
|
||||
aiosignal==1.4.0
|
||||
attrs==25.3.0
|
||||
beautifulsoup4==4.13.4
|
||||
certifi==2025.7.14
|
||||
charset-normalizer==3.4.2
|
||||
frozenlist==1.7.0
|
||||
idna==3.10
|
||||
requests==2.32.4
|
||||
lxml==6.0.0
|
||||
multidict==6.6.3
|
||||
orjson==3.11.1
|
||||
propcache==0.3.2
|
||||
soupsieve==2.7
|
||||
typing_extensions==4.14.1
|
||||
tzdata==2025.2
|
||||
urllib3==2.5.0
|
||||
yarl==1.20.1
|
||||
|
274
scraper.py
274
scraper.py
@@ -1,187 +1,143 @@
|
||||
import re
|
||||
import hashlib
|
||||
import orjson
|
||||
import asyncio
|
||||
|
||||
from typing import Optional, Tuple, List
|
||||
from typing import Optional, List
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from datetime import datetime
|
||||
|
||||
from threading import Semaphore
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from bs4 import Tag
|
||||
|
||||
from requests import Session
|
||||
from requests.adapters import Retry
|
||||
from requests.cookies import create_cookie
|
||||
|
||||
from bs4 import BeautifulSoup, Tag
|
||||
|
||||
from cgi import parse_header
|
||||
from adapter import WhyTheFuckRequestsHasNoTimeoutInAdapter
|
||||
from utils.typings import BoardPath, SearchType
|
||||
from utils.middlewares import Semaphore
|
||||
from utils.scraper import Scraper, LIST_MAX_POSTS
|
||||
from models.post import Post
|
||||
|
||||
|
||||
@dataclass
|
||||
class Post:
|
||||
boardId: str
|
||||
postId: int
|
||||
authorId: Optional[str] = None
|
||||
authorName: Optional[str] = None
|
||||
category: Optional[str] = None
|
||||
title: Optional[str] = None
|
||||
body: Optional[Tag] = None
|
||||
created_at: Optional[datetime] = None
|
||||
class ArgumentInterface:
|
||||
board_id: str
|
||||
board_path: BoardPath
|
||||
page: int
|
||||
json_dir: Path
|
||||
json_format: str
|
||||
attachment_dir: Path
|
||||
concurrency: int
|
||||
|
||||
search_type: Optional[SearchType]
|
||||
search_position: Optional[int]
|
||||
search_date: datetime
|
||||
search_value: Optional[str]
|
||||
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument('board_id', type=str)
|
||||
parser.add_argument('--board-path', type=str, default='board')
|
||||
parser.add_argument('--page', type=int, default=1)
|
||||
parser.add_argument('--json-dir', type=Path, default=Path('archives'))
|
||||
parser.add_argument('--json-format', type=str, default='{board_id}_{id}.json')
|
||||
parser.add_argument('--attachment-dir', type=Path, default=Path('archives/attachments'))
|
||||
parser.add_argument('--concurrency', type=int, default=5)
|
||||
|
||||
parser.add_argument('--search-type', type=str, default=None)
|
||||
parser.add_argument('--search-position', type=int, default=None)
|
||||
parser.add_argument('--search-date', type=datetime.fromisoformat, default=datetime.now())
|
||||
parser.add_argument('--search-value', type=str, default=None)
|
||||
args = ArgumentInterface(**vars(parser.parse_args()))
|
||||
|
||||
|
||||
class Scraper(Session):
|
||||
def default(obj):
|
||||
if isinstance(obj, Tag):
|
||||
return str(obj)
|
||||
raise TypeError
|
||||
|
||||
def __init__(self, concurrency = 5):
|
||||
super().__init__()
|
||||
async def main():
|
||||
args.json_dir.mkdir(parents=True, exist_ok=True)
|
||||
args.attachment_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.semaphore = Semaphore(concurrency)
|
||||
middlewares = (
|
||||
Semaphore(args.concurrency),
|
||||
)
|
||||
|
||||
self.headers['User-Agent'] = '(Android)'
|
||||
self.headers['Referer'] = 'https://m.dcinside.com/board/6974gay'
|
||||
|
||||
self.cookies.set_cookie(create_cookie(
|
||||
name='list_count',
|
||||
value='200',
|
||||
domain='.dcinside.com'
|
||||
))
|
||||
|
||||
def download_attachment(self, url: str, save_dir: Path) -> Tuple[str, Path]:
|
||||
with self.semaphore:
|
||||
res = self.get(url, stream=True)
|
||||
res.raise_for_status()
|
||||
|
||||
hash = hashlib.sha1()
|
||||
|
||||
# fuck this shit
|
||||
_, parts = parse_header(res.headers.get('Content-Disposition'))
|
||||
fname = parts.get('filename', '')
|
||||
fext = fname.split('.').pop()
|
||||
|
||||
with NamedTemporaryFile('wb', dir=save_dir) as file:
|
||||
for chunk in res.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
file.write(chunk)
|
||||
hash.update(chunk)
|
||||
|
||||
return url, Path(file.name).rename(save_dir / f'{hash.hexdigest()}.{fext}')
|
||||
|
||||
|
||||
def replace_attachment(self, post: Post, save_dir: Path):
|
||||
src_to_tags = {
|
||||
img.attrs['data-original'].strip(): img
|
||||
for img in post.body.select('img[data-original]')
|
||||
async with Scraper(middlewares=middlewares) as scraper:
|
||||
base_kwargs = {
|
||||
'board_id': args.board_id,
|
||||
'board_path': args.board_path,
|
||||
'search_type': args.search_type,
|
||||
'search_position': args.search_position,
|
||||
'search_value': args.search_value
|
||||
}
|
||||
|
||||
with ThreadPoolExecutor() as executor:
|
||||
futures = [
|
||||
executor.submit(self.download_attachment, src, save_dir)
|
||||
for src in src_to_tags.keys()
|
||||
]
|
||||
# TODO: 존재하는 게시판인지 확인하기
|
||||
|
||||
for future in as_completed(futures):
|
||||
src, path = future.result()
|
||||
src_to_tags[src]['src'] = path
|
||||
if args.search_type:
|
||||
# 작성일을 기준으로 검색 시작 위치 가져오기
|
||||
if not args.search_position:
|
||||
url = 'https://m.dcinside.com/ajax/searchPrevDay'
|
||||
data = {
|
||||
'id': args.board_id, # TODO: board prefix (mi$, person$)
|
||||
's_type': args.search_type,
|
||||
'serval': args.search_value,
|
||||
'date': args.search_date.strftime('%Y-%m-%d'),
|
||||
}
|
||||
|
||||
def search(self, boardId: str, type: str, offset: int, value: str) -> List[Post]:
|
||||
with self.semaphore:
|
||||
res = self.get(
|
||||
f'https://m.dcinside.com/board/{boardId}',
|
||||
params={
|
||||
's_type': type,
|
||||
's_pos': offset,
|
||||
'serval': value
|
||||
})
|
||||
res.raise_for_status()
|
||||
async with await scraper.post(url, data=data) as response:
|
||||
payload = await response.json()
|
||||
print(f'received search position for {args.search_date}: {payload}')
|
||||
|
||||
document = BeautifulSoup(res.text, 'html.parser')
|
||||
return [
|
||||
Post(
|
||||
boardId=boardId,
|
||||
postId=int(re.findall(r'/\d+', tag.attrs.get('href'))[0][1:])
|
||||
# TODO: 오류 핸들링 (`payload['result'] != true` 또는 빈 페이지 반환)
|
||||
base_kwargs['search_position'] = payload['s_pos']
|
||||
|
||||
search_looped_count = 0
|
||||
|
||||
while True:
|
||||
search_kwargs = {
|
||||
# 검색 중이라면 검색할 위치 가져오기
|
||||
'search_position': (
|
||||
base_kwargs['search_position'] + (10000 * search_looped_count) if
|
||||
base_kwargs['search_position'] else
|
||||
None
|
||||
)
|
||||
for tag in document.select('.gall-detail-lnktb a[href]:first-child')
|
||||
]
|
||||
}
|
||||
|
||||
def view(self, post: Post, document_path: Path = None, attachment_path: Path = None) -> Post:
|
||||
with self.semaphore:
|
||||
res = self.get(f'https://m.dcinside.com/board/{post.boardId}/{post.postId}')
|
||||
res.raise_for_status()
|
||||
search_looped_count += 1
|
||||
|
||||
document = BeautifulSoup(res.text, 'html.parser')
|
||||
# 게시판 전체를 검색했다면 작업 종료하기
|
||||
if search_kwargs['search_position'] >= -10000:
|
||||
break
|
||||
|
||||
titleWrapTags = document.select('.gallview-tit-box .ginfo2 > li')
|
||||
pages: List[List[Post]] = []
|
||||
|
||||
timeTag = titleWrapTags.pop()
|
||||
authorTag = titleWrapTags.pop()
|
||||
authorAnchorTag = authorTag.select_one('a')
|
||||
|
||||
titleParts = (
|
||||
document
|
||||
.select_one('.gallview-tit-box .tit')
|
||||
.get_text(strip=True)
|
||||
.split('\r\n')
|
||||
)
|
||||
|
||||
post.title = titleParts.pop().strip()
|
||||
post.category = titleParts.pop()[1:-1].strip()
|
||||
|
||||
if authorAnchorTag:
|
||||
post.authorId = re.findall(r'\/\w+$', authorAnchorTag.attrs.get('href'))[0][1:]
|
||||
post.authorName = authorAnchorTag.get_text(strip=True)
|
||||
else:
|
||||
authorParts = authorTag.get_text(strip=True).split('(')
|
||||
post.authorId = authorParts[1][:-1].strip()
|
||||
post.authorName = authorParts[0].strip()
|
||||
|
||||
post.created_at = (
|
||||
datetime
|
||||
.strptime(timeTag.get_text(strip=True), '%Y.%m.%d %H:%M')
|
||||
.replace(tzinfo=ZoneInfo('Asia/Seoul'))
|
||||
)
|
||||
|
||||
post.body = document.select_one('.thum-txtin')
|
||||
|
||||
# ㅋㅋㅋㅋz
|
||||
if post.authorName != 'ori':
|
||||
return
|
||||
|
||||
print(f'yoinked {post.boardId}/{post.postId}, written by {post.authorName}: {post.title}')
|
||||
|
||||
if attachment_path:
|
||||
self.replace_attachment(post, attachment_path)
|
||||
|
||||
if document_path:
|
||||
(document_path / f'{post.boardId}_{post.postId}.html').write_text(str(post.body))
|
||||
|
||||
|
||||
scraper = Scraper(concurrency=5)
|
||||
|
||||
retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[404])
|
||||
scraper.mount('https://', WhyTheFuckRequestsHasNoTimeoutInAdapter(timeout=1, max_retries=retries))
|
||||
|
||||
document_path = Path('archives')
|
||||
attachment_path = document_path / 'attachments'
|
||||
attachment_path.mkdir(parents=True, exist_ok=True)
|
||||
# 모든 페이지 가져오기
|
||||
# FIXME: 이따구로 코드짜면 검색 결과가 아닌 단순 페이지 열람 불가능함
|
||||
while (
|
||||
len(pages) < 1 or
|
||||
len(pages[-1]) >= LIST_MAX_POSTS
|
||||
):
|
||||
kwargs = {
|
||||
**base_kwargs,
|
||||
**search_kwargs,
|
||||
'page': len(pages) + 1
|
||||
}
|
||||
posts = await scraper.list(**kwargs)
|
||||
pages.append(posts)
|
||||
print(f'found {len(posts)} post(s) from {kwargs['page']} page: {kwargs}')
|
||||
|
||||
for future in asyncio.as_completed([
|
||||
scraper.view(post)
|
||||
for page in pages for post in page
|
||||
]):
|
||||
try:
|
||||
for offset in range(22760681, 22760681 + (10000 * 100), 10000):
|
||||
with ThreadPoolExecutor() as executor:
|
||||
posts = scraper.search('aoegame', 'name', -offset, 'ori')
|
||||
print(f'searching offset {-offset}, found {len(posts)} post(s)')
|
||||
as_completed([
|
||||
executor.submit(
|
||||
scraper.view,
|
||||
post,
|
||||
document_path=document_path,
|
||||
attachment_path=attachment_path
|
||||
post = await future
|
||||
await scraper.download_attachments(post, args.attachment_dir)
|
||||
(args.json_dir / args.json_format.format(**kwargs, id=post.id)).write_bytes(
|
||||
orjson.dumps(post, default=default)
|
||||
)
|
||||
for post in scraper.search('aoegame', 'name', -offset, 'ori')
|
||||
])
|
||||
print(f'{post.board_id}/{post.id}: {post.title} by {post.author_name} ({post.author_id})')
|
||||
|
||||
except Exception as e:
|
||||
print(f'[Exception] {post.board_id}/{post.id}:', e)
|
||||
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print(':-)')
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
|
0
utils/__init__.py
Normal file
0
utils/__init__.py
Normal file
8
utils/middlewares.py
Normal file
8
utils/middlewares.py
Normal file
@@ -0,0 +1,8 @@
|
||||
import asyncio
|
||||
|
||||
from aiohttp import ClientRequest, ClientResponse, ClientHandlerType
|
||||
|
||||
class Semaphore(asyncio.Semaphore):
|
||||
async def __call__(self, req: ClientRequest, handler: ClientHandlerType) -> ClientResponse:
|
||||
async with self:
|
||||
return await handler(req)
|
274
utils/scraper.py
Normal file
274
utils/scraper.py
Normal file
@@ -0,0 +1,274 @@
|
||||
import re
|
||||
import shutil
|
||||
import hashlib
|
||||
import mimetypes
|
||||
import asyncio
|
||||
import urllib.parse as urlparse
|
||||
|
||||
from typing import Optional, List
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from aiohttp import ClientSession, ClientError
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .typings import BoardPath, SearchType
|
||||
from models import Attachment, Post
|
||||
|
||||
|
||||
LIST_MAX_POSTS = 200
|
||||
|
||||
|
||||
class Scraper(ClientSession):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# 모바일 페이지 및 첨부 파일 요청 시 필요한 기본 헤더 값
|
||||
self.headers['Accept'] = 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*'
|
||||
self.headers['User-Agent'] = '(Android)'
|
||||
self.headers['Referer'] = 'https://m.dcinside.com/board/aoegame'
|
||||
|
||||
# 게시글 목록 조회로 한 번에 불러올 항목 수
|
||||
self.cookie_jar.update_cookies({
|
||||
'list_count': LIST_MAX_POSTS
|
||||
})
|
||||
|
||||
async def __aenter__(self) -> 'Scraper':
|
||||
return self
|
||||
|
||||
async def list(
|
||||
self,
|
||||
board_id: str,
|
||||
board_path: BoardPath = 'board',
|
||||
page: int = 1,
|
||||
category_id: int = 0,
|
||||
only_recommended: bool = False,
|
||||
only_notice: bool = False,
|
||||
search_type: Optional[SearchType] = None,
|
||||
search_position: Optional[int] = None,
|
||||
search_value: Optional[str] = None
|
||||
) -> List[Post]:
|
||||
"""
|
||||
특정 게시판으로부터 특정 조건에 맞는 게시글 목록을 가져옵니다
|
||||
|
||||
:param board_id: 게시판 아이디
|
||||
:param board_path: 게시판 경로(종류)
|
||||
:param page: 페이지 번호
|
||||
:param category_id: 말머리 아이디
|
||||
:param only_recommended: 개념글 게시글만 조회할지?
|
||||
:param only_notice: 공지 게시글만 조회할지?
|
||||
:param search_type: 검색 종류
|
||||
:param search_position: 검색 지점
|
||||
:param search_value: 검색어
|
||||
"""
|
||||
url = f'https://m.dcinside.com/{board_path}/{board_id}'
|
||||
params = {
|
||||
'page': page,
|
||||
'headid': category_id,
|
||||
'recommend': only_recommended and '1' or '0',
|
||||
'notice': only_notice and '1' or '0',
|
||||
's_type': search_type or '',
|
||||
's_pos': search_position or '',
|
||||
'serval': search_value or ''
|
||||
}
|
||||
|
||||
async with self.get(url, params=params) as response:
|
||||
html = await response.text()
|
||||
document = BeautifulSoup(html, 'lxml')
|
||||
|
||||
return [
|
||||
Post(
|
||||
id=int(re.findall(r'/\d+', tag.select_one('a[href]')['href'])[0][1:]),
|
||||
board_id=board_id,
|
||||
board_path=board_path
|
||||
)
|
||||
for tag in document.select('.gall-detail-lnktb')
|
||||
]
|
||||
|
||||
async def view(self, post: Post) -> Post:
|
||||
"""
|
||||
게시글 내용을 조회합니다
|
||||
|
||||
:param post: 조회할 게시글 인스턴스
|
||||
"""
|
||||
async with self.get(f'https://m.dcinside.com/{post.board_path}/{post.board_id}/{post.id}') as response:
|
||||
html = await response.text()
|
||||
document = BeautifulSoup(html, 'lxml')
|
||||
|
||||
# 상단 제목 요소는 `li`로 나누어져있고 무슨 지랄을 해도 정확히 2개임
|
||||
# 만약 아니라면 어처피 파싱 무결성 전체가 깨질테니 예외 처리는 나도 몰?루
|
||||
author_tag, timestamp_tag, *_ = document.select('.gallview-tit-box .ginfo2 > li')
|
||||
author_anchor_tag = author_tag.select_one('a')
|
||||
|
||||
# 작성일 파싱
|
||||
post.created_at = (
|
||||
datetime
|
||||
.strptime(timestamp_tag.get_text(strip=True), '%Y.%m.%d %H:%M')
|
||||
.replace(tzinfo=ZoneInfo('Asia/Seoul'))
|
||||
|
||||
)
|
||||
|
||||
# 작성자 정보 파싱
|
||||
if author_anchor_tag:
|
||||
# 작성자 요소에 앵커 태그가 있다면 갤로그가 존재하는 상태임
|
||||
post.author_id = re.findall(r'\/\w+$', author_anchor_tag['href'])[0][1:]
|
||||
post.author_name = author_anchor_tag.get_text(strip=True)
|
||||
else:
|
||||
author_parts = author_tag.get_text(strip=True).split('(')
|
||||
post.author_id = author_parts.pop()[:-1].strip() # 123.123) -> 123.123
|
||||
post.author_name = author_parts.pop().strip()
|
||||
|
||||
# 모바일 웹에서 말머리와 제목은 `\n`으로 분리되어있음
|
||||
title_texts = (
|
||||
document
|
||||
.select_one('.gallview-tit-box .tit')
|
||||
.get_text(strip=True)
|
||||
.split('\n')
|
||||
)
|
||||
|
||||
# 제목과 말머리 파싱
|
||||
post.title = title_texts.pop().strip()
|
||||
|
||||
if title_texts:
|
||||
post.category = title_texts.pop()[1:~1].strip() # [XX] -> XX
|
||||
|
||||
# 본문 파싱
|
||||
post.body = document.select_one('.thum-txtin')
|
||||
|
||||
# 불필요한 본문 요소 제거
|
||||
for tag in post.body.select('script, style'):
|
||||
tag.extract()
|
||||
|
||||
return post
|
||||
|
||||
async def fetch_voice(self, id: str):
|
||||
"""
|
||||
모바일 웹의 보이스 리플 iframe 페이지로부터 실제 파일 경로를 가져옵니다
|
||||
|
||||
:param id: 보이스 리플 아이디 (`vr` 인자)
|
||||
"""
|
||||
params = {
|
||||
'vr': id,
|
||||
'vr_open': 1
|
||||
}
|
||||
|
||||
async with await self.get('https://m.dcinside.com/voice/player', params=params) as response:
|
||||
html = await response.text()
|
||||
document = BeautifulSoup(html, 'lxml')
|
||||
|
||||
return document.select_one('input')['value']
|
||||
|
||||
async def fetch_video(self, id: str):
|
||||
"""
|
||||
모바일 웹의 동영상 iframe 페이지로부터 실제 파일 경로를 가져옵니다
|
||||
|
||||
:param id: 동영상 아이디 (`no` 인자)
|
||||
"""
|
||||
params = {
|
||||
'no': id
|
||||
}
|
||||
|
||||
async with await self.get('https://m.dcinside.com/movie/player', params=params) as response:
|
||||
html = await response.text()
|
||||
document = BeautifulSoup(html, 'lxml')
|
||||
|
||||
return document.select_one('source')['src']
|
||||
|
||||
async def download_attachment(self, url: str, save_dir: Path) -> Attachment:
|
||||
"""
|
||||
첨부 파일을 받아옵니다
|
||||
|
||||
:param url: 받아올 첨부 파일의 주소
|
||||
:param save_dir: 받아질 로컬 디렉터리 경로
|
||||
"""
|
||||
url_parsed = urlparse.urlparse(url)
|
||||
url_params = urlparse.parse_qs(url_parsed.query)
|
||||
|
||||
hash = hashlib.sha1()
|
||||
attachment = Attachment(
|
||||
url=url,
|
||||
source_url=url
|
||||
)
|
||||
|
||||
if url.startswith('https://m.dcinside.com/voice/player'):
|
||||
# 보이스 리플
|
||||
attachment.source_url = await self.fetch_voice(url_params.get('vr'))
|
||||
|
||||
elif url.startswith('https://m.dcinside.com/movie/player'):
|
||||
# 동영상
|
||||
attachment.source_url = await self.fetch_video(url_params.get('no'))
|
||||
|
||||
with NamedTemporaryFile('wb') as temp_file:
|
||||
async with await self.get(attachment.source_url) as response:
|
||||
async for chunk, _ in response.content.iter_chunks():
|
||||
temp_file.write(chunk)
|
||||
hash.update(chunk)
|
||||
temp_file.flush()
|
||||
|
||||
# Content-Type 헤더로부터 확장자 알아내기
|
||||
attachment.source_suffix = mimetypes.guess_extension(response.content_type)
|
||||
|
||||
# Content-Disposition 헤더로부터 실제 파일 이름과 확장자 알아내기
|
||||
# FIXME: 이런 개시발 디시에서 헤더 인코딩을 터트려서 보내주는군요 (latin-1?)
|
||||
if response.content_disposition and response.content_disposition.filename:
|
||||
attachment.source_filename = response.content_disposition.filename
|
||||
attachment.source_suffix = Path(attachment.source_filename).suffix
|
||||
|
||||
attachment.hash = hash.hexdigest()
|
||||
|
||||
saved_path = save_dir / f'{attachment.hash}{attachment.source_suffix}'
|
||||
|
||||
# 임시로 받은 파일 옮기기
|
||||
if not saved_path.exists():
|
||||
shutil.copy2(temp_file.name, saved_path)
|
||||
|
||||
return attachment
|
||||
|
||||
async def download_attachments(
|
||||
self,
|
||||
post: Post,
|
||||
save_dir: Path
|
||||
):
|
||||
"""
|
||||
게시글에 첨부된 이미지, 동영상, 음성 등 첨부 파일을 받아옵니다
|
||||
|
||||
:param post: 게시글 인스턴스
|
||||
:param save_dir: 받아질 로컬 디렉터리 경로
|
||||
"""
|
||||
urls = [
|
||||
# 이미지
|
||||
*[
|
||||
# 움짤은 자동 변환 후 `data-gif`와 `data-mp4`로 반환됨
|
||||
# TODO: bad code, clean shit up
|
||||
tag.attrs.get('data-mp4', tag['data-original']).strip()
|
||||
for tag in post.body.select('img[data-original]')
|
||||
],
|
||||
|
||||
# 보이스 리플 및 동영상
|
||||
*filter(
|
||||
lambda url: (
|
||||
url.startswith('https://m.dcinside.com/voice/player') or
|
||||
url.startswith('https://m.dcinside.com/movie/player')
|
||||
),
|
||||
[
|
||||
tag['src'].strip()
|
||||
for tag in post.body.select('iframe')
|
||||
]
|
||||
)
|
||||
]
|
||||
|
||||
futures = [
|
||||
self.download_attachment(url, save_dir)
|
||||
for url in filter(
|
||||
lambda x: x not in post.attachments,
|
||||
urls
|
||||
)
|
||||
]
|
||||
|
||||
async for future in asyncio.as_completed(futures):
|
||||
# TODO: 오류 핸들링
|
||||
attachment = await future
|
||||
post.attachments[attachment.url] = attachment
|
6
utils/typings.py
Normal file
6
utils/typings.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from typing import Literal
|
||||
|
||||
|
||||
BoardPath = Literal['board', 'mini', 'person']
|
||||
|
||||
SearchType = Literal['subject_m', 'subject', 'memo', 'name', 'comment']
|
Reference in New Issue
Block a user