diff --git a/requirements.txt b/requirements.txt index b1762ae..6a2feb6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ SQLAlchemy==1.4.40 appdirs~=1.4.4 beautifulsoup4==4.11.1 PySide6-Essentials==6.3.1 -Pillow==9.2.0 +Pillow==9.4.0 reportlab==3.6.11 packaging~=21.3 traitlets==5.3.0 @@ -10,4 +10,5 @@ requests==2.28.1 markdown2==2.4.3 lxml==4.9.1 Jinja2==3.1.2 -alembic==1.8.1 \ No newline at end of file +alembic==1.8.1 +aiohttp~=3.8.4 \ No newline at end of file diff --git a/src/dataset_downloader.py b/src/dataset_downloader.py index b28baab..74b666d 100644 --- a/src/dataset_downloader.py +++ b/src/dataset_downloader.py @@ -1,9 +1,11 @@ +import asyncio +from collections import defaultdict from dataclasses import dataclass from datetime import datetime from typing import List -import requests -from PySide6.QtCore import QThread, Signal +import aiohttp +from PySide6.QtCore import QThread, Signal, QObject from PySide6.QtWidgets import QDialog from bs4 import BeautifulSoup @@ -47,6 +49,10 @@ def toDict(self): } +class LoginFailedException(Exception): + pass + + class DatasetDownloadDialog(QDialog, Ui_DownloadDialog): def __init__(self, parent): super().__init__(parent) @@ -55,117 +61,168 @@ def __init__(self, parent): self.setWindowTitle("Quelle auswählen") self.ui.source_combobox.addItem("bfv.sr-regeltest.de") - self.ui.buttonBox.accepted.connect(self.login) + self.ui.buttonBox.accepted.connect(self.download_data) self.session = None self.data = None - def login(self): - self.session = requests.Session() + self.download_progress = 0 + self.max_items = -1 + + self.downloader = None + + def receive_download_items(self, value: int): + self.max_items = value + + def download_done(self): + self.download_progress += 1 + print(f"{self.download_progress} / {self.max_items}") + + def download_data(self): + loop = asyncio.get_event_loop() if self.ui.source_combobox.currentIndex() == 0: - def parse_regelfragen(soup_filtered, session): - output_questions = [] - output_questiongroups = {} - group_25_count = 0 - for element in soup_filtered: - rows = element.findAll("td") - - question_url = rows[0].find('a').attrs['href'] - regel_id = rows[0].find('a').contents[0] - group_name = rows[1].contents[0] - - try: - int(regel_id) - regel_id = regel_id.zfill(5) - group_id = int(regel_id[0:2]) - question_id = int(regel_id[2:]) - - except ValueError: - group_25_count += 1 - group_id = 25 - question_id = group_25_count - - if group_id not in output_questiongroups: - output_questiongroups[group_id] = QuestionGroupJSON(group_id, group_name) - - detail_page = BeautifulSoup(session.get(base_url + question_url).content, 'html.parser') - content = detail_page.findAll("div", {"class": "card-body"}) - question = content[0].findAll("p")[1].contents[0].strip() - if len(content[1].findAll("tr", {"class": "wrong-answer"})) > 0: - # multiple choice! - multiple_choice = [] - for i, answers in enumerate(content[1].findAll("tr")): - multiple_choice_answer = answers.find("td").contents[0].strip() - if answers["class"][0] == 'correct-answer': - answer_index = i - answer_text = multiple_choice_answer - multiple_choice += [answers.find("td").contents[0].strip()] - else: - answer_elements = content[1].find("p") - if answer_elements: - answer_text = answer_elements.contents[0].strip() - else: - answer_text = "" - print(f"Regelgruppe {group_id} - Regel-ID {question_id} hat eine leere Antwort!") - multiple_choice = [] - answer_index = -1 - - output_questions += [ - QuestionJSON( - group_id, - question_id, - question, - answer_index, - answer_text, - str(datetime.strptime(rows[3].contents[0], '%d.%m.%Y').date()), - str(datetime.strptime(rows[4].contents[0], '%d.%m.%Y').date()), - multiple_choice, - )] - return output_questiongroups, output_questions - - r = self.session.get("https://bfv.sr-regeltest.de/users/sign_in") - base_url = "https://bfv.sr-regeltest.de" - - login_page = BeautifulSoup(r.content, 'html.parser') - authenticity_token = login_page.find('input', {'name': 'authenticity_token'})['value'] + self.downloader = BfvSrRegeltest(self.ui.username_lineedit.text(), self.ui.password_lineedit.text()) + self.downloader.available_questions.connect(self.receive_download_items) + self.downloader.downloaded_element.connect(self.download_done) - # login + regelgruppen_list, regelfragen_list = loop.run_until_complete(self.downloader.download_loop()) + self.data = {"question_groups": regelgruppen_list, "questions": regelfragen_list} + self.accept() + + +class BfvSrRegeltest(QObject): + base_url = "https://bfv.sr-regeltest.de" + available_questions = Signal(int) + downloaded_element = Signal() + login_successful = Signal(bool) + + def __init__(self, username, password): + super().__init__() + self.username = username + self.password = password - username = self.ui.username_lineedit.text() - password = self.ui.password_lineedit.text() + async def login(self, session): + async with session.get("/users/sign_in") as resp: + r = await resp.text() - url = 'https://bfv.sr-regeltest.de/users/sign_in' - myobj = {'authenticity_token': authenticity_token, - 'user[email]': username, - 'user[password]': password, - 'user[remember_me]': "0", - 'commit': "Anmelden"} + login_page = BeautifulSoup(r, 'html.parser') + authenticity_token = login_page.find('input', {'name': 'authenticity_token'})['value'] - login_answer = self.session.post(url, data=myobj) + url = '/users/sign_in' + myobj = {'authenticity_token': authenticity_token, + 'user[email]': self.username, + 'user[password]': self.password, + 'user[remember_me]': "0", + 'commit': "Anmelden"} - if 'Passwort ungültig' in login_answer.content.decode(): - self.session = None - self.ui.password_lineedit.setText("") + async with session.post(url, data=myobj) as resp: + r = await resp.text() + if 'Passwort ungültig' in r: + raise LoginFailedException() + + async def _fetch_question(self, session, soup_element): + rows = soup_element.findAll("td") + + question_url = rows[0].find('a').attrs['href'] + regel_id = rows[0].find('a').contents[0] + group_name = rows[1].contents[0] + + try: + int(regel_id) + regel_id = regel_id.zfill(5) + group_id = int(regel_id[0:2]) + question_id = int(regel_id[2:]) + except ValueError: + group_id = 25 + question_id = -1 + + async with session.get(question_url) as resp: + detail_page = await resp.text() + detail_page = BeautifulSoup(detail_page, 'html.parser') + content = detail_page.findAll("div", {"class": "card-body"}) + question = content[0].findAll("p")[1].contents[0].strip() + if len(content[1].findAll("tr", {"class": "wrong-answer"})) > 0: + # multiple choice! + multiple_choice = [] + for i, answers in enumerate(content[1].findAll("tr")): + multiple_choice_answer = answers.find("td").contents[0].strip() + if answers["class"][0] == 'correct-answer': + answer_index = i + answer_text = multiple_choice_answer + multiple_choice += [answers.find("td").contents[0].strip()] + else: + answer_elements = content[1].find("p") + if answer_elements: + answer_text = answer_elements.contents[0].strip() + else: + answer_text = "" + print(f"Regelgruppe {group_id} - Regel-ID {question_id} hat eine leere Antwort!") + multiple_choice = [] + answer_index = -1 + + self.downloaded_element.emit() + + return QuestionJSON( + group_id, + question_id, + question, + answer_index, + answer_text, + str(datetime.strptime(rows[3].contents[0], '%d.%m.%Y').date()), + str(datetime.strptime(rows[4].contents[0], '%d.%m.%Y').date()), + multiple_choice, + ), QuestionGroupJSON(group_id, group_name) + + @staticmethod + async def _fetch_list(session, page_number: int): + async with session.get(f'/questions?page={page_number}') as resp: + content = await resp.text() + soup = BeautifulSoup(content, 'html.parser') + return soup.find("table").find("tbody").findAll("tr") + + async def download_loop(self): + async with aiohttp.ClientSession("https://bfv.sr-regeltest.de") as session: + # login + try: + await self.login(session) + except LoginFailedException: + self.login_successful.emit(False) return - question_page_1 = self.session.get(f'https://bfv.sr-regeltest.de/questions?page=1') - soup = BeautifulSoup(question_page_1.content, 'html.parser') + self.login_successful.emit(True) + + async with session.get('/questions?page=1') as resp: + question_page_1 = await resp.text() + soup = BeautifulSoup(question_page_1, 'html.parser') last_page = int(soup.find(text="Letzte »").parent["href"].split("=")[1]) - regelfragen_tables = [] - for i in range(1, last_page + 1): - response = self.session.get(f'https://bfv.sr-regeltest.de/questions?page={i}') - soup = BeautifulSoup(response.content, 'html.parser') - regelfragen_tables += soup.find("table").find("tbody").findAll("tr") + + tasks = [asyncio.ensure_future(self._fetch_list(session, page_number)) for page_number in + range(1, last_page + 1)] + regelfragen_tables = [item for sublist in await asyncio.gather(*tasks) for item in sublist] print(f"{len(regelfragen_tables)} Regelfragen gefunden!") + self.available_questions.emit(len(regelfragen_tables)) + + tasks = [asyncio.ensure_future(self._fetch_question(session, soup_set)) for soup_set in + regelfragen_tables] + responses = await asyncio.gather(*tasks) + + regelfragen, regelgruppen = list(zip(*responses)) - regelgruppen, regelfragen = parse_regelfragen(regelfragen_tables, self.session) - regelgruppen_sorted = list(regelgruppen.values()) - regelgruppen_sorted = sorted(regelgruppen_sorted, key=lambda x: x.id) + regelgruppen_sorted = sorted(regelgruppen, key=lambda x: x.id) + re_id = defaultdict(lambda: 1) + + for question in regelfragen: + if question.question_id == -1: + question.question_id = re_id[question.group_id] + re_id[question.group_id] += 1 regelgruppen_list = [regelgruppe.toDict() for regelgruppe in regelgruppen_sorted] + regelgruppen_filtered = [] + for group in regelgruppen_list: + if group not in regelgruppen_filtered: + regelgruppen_filtered += [group] regelfragen_list = [regelfrage.toDict() for regelfrage in regelfragen] - self.data = {"question_groups": regelgruppen_list, "questions": regelfragen_list} - self.accept() + return regelgruppen_filtered, regelfragen_list class DownloadProgress(QDialog, Ui_DownloadProgress):