2025-08-02 23:05:28 +09:00
commit 1d7a27c89f
4 changed files with 238 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,4 @@
__pycache__/
.venv/
archives/
*.tar

adapter.py Normal file

@@ -0,0 +1,14 @@
from requests.adapters import HTTPAdapter

class WhyTheFuckRequestsHasNoTimeoutInAdapter(HTTPAdapter):
    def __init__(self, *args, **kwargs):
        if "timeout" in kwargs:
            self.timeout = kwargs["timeout"]
            del kwargs["timeout"]
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        timeout = kwargs.get("timeout")
        if timeout is None and hasattr(self, 'timeout'):
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)
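
For reference, a minimal usage sketch of this adapter on a plain Session (the mount prefix and the 2-second value are illustrative; scraper.py below mounts it with timeout=1 and a Retry policy):

    from requests import Session
    from adapter import WhyTheFuckRequestsHasNoTimeoutInAdapter

    session = Session()
    # every request sent through this session now defaults to a 2-second timeout,
    # unless the caller passes an explicit timeout= of its own
    session.mount('https://', WhyTheFuckRequestsHasNoTimeoutInAdapter(timeout=2))
    session.get('https://example.com')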

cgi.py Normal file

@@ -0,0 +1,33 @@
# https://github.com/python/cpython/blob/3511c2e546aaacda5880eb89a94f4e8514b3ce76/Lib/cgi.py#L226-L256

def _parseparam(s):
    while s[:1] == ';':
        s = s[1:]
        end = s.find(';')
        while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
            end = s.find(';', end + 1)
        if end < 0:
            end = len(s)
        f = s[:end]
        yield f.strip()
        s = s[end:]


def parse_header(line):
    """Parse a Content-type like header.

    Return the main content-type and a dictionary of options.
    """
    parts = _parseparam(';' + line)
    key = parts.__next__()
    pdict = {}
    for p in parts:
        i = p.find('=')
        if i >= 0:
            name = p[:i].strip().lower()
            value = p[i+1:].strip()
            if len(value) >= 2 and value[0] == value[-1] == '"':
                value = value[1:-1]
                value = value.replace('\\\\', '\\').replace('\\"', '"')
            pdict[name] = value
    return key, pdict
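
A quick illustration of how scraper.py uses this vendored parse_header on a Content-Disposition header (the header value below is made up):

    from cgi import parse_header

    key, params = parse_header('attachment; filename="cat 1.png"')
    # key == 'attachment', params == {'filename': 'cat 1.png'}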

scraper.py Normal file

@@ -0,0 +1,187 @@
import re
import hashlib
from typing import Optional, Tuple, List
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from tempfile import NamedTemporaryFile
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests import Session
from requests.adapters import Retry
from requests.cookies import create_cookie
from bs4 import BeautifulSoup, Tag
from cgi import parse_header
from adapter import WhyTheFuckRequestsHasNoTimeoutInAdapter


@dataclass
class Post:
    boardId: str
    postId: int
    authorId: Optional[str] = None
    authorName: Optional[str] = None
    category: Optional[str] = None
    title: Optional[str] = None
    body: Optional[Tag] = None
    created_at: Optional[datetime] = None


class Scraper(Session):
    def __init__(self, concurrency=5):
        super().__init__()
        self.semaphore = Semaphore(concurrency)
        self.headers['User-Agent'] = '(Android)'
        self.headers['Referer'] = 'https://m.dcinside.com/board/6974gay'
        self.cookies.set_cookie(create_cookie(
            name='list_count',
            value='200',
            domain='.dcinside.com'
        ))

    def download_attachment(self, url: str, save_dir: Path) -> Tuple[str, Path]:
        with self.semaphore:
            res = self.get(url, stream=True)
            res.raise_for_status()
            hash = hashlib.sha1()
            # fuck this shit
            _, parts = parse_header(res.headers.get('Content-Disposition'))
            fname = parts.get('filename', '')
            fext = fname.split('.').pop()
            # delete=False: the file gets renamed below, so the exit of this block
            # must not try to unlink the temporary name (that would raise FileNotFoundError)
            with NamedTemporaryFile('wb', dir=save_dir, delete=False) as file:
                for chunk in res.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
                        hash.update(chunk)
            return url, Path(file.name).rename(save_dir / f'{hash.hexdigest()}.{fext}')

    def replace_attachment(self, post: Post, save_dir: Path):
        src_to_tags = {
            img.attrs['data-original'].strip(): img
            for img in post.body.select('img[data-original]')
        }
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.download_attachment, src, save_dir)
                for src in src_to_tags.keys()
            ]
            for future in as_completed(futures):
                src, path = future.result()
                src_to_tags[src]['src'] = path

    def search(self, boardId: str, type: str, offset: int, value: str) -> List[Post]:
        with self.semaphore:
            res = self.get(
                f'https://m.dcinside.com/board/{boardId}',
                params={
                    's_type': type,
                    's_pos': offset,
                    'serval': value
                })
            res.raise_for_status()
        document = BeautifulSoup(res.text, 'html.parser')
        return [
            Post(
                boardId=boardId,
                postId=int(re.findall(r'/\d+', tag.attrs.get('href'))[0][1:])
            )
            for tag in document.select('.gall-detail-lnktb a[href]:first-child')
        ]

    def view(self, post: Post, document_path: Optional[Path] = None, attachment_path: Optional[Path] = None) -> Optional[Post]:
        # hold the semaphore only for the request itself; replace_attachment's
        # workers need the remaining permits for their own downloads
        with self.semaphore:
            res = self.get(f'https://m.dcinside.com/board/{post.boardId}/{post.postId}')
            res.raise_for_status()
        document = BeautifulSoup(res.text, 'html.parser')
        titleWrapTags = document.select('.gallview-tit-box .ginfo2 > li')
        timeTag = titleWrapTags.pop()
        authorTag = titleWrapTags.pop()
        authorAnchorTag = authorTag.select_one('a')
        titleParts = (
            document
            .select_one('.gallview-tit-box .tit')
            .get_text(strip=True)
            .split('\r\n')
        )
        post.title = titleParts.pop().strip()
        post.category = titleParts.pop()[1:-1].strip()
        if authorAnchorTag:
            post.authorId = re.findall(r'\/\w+$', authorAnchorTag.attrs.get('href'))[0][1:]
            post.authorName = authorAnchorTag.get_text(strip=True)
        else:
            authorParts = authorTag.get_text(strip=True).split('(')
            post.authorId = authorParts[1][:-1].strip()
            post.authorName = authorParts[0].strip()
        post.created_at = (
            datetime
            .strptime(timeTag.get_text(strip=True), '%Y.%m.%d %H:%M')
            .replace(tzinfo=ZoneInfo('Asia/Seoul'))
        )
        post.body = document.select_one('.thum-txtin')
        # lol
        if post.authorName != 'ori':
            return None
        print(f'yoinked {post.boardId}/{post.postId}, written by {post.authorName}: {post.title}')
        if attachment_path:
            self.replace_attachment(post, attachment_path)
        if document_path:
            (document_path / f'{post.boardId}_{post.postId}.html').write_text(str(post.body))
        return post


scraper = Scraper(concurrency=5)
retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[404])
scraper.mount('https://', WhyTheFuckRequestsHasNoTimeoutInAdapter(timeout=1, max_retries=retries))

document_path = Path('archives')
attachment_path = document_path / 'attachments'
attachment_path.mkdir(parents=True, exist_ok=True)

try:
    for offset in range(22760681, 22760681 + (10000 * 100), 10000):
        with ThreadPoolExecutor() as executor:
            posts = scraper.search('aoegame', 'name', -offset, 'ori')
            print(f'searching offset {-offset}, found {len(posts)} post(s)')
            # submit one view() per post found above, then drain the futures
            # so exceptions raised inside a worker are not silently dropped
            futures = [
                executor.submit(
                    scraper.view,
                    post,
                    document_path=document_path,
                    attachment_path=attachment_path
                )
                for post in posts
            ]
            for future in as_completed(futures):
                future.result()
except KeyboardInterrupt:
    print(':-)')