# Listing metadata: 120 lines, 3.8 KiB, Python.
import asyncio
from contextlib import AbstractAsyncContextManager
from pathlib import Path
from typing import Dict
from typing import Optional

from imagehash import ImageHash, phash
from playwright.async_api import async_playwright
from playwright.async_api import Playwright, Browser, Page
from playwright.async_api import Response


class Tracker(AbstractAsyncContextManager):
    """Async context manager that drives Firefox (via Playwright) over magma.com drawings.

    Entering the context starts Playwright and launches Firefox with the
    arguments given to the constructor; leaving it closes the browser and
    stops Playwright. One browser tab is kept per drawing id in ``pages``.
    """

    playwright: Playwright
    browser: Browser

    # Open tab per drawing id, guarded by ``pages_lock``.
    pages: Dict[str, Page]
    pages_lock: asyncio.Lock
    # Serialises the menu interaction in ``download``.
    pages_active_lock: asyncio.Lock

    drawing_hashes: Dict[str, ImageHash]

    def __init__(self, *browser_args, **browser_kwargs):
        """Store the launch arguments and create the per-instance state.

        Args:
            *browser_args: Positional arguments forwarded to
                ``playwright.firefox.launch``.
            **browser_kwargs: Keyword arguments forwarded to
                ``playwright.firefox.launch``.
        """
        self.browser_args = browser_args
        self.browser_kwargs = browser_kwargs

        # Created per instance: as class attributes these containers and
        # locks would be shared by every Tracker, mixing pages that belong
        # to different browsers.
        self.pages = {}
        self.pages_lock = asyncio.Lock()
        self.pages_active_lock = asyncio.Lock()
        self.drawing_hashes = {}

    async def __aenter__(self):
        """Start Playwright, launch Firefox and return this tracker."""
        self.playwright = await async_playwright().start()
        try:
            self.browser = await self.playwright.firefox.launch(
                *self.browser_args,
                **self.browser_kwargs
            )
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the
            # Playwright driver has to be stopped here.
            await self.playwright.stop()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the browser, forget the cached tabs and stop Playwright.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        try:
            await self.browser.close()
        finally:
            # The cached pages belong to the browser that was just closed.
            self.pages.clear()
            await self.playwright.stop()

    async def open_drawing(self, drawing_id: str) -> Page:
        """Return a tab showing the drawing, opening and loading one if needed.

        A cached tab that is still open is returned immediately. Otherwise a
        new tab is created, navigated to ``https://magma.com/d/<drawing_id>``
        and the call waits for the second ``POST .../open-socket`` exchange.
        The ``ids`` listed in that request's JSON body are opened as well
        before the page is returned.

        Args:
            drawing_id: Identifier used in the drawing URL.

        Returns:
            The page associated with ``drawing_id``.

        Raises:
            Exception: Whatever navigation, or opening one of the listed
                drawings, raised. The new tab is closed and removed from
                ``pages`` before the exception propagates.
        """
        async with self.pages_lock:
            cached = self.pages.get(drawing_id)
            if cached is not None and not cached.is_closed():
                return cached

            # Missing or already closed: replace it with a fresh tab.
            page = await self.browser.new_page()
            self.pages[drawing_id] = page
            print(f'open new tab for drawing#{drawing_id}')

        # The site is an Angular application, so its data is hard to scrape
        # from outside. The ids of every drawing in the current project are
        # taken from the `POST /open-socket` request instead.
        socket_response = asyncio.Queue()
        socket_response_count = 0

        async def on_response(response: Response):
            nonlocal socket_response_count

            request = response.request
            if request.method != 'POST':
                return
            if not request.url.endswith('/open-socket'):
                return

            # The first payload only contains this drawing itself, so the
            # second one is the one to use.
            socket_response_count += 1
            if socket_response_count < 2:
                return

            try:
                drawing_ids = request.post_data_json['ids']
                await asyncio.gather(*[
                    self.open_drawing(sibling_id)
                    for sibling_id in drawing_ids
                ])
            except Exception as error:
                # Hand the failure to the waiting caller; otherwise it would
                # block on the queue forever.
                await socket_response.put(error)
                raise

            await socket_response.put(drawing_ids)

        page.on('response', on_response)

        try:
            await page.goto(f'https://magma.com/d/{drawing_id}')

            # Hide the login modal, which otherwise gets in the way.
            await page.add_style_tag(content='modals-box { display: none !important }')

            # Wait until every drawing id of the current project is known.
            outcome = await socket_response.get()
            if isinstance(outcome, Exception):
                raise outcome
        except BaseException:
            # Do not leave a half-initialised tab cached for later callers.
            async with self.pages_lock:
                if self.pages.get(drawing_id) is page:
                    del self.pages[drawing_id]
            await page.close()
            raise

        return page

    async def download(
        self,
        drawing_id: str,
        path_to_save: Optional[Path] = None
    ):
        """Export the drawing through the page's "save-psd" menu command.

        Args:
            drawing_id: Identifier of the drawing to export.
            path_to_save: Destination for the downloaded file. When omitted
                (or falsy) the file stays where Playwright stored it.

        Returns:
            ``path_to_save`` when it was given, otherwise the path reported
            by Playwright for the finished download.
        """
        page = await self.open_drawing(drawing_id)

        async with page.expect_download() as event:
            # Firefox only performs Hover -> Click in the active window, so
            # only one tab may work through the menu at a time.
            async with self.pages_active_lock:
                await page.click('button[aria-label=File]')
                await page.hover('.dropdown-submenu:has(button[command=save-psd])')
                await page.click('button[command=save-psd]')

            print(f'downloading drawing#{drawing_id}')

        file = await event.value

        if path_to_save:
            # save_as copies the artifact, so unlike Path.rename it also
            # works when the destination is on another filesystem than
            # Playwright's temporary directory.
            await file.save_as(path_to_save)
            return path_to_save

        return await file.path()