Spaces:
Runtime error
Runtime error
| from contextlib import contextmanager | |
| import gradio as gr | |
| from database.operation import * | |
| from memorize import * | |
| from database import SessionLocal, engine, Base | |
| import database.schema as schema | |
| from database import constant | |
| import time | |
| import asyncio | |
| import pandas as pd | |
| from collections import defaultdict | |
| from datetime import datetime | |
| Base.metadata.create_all(bind=engine) | |
| db = SessionLocal() | |
| def session_scope(): | |
| try: | |
| yield db | |
| db.commit() | |
| except Exception: | |
| db.rollback() | |
| raise | |
| finally: | |
| db.close() | |
| intro = """\ | |
| 目标场景:只考虑记住单词及其意思,使得能无障碍阅读,不考虑用于写作。 | |
| 主要想法:批量记单词,每批 n 个单词,这 n 个单词用 AI 生成故事,复述故事即可记住单词。 | |
| 为什么? | |
| - 批量记单词,一次可以记住 n 个单词,而不是一个一个记,效率高。 | |
| - 复述故事,即费曼学习法,故事是单词的记忆之锚。 | |
| - 复述故事而不是复述单词,故事具有连续性,更符合人类天性,容易记。 | |
| ### 使用建议 | |
| 1. 记单词前,先完整过一遍全部单词,剔除已记住的单词,从而提高新词密度 | |
| 2. 先看单词表格,然后看英文故事,对照中文完成记忆 | |
| 3. 记忆完成后需要一个一个勾选已记住的单词,勾选时尝试复述单词意思,以此来检验记忆效果 | |
| > 本项目基于[开源数据集](https://github.com/LinXueyuanStdio/DictionaryData),并且[开源代码](https://github.com/LinXueyuanStdio/oh-my-words),欢迎大家贡献代码~ | |
| """ | |
| with gr.Blocks(title="批量记单词") as demo: | |
| # gr.Markdown("# 批量记单词") | |
| gr.HTML("<h1 align=\"center\">批量记单词</h1>") | |
| user = gr.State(value={}) | |
| # 0. 登录 | |
| with gr.Tab("主页"): | |
| gr.Markdown(intro) | |
| gr.Markdown(f"共 {get_book_count(db)} 本书") | |
| gr.HTML("""<iframe src="https://ghbtns.com/github-btn.html?user=LinXueyuanStdio&repo=oh-my-words&type=star&count=true&size=small" frameborder="0" scrolling="0" width="170" height="30" title="GitHub"></iframe>""") | |
| with gr.Row(): | |
| with gr.Column(): | |
| email = gr.TextArea(value=constant.email, lines=1, label="邮箱") | |
| password = gr.TextArea(value=constant.password, lines=1, label="密码") | |
| login_btn = gr.Button("登录") | |
| with gr.Column(): | |
| register_email = gr.TextArea(value='', lines=1, label="邮箱") | |
| register_password = gr.TextArea(value='', lines=1, label="密码") | |
| register_btn = gr.Button("立即注册", variant="primary") | |
| user_status = gr.Textbox("", lines=1, label="用户状态") | |
| # 1. 创建记忆计划 | |
| tab1 = gr.Tab("创建记忆计划", visible=False) | |
| with tab1: | |
| select_book = gr.Dropdown([], label="单词书", info="选择一本单词书") | |
| batch_size = gr.Number(value=10, label="批次大小") | |
| randomize = gr.Checkbox(value=True, label="以单词乱序进行记忆") | |
| title = gr.TextArea(value='单词书', lines=1, label="记忆计划的名称") | |
| btn = gr.Button("创建记忆计划") | |
| status = gr.Textbox("", lines=1, label="状态") | |
| def submit(user: Dict[str, str], book, title, randomize, batch_size): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return "请先登录" | |
| book_id = book.split(" [")[1][:-1] | |
| user_book = create_user_book(db, UserBookCreate( | |
| owner_id=user_id, | |
| book_id=book_id, | |
| title=title, | |
| random=randomize, | |
| batch_size=batch_size | |
| )) | |
| if user_book is not None: | |
| return "成功" | |
| else: | |
| return "失败" | |
| btn.click(submit, [user, select_book, title, randomize, batch_size], [status]) | |
| def on_select(user: Dict[str, str], evt: gr.SelectData): | |
| user_id = user.get("id", None) | |
| new_options = [] | |
| if user_id is None: | |
| return gr.Dropdown(choices=new_options), "请先登录" | |
| books = get_all_books_for_user(db, user_id) | |
| new_options = [f"{'⭐ ' if book.permission == 'private' else ''}{book.bk_name} (共 {book.bk_item_num} 词) [{book.bk_id}]" for book in books] | |
| return gr.Dropdown(choices=new_options), f"您好,{user['email']}" | |
| tab1.select(on_select, [user], [select_book, status]) | |
| # 2. 选择单词分批 | |
| with gr.Tab("选择单词分批") as tab2: | |
| select_user_book = gr.Dropdown( | |
| [], label="记忆计划", info="请选择记忆计划" | |
| ) | |
| word_count = gr.Number(value=0, label="单词个数") | |
| known_words = gr.CheckboxGroup( | |
| [], label="已学会的单词", info="正式记忆前将去除已学会的单词,提高每个批次的新词密度,进而提高效率" | |
| ) | |
| btn = gr.Button("生成批次") | |
| status = gr.Textbox("3000 词大概要 2 小时才能写完所有的故事", lines=1, label="生成结果") | |
| def on_select_user(user): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return gr.Dropdown(choices=[]), "请先登录" | |
| new_options = [] | |
| user_book = get_user_books_by_owner_id(db, user_id) | |
| new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
| return gr.Dropdown(choices=new_options), "3000 词大概要 2 小时才能写完所有的故事" | |
| def on_select_user_book(user_book): | |
| logger.debug(f'user_book {user_book}') | |
| if user_book is None: | |
| return 0, gr.CheckboxGroup(choices=[]) | |
| new_options = [] | |
| user_book_id = user_book.split(" [")[1][:-1] | |
| user_book = get_user_book(db, user_book_id) | |
| book_id = user_book.book_id | |
| book = get_book(db, book_id) | |
| if book is None: | |
| return 0, gr.CheckboxGroup(choices=[]) | |
| words = get_words_for_book(db, user_book) | |
| new_options = [f"{word.vc_vocabulary}" for word in words] | |
| return len(words), gr.CheckboxGroup(choices=new_options) | |
| select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[word_count, known_words]) | |
| tab2.select(on_select_user, [user], [select_user_book, status]) | |
| def submit(user_book, known_words): | |
| start_time = time.time() | |
| user_book_id = user_book.split(" [")[1][:-1] | |
| user_book = get_user_book(db, user_book_id) | |
| all_words = get_words_for_book(db, user_book) | |
| unknown_words = [] | |
| for w in all_words: | |
| if w.vc_vocabulary not in known_words: | |
| unknown_words.append(w) | |
| track(db, user_book, unknown_words) | |
| end_time = time.time() | |
| duration = end_time - start_time | |
| return f"成功!分为 {len(unknown_words) // user_book.batch_size} 个批次,共 {len(unknown_words)} 个单词,耗时 {duration:.2f} 秒" | |
| btn.click(submit, [select_user_book, known_words], [status]) | |
| # 3. 记忆 | |
| with gr.Tab("记忆") as tab3: | |
| select_user_book = gr.Dropdown( | |
| [], label="记忆计划", info="请选择记忆计划" | |
| ) | |
| info = gr.Accordion(f"新词", open=False) | |
| with info: | |
| gr.Markdown(f"新词") | |
| dataframe_header = ["单词", "中文词意", "英式音标", "美式音标", "记忆量"] | |
| memorizing_dataframe = gr.Dataframe( | |
| headers=dataframe_header, | |
| datatype=["str"] * len(dataframe_header), | |
| col_count=(len(dataframe_header), "fixed"), | |
| wrap=True, | |
| ) | |
| batches = gr.State(value=[]) | |
| current_batch_index = gr.State(value=-1) | |
| user_book_id = gr.State(value=None) | |
| with gr.Row(): | |
| # story = gr.HighlightedText([]) | |
| # translated_story = gr.HighlightedText([]) | |
| # story = gr.Textbox() | |
| # translated_story = gr.Textbox() | |
| story = gr.Markdown() | |
| translated_story = gr.Markdown() | |
| # 试了一下,还是 markdown 的显示效果好 | |
| memorize_action = gr.CheckboxGroup(choices=[], label="记住的单词", info="能够复述出意思才算记住") | |
| with gr.Row(): | |
| previous_batch_btn = gr.Button("上一批") | |
| regenerate_btn = gr.Button("重新生成故事") | |
| next_batch_btn = gr.Button("下一批", variant="primary") | |
| progress = gr.Slider(1, 1, value=1, step=1, label="进度", info="") | |
| def on_select_user(user): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return gr.Dropdown(choices=[]) | |
| new_options = [] | |
| user_book = get_user_books_by_owner_id(db, user_id) | |
| new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
| return gr.Dropdown(choices=new_options) | |
| def update_from_batch(memorizing_batch: UserMemoryBatch): | |
| new_options = [] | |
| word_df = [] | |
| # logger.debug(get_user_memory_batch(db, memorizing_batch.id)) | |
| # logger.debug(memorizing_batch.id) | |
| # logger.debug(get_user_memory_words_by_batch_id(db, memorizing_batch.id)) | |
| # logger.debug(get_words_by_ids(db, [w.word_id for w in get_user_memory_words_by_batch_id(db, memorizing_batch.id)])) | |
| # words = get_words_in_batch(db, memorizing_batch.id) | |
| # words = get_words_by_ids(db, [w.word_id for w in memorizing_words]) | |
| memorizing_words = get_user_memory_words_by_batch_id(db, memorizing_batch.id) | |
| words = get_words_by_ids(db, [w.word_id for w in memorizing_words]) | |
| # 统计记忆量 | |
| actions = get_actions_at_each_word(db, [w.word_id for w in memorizing_words]) | |
| remember_count = defaultdict(int) | |
| forget_count = defaultdict(int) | |
| for a in actions: | |
| if a.action == "remember": | |
| remember_count[a.word_id] += 1 | |
| else: | |
| forget_count[a.word_id] += 1 | |
| # 统计记忆效率 | |
| batch_actions = get_user_memory_batch_actions_by_user_memory_batch_id(db, memorizing_batch.id) | |
| batch_actions.sort(key=lambda x: x.create_time) | |
| start, end = None, None | |
| total_duration = None | |
| for a in batch_actions: | |
| if a.action == "start": | |
| start: datetime = a.create_time | |
| elif a.action == "end": | |
| end: datetime = a.create_time | |
| if start is None: | |
| continue | |
| if total_duration is None: | |
| total_duration = end - start | |
| else: | |
| total_duration += end - start | |
| memory_speed = f"{memorizing_batch.batch_type}" | |
| if total_duration is not None: | |
| sec = total_duration.total_seconds() | |
| minutes = sec / 60 | |
| memory_speed += f":当前批次记忆效率 {len(memorizing_words) / minutes:.2f} 词/分钟,{minutes:.2f} 分钟/批次" | |
| # 单词信息表格与勾选 | |
| for w in words: | |
| new_options.append(f"{w.vc_vocabulary}") | |
| word_df.append([ | |
| w.vc_vocabulary, # 单词 | |
| w.vc_translation, # 中文词意 | |
| w.vc_phonetic_uk, # 英式音标 | |
| w.vc_phonetic_us, # 美式音标 | |
| f"{remember_count[w.vc_id]} / {remember_count[w.vc_id] + forget_count[w.vc_id]}", # 记忆量 | |
| ]) | |
| df = pd.DataFrame(word_df, columns=dataframe_header) | |
| if memorizing_batch.batch_type == "回忆": | |
| df = pd.DataFrame([[row[0], "", row[2], row[3], row[4]] for row in word_df], columns=dataframe_header) | |
| # 故事 | |
| story = memorizing_batch.story | |
| translated_story = memorizing_batch.translated_story | |
| if len(story) == 0 or len(translated_story) == 0: | |
| story, translated_story = regenerate_for_batch(memorizing_batch, words) | |
| logger.info("计算批次信息") | |
| logger.info(new_options) | |
| logger.info(story) | |
| logger.info(translated_story) | |
| logger.info("=" * 8) | |
| return (gr.Accordion(label=memory_speed), df, story, translated_story, gr.CheckboxGroup(choices=new_options)) | |
| def on_select_user_book(user_book_id: str): | |
| """ | |
| 1. 当前单词 | |
| 2. 对当前单词的操作 | |
| 3. 故事 | |
| """ | |
| logger.debug(f'user_book {user_book_id}') | |
| if user_book_id is None: | |
| # 为什么会空?这里返回的东西可能会爆炸,但好像执行不到这里 | |
| # 不管了,放个告示牌在这里,大家看见这个坑请绕着走 | |
| return [], gr.CheckboxGroup(choices=[]) | |
| user_book_id: str = user_book_id.split(" [")[1][:-1] | |
| user_book = get_user_book(db, user_book_id) | |
| batches = get_new_user_memory_batches_by_user_book_id(db, user_book_id) # 只缓存新词 | |
| batch_id = user_book.memorizing_batch | |
| memorizing_batch = get_user_memory_batch(db, batch_id) | |
| current_batch_index = -1 | |
| if memorizing_batch is not None: | |
| for index, b in enumerate(batches): | |
| if b.id == memorizing_batch.id: | |
| current_batch_index = index | |
| break | |
| if current_batch_index == -1: | |
| # 当前还没开始记忆,或者当前批次不是新词批次 | |
| current_batch_index = 0 | |
| memorizing_batch = batches[0] | |
| batch_id = memorizing_batch.id | |
| user_book.memorizing_batch = batch_id | |
| update_user_book(db, user_book_id, UserBookUpdate( | |
| owner_id=user_book.owner_id, | |
| book_id=user_book.book_id, | |
| title=user_book.title, | |
| random=user_book.random, | |
| batch_size=user_book.batch_size, | |
| memorizing_batch=batch_id | |
| )) | |
| updates = update_from_batch(memorizing_batch) | |
| on_batch_start(db, memorizing_batch.id) | |
| asyncio.run(pregenerate(batches, current_batch_index)) | |
| return (batches, current_batch_index, user_book) + updates + ( | |
| gr.Slider( | |
| minimum=1, | |
| maximum=len(batches), | |
| value=current_batch_index, | |
| ),) | |
| batch_widget = [info, memorizing_dataframe, story, translated_story, memorize_action] | |
| tab3.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
| select_user_book.select( | |
| on_select_user_book, | |
| inputs=[select_user_book], | |
| outputs=[batches, current_batch_index, user_book_id] + batch_widget + [progress] | |
| ) | |
| async def worker_regenerate_for_batch(batches: List[UserMemoryBatch], index: int): | |
| started_at = time.monotonic() | |
| logger.info(f"started {index}") | |
| # start | |
| batch = batches[index] | |
| story = batch.story | |
| translated_story = batch.translated_story | |
| if len(story) == 0 or len(translated_story) == 0: | |
| batch_words = get_words_in_batch(db, batch.id) | |
| regenerate_for_batch(batch, batch_words) | |
| # end | |
| total = time.monotonic() - started_at | |
| logger.info(f'completed in {total:.2f} seconds') | |
| async def pregenerate(batches: List[UserMemoryBatch], current_batch_index: int): | |
| logger.info("开始预生成故事") | |
| indexes = [current_batch_index+i+1 for i in range(3)]+[current_batch_index-i-1 for i in range(3)] | |
| indexes = [i for i in indexes if 0 <= i < len(batches)] | |
| for index in indexes: | |
| asyncio.ensure_future(worker_regenerate_for_batch(batches, index)) | |
| logger.info("结束预生成故事") | |
| def submit_batch(batches: List[UserMemoryBatch], current_batch_index: int): | |
| memorizing_batch = batches[current_batch_index] | |
| return set_memorizing_batch(batches, current_batch_index, memorizing_batch) | |
| def set_memorizing_batch(batches: List[UserMemoryBatch], current_batch_index: int, memorizing_batch: UserMemoryBatch): | |
| updates = update_from_batch(memorizing_batch) | |
| asyncio.run(pregenerate(batches, current_batch_index)) | |
| logger.info("pregenerated") | |
| return updates + (gr.Slider(value=current_batch_index+1), current_batch_index) | |
| def save_progress(old_batch: UserMemoryBatch, memorize_action: List[str]): | |
| # 保存单词记忆进度 | |
| actions = [] | |
| words = get_words_in_batch(db, old_batch.id) | |
| for word in words: | |
| if word.vc_vocabulary in memorize_action: | |
| actions.append((word.vc_id, "remember")) | |
| else: | |
| actions.append((word.vc_id, "forget")) | |
| save_memorizing_word_action(db, old_batch.id, actions) | |
| def previous_batch(batches: List[UserMemoryBatch], current_batch_index: int, user_book: schema.UserBook, memorize_action: List[str]): | |
| old_index = current_batch_index | |
| if current_batch_index <= 0: | |
| current_batch_index = 0 | |
| elif current_batch_index > 0: | |
| current_batch_index -= 1 | |
| if current_batch_index != old_index: | |
| # 下一页之前需要保存记忆进度 | |
| # logger.info("下一页之前需要保存记忆进度") | |
| # logger.info(memorize_action) | |
| # 保存批次进度 | |
| old_batch = batches[old_index] | |
| current_batch = batches[current_batch_index] | |
| save_progress(old_batch, memorize_action) | |
| on_batch_end(db, old_batch.id) | |
| on_batch_start(db, current_batch.id) | |
| user_book_id = user_book.id | |
| update_user_book_memorizing_batch(db, user_book_id, current_batch.id) | |
| return submit_batch(batches, current_batch_index) | |
| def next_batch(batches: List[UserMemoryBatch], current_batch_index: int, user_book: schema.UserBook, memorize_action: List[str]): | |
| old_index = current_batch_index | |
| if current_batch_index >= len(batches)-1: | |
| current_batch_index = len(batches)-1 | |
| elif current_batch_index < len(batches) - 1: | |
| current_batch_index += 1 | |
| if current_batch_index != old_index: | |
| # 下一页之前需要保存记忆进度 | |
| # logger.info("下一页之前需要保存记忆进度") | |
| # logger.info(memorize_action) | |
| # 保存批次进度 | |
| old_batch = batches[old_index] | |
| memorizing_batch = get_user_memory_batch(db, user_book.memorizing_batch) | |
| if memorizing_batch is not None: | |
| old_batch = memorizing_batch | |
| current_batch = batches[current_batch_index] | |
| save_progress(old_batch, memorize_action) | |
| on_batch_end(db, old_batch.id) | |
| next_batch = generate_next_batch(db, user_book, minutes=60, k=3) | |
| if next_batch is not None: | |
| current_batch = next_batch | |
| on_batch_start(db, current_batch.id) | |
| user_book_id = user_book.id | |
| update_user_book_memorizing_batch(db, user_book_id, current_batch.id) | |
| if next_batch is not None: | |
| return set_memorizing_batch(batches, old_index, current_batch) | |
| else: | |
| return set_memorizing_batch(batches, current_batch_index, current_batch) | |
| else: | |
| memorizing_batch = get_user_memory_batch(db, user_book.memorizing_batch) | |
| current_batch = batches[current_batch_index] | |
| save_progress(memorizing_batch, memorize_action) | |
| on_batch_end(db, memorizing_batch.id) | |
| next_batch = generate_next_batch(db, user_book, minutes=60, k=3) | |
| if next_batch is not None: | |
| current_batch = next_batch | |
| on_batch_start(db, current_batch.id) | |
| user_book_id = user_book.id | |
| update_user_book_memorizing_batch(db, user_book_id, current_batch.id) | |
| if next_batch is not None: | |
| return set_memorizing_batch(batches, old_index, current_batch) | |
| else: | |
| return set_memorizing_batch(batches, current_batch_index, current_batch) | |
| previous_batch_btn.click( | |
| previous_batch, | |
| inputs=[batches, current_batch_index, user_book_id, memorize_action], | |
| outputs=batch_widget + [progress, current_batch_index] | |
| ) | |
| next_batch_btn.click( | |
| next_batch, | |
| inputs=[batches, current_batch_index, user_book_id, memorize_action], | |
| outputs=batch_widget + [progress, current_batch_index] | |
| ) | |
| def regenerate_for_batch(memorizing_batch: UserMemoryBatch, batch_words: List[Word]): | |
| batch_words_str_list = [word.vc_vocabulary for word in batch_words] | |
| logger.info(f"生成故事 {batch_words_str_list}") | |
| story, translated_story = generate_story_and_translated_story(batch_words_str_list) | |
| memorizing_batch.story = story | |
| memorizing_batch.translated_story = translated_story | |
| db.commit() | |
| db.refresh(memorizing_batch) | |
| create_user_memory_batch_generation_history(db, UserMemoryBatchGenerationHistoryCreate( | |
| batch_id=memorizing_batch.id, | |
| story=story, | |
| translated_story=translated_story | |
| )) | |
| logger.info(story) | |
| logger.info(translated_story) | |
| return story, translated_story | |
| def regenerate(batches: List[UserMemoryBatch], current_batch_index: int): | |
| # 重新生成故事 | |
| memorizing_batch = batches[current_batch_index] | |
| batch_words = get_words_in_batch(db, memorizing_batch.id) | |
| story, translated_story = regenerate_for_batch(memorizing_batch, batch_words) | |
| return story, translated_story | |
| regenerate_btn.click(regenerate, inputs=[batches, current_batch_index], outputs=[story, translated_story]) | |
| # 4. 从记忆计划中创建单词书 | |
| with gr.Tab("从记忆计划中创建单词书") as tab4: | |
| select_user_book = gr.Dropdown( | |
| [], label="记忆计划", info="请选择记忆计划" | |
| ) | |
| word_count = gr.Number(value=0, label="单词个数") | |
| known_words = gr.CheckboxGroup( | |
| [], label="已学会的单词", info="请检查已学会的单词,这些单词将不会被包含在新的单词书中" | |
| ) | |
| title = gr.TextArea(value='单词书', lines=1, label="单词书的名称") | |
| btn = gr.Button("从记忆计划中创建单词书") | |
| status = gr.Textbox("", lines=1, label="状态") | |
| def on_select_user(user): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return gr.Dropdown(choices=[]) | |
| new_options = [] | |
| user_book = get_user_books_by_owner_id(db, user_id) | |
| new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
| return gr.Dropdown(choices=new_options) | |
| def on_select_user_book(user_book): | |
| logger.debug(f'user_book {user_book}') | |
| if user_book is None: | |
| return 0, gr.CheckboxGroup(choices=[]) | |
| new_options = [] | |
| user_book_id = user_book.split(" [")[1][:-1] | |
| words = get_words_in_user_book(db, user_book_id) | |
| new_options = [f"{word.vc_vocabulary}" for word in words] | |
| return len(words), gr.CheckboxGroup(choices=new_options) | |
| tab4.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
| select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[word_count, known_words]) | |
| def submit(user, user_book, known_words, title): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return "请先登录" | |
| user_book_id = user_book.split(" [")[1][:-1] | |
| all_words = get_words_in_user_book(db, user_book_id) | |
| unknown_words = [] | |
| for w in all_words: | |
| if w.vc_vocabulary not in known_words: | |
| unknown_words.append(w) | |
| # all_words = get_words_by_vocabulary(db, known_words) | |
| book = save_words_as_book(db, user_id, unknown_words, title) | |
| if book is not None: | |
| return f"成功生成一本单词书:{book.bk_name}" | |
| else: | |
| return "失败" | |
| btn.click(submit, [user, select_user_book, known_words, title], [status]) | |
| # 5. 统计 | |
| with gr.Tab("统计") as tab5: | |
| # 5.1. 故事生成历史 | |
| with gr.Tab("AI 历史记录") as tab51: | |
| select_user_book = gr.Dropdown( | |
| [], label="记忆计划", info="请选择记忆计划" | |
| ) | |
| history_header = ["单词", "故事", "中文故事", "生成时间"] | |
| history_dataframe = gr.Dataframe( | |
| headers=history_header, | |
| datatype=["str"] * len(history_header), | |
| col_count=(len(history_header), "fixed"), | |
| wrap=True, | |
| min_width=320, | |
| height=800, | |
| ) | |
| def on_select_user(user): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return gr.Dropdown(choices=[]) | |
| new_options = [] | |
| user_book = get_user_books_by_owner_id(db, user_id) | |
| new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
| return gr.Dropdown(choices=new_options) | |
| def on_select_user_book(user_book_id): | |
| logger.debug(f'user_book {user_book_id}') | |
| if user_book_id is None: | |
| return 0, gr.CheckboxGroup(choices=[]) | |
| user_book_id = user_book_id.split(" [")[1][:-1] | |
| batch_id_to_words_and_history = get_generation_hostorys_by_user_book_id(db, user_book_id) | |
| data = [] | |
| for batch_id, (words, histories) in batch_id_to_words_and_history.items(): | |
| for history in histories: | |
| word = ", ".join([w.vc_vocabulary for w in words]) | |
| story = history.story | |
| translated_story = history.translated_story | |
| create_time = history.create_time | |
| data.append([word, story, translated_story, create_time]) | |
| df = pd.DataFrame(data, columns=history_header) | |
| return df | |
| tab51.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
| select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[history_dataframe]) | |
| # 5.2. 记忆历史记录 | |
| with gr.Tab("记忆历史记录") as tab52: | |
| select_user_book = gr.Dropdown( | |
| [], label="记忆计划", info="请选择记忆计划" | |
| ) | |
| batch_history_header = ["单词", "故事", "中文故事", "批次类型", "记忆情况", "生成时间"] | |
| batch_history_dataframe = gr.Dataframe( | |
| headers=batch_history_header, | |
| datatype=["str"] * len(batch_history_header), | |
| col_count=(len(batch_history_header), "fixed"), | |
| wrap=True, | |
| min_width=320, | |
| height=800, | |
| ) | |
| def on_select_user(user): | |
| user_id = user.get("id", None) | |
| if user_id is None: | |
| gr.Error("请先登录") | |
| return gr.Dropdown(choices=[]) | |
| new_options = [] | |
| user_book = get_user_books_by_owner_id(db, user_id) | |
| new_options = [f"{book.title} | {book.batch_size}个单词一组 [{book.id}]" for book in user_book] | |
| return gr.Dropdown(choices=new_options) | |
| def on_select_user_book(user_book_id): | |
| logger.debug(f'user_book {user_book_id}') | |
| if user_book_id is None: | |
| return 0, gr.CheckboxGroup(choices=[]) | |
| user_book_id = user_book_id.split(" [")[1][:-1] | |
| actions, batch_id_to_batch, batch_id_to_words, batch_id_to_actions = get_user_memory_batch_history(db, user_book_id) | |
| data = [] | |
| for action in actions: | |
| batch_id = action.batch_id | |
| words = batch_id_to_words[batch_id] | |
| word = ", ".join([w.vc_vocabulary for w in words]) | |
| batch = batch_id_to_batch[batch_id] | |
| story = batch.story | |
| translated_story = batch.translated_story | |
| batch_type = batch.batch_type | |
| memory_actions = batch_id_to_actions.get(batch_id, []) | |
| remember_word_ids = {a.word_id for a in memory_actions if a.action == "remember"} | |
| remember_words = [] | |
| forget_words = [] | |
| for w in words: | |
| if w.vc_id in remember_word_ids: | |
| remember_words.append(w.vc_vocabulary) | |
| else: | |
| forget_words.append(w.vc_vocabulary) | |
| memory_status = f"记住 {len(remember_words)} 个单词,忘记 {len(forget_words)} 个单词" | |
| memory_status += f",记住的单词:{', '.join(remember_words)}" | |
| memory_status += f",忘记的单词:{', '.join(forget_words)}" | |
| create_time = action.create_time | |
| data.append([word, story, translated_story, batch_type, memory_status, create_time]) | |
| df = pd.DataFrame(data, columns=batch_history_header) | |
| return df | |
| tab52.select(on_select_user, inputs=[user], outputs=[select_user_book]) | |
| select_user_book.select(on_select_user_book, inputs=[select_user_book], outputs=[batch_history_dataframe]) | |
| on_login_success_ui = [email, password, login_btn, register_email, register_password, register_btn] | |
| on_login_success_ui += [tab1] | |
| def on_login(login_success): | |
| return ( | |
| gr.TextArea(visible=not login_success), | |
| gr.TextArea(visible=not login_success), | |
| gr.Button(visible=not login_success), | |
| gr.TextArea(visible=not login_success), | |
| gr.TextArea(visible=not login_success), | |
| gr.Button(visible=not login_success), | |
| # gr.Accordion(visible=not login_success), | |
| ) + ( | |
| gr.Tab(visible=login_success), | |
| ) | |
| def login(email, password): | |
| user = get_user_by_email(db, email) | |
| if password is None or len(password) == 0: | |
| return { | |
| "id": "", | |
| "email": "", | |
| }, "登录失败", *on_login(False) | |
| if user is None or not user.verify_password(password): | |
| return { | |
| "id": "", | |
| "email": "", | |
| }, "登录失败", *on_login(False) | |
| return { | |
| "id": user.id, | |
| "email": user.email, | |
| }, "登录成功", *on_login(True) | |
| login_btn.click(login, [email, password], [user, user_status] + on_login_success_ui) | |
| def register(email, password): | |
| user = get_user_by_email(db, email) | |
| if user is not None: | |
| return { | |
| "id": "", | |
| "email": "", | |
| }, "注册失败,该邮箱已被注册", *on_login(False) | |
| else: | |
| user = create_user(db, email=email, password=password) | |
| return { | |
| "id": user.id, | |
| "email": user.email, | |
| }, "注册并登录成功", *on_login(True) | |
| register_btn.click(register, [register_email, register_password], [user, user_status] + on_login_success_ui) | |
| if __name__ == "__main__": | |
| # import os | |
| # os.environ["no_proxy"] = "localhost,127.0.0.1,::1" | |
| # demo.launch(server_name="127.0.0.1", server_port=8090, debug=True) | |
| logger.add(f"output/logs/web_{date_str}.log", rotation="1 day", retention="7 days", level="INFO") | |
| demo.launch() | |