Spaces:
Runtime error
Runtime error
| import threading | |
| import random | |
| from typing import Tuple | |
| import re | |
| import copy | |
| import os | |
| import json | |
| import openai | |
| import time | |
# Credentials for the OpenAI client; a missing OPENAI_KEY variable raises
# KeyError as soon as this module is imported.
openai.api_key = os.environ["OPENAI_KEY"]

# Board geometry and clue settings.
SIZE = 15  # the grid is SIZE x SIZE cells
RIDDLE_COUNT = 3  # number of clue slots each game keeps
MIN_WORD_SIZE = 4  # shortest slot find_clue will consider
MAX_WORD_SIZE = 10  # not referenced in the visible code
WORD_LIMIT = 7500  # not referenced in the visible code

# Timeouts, in seconds (compared against time.time() differences).
COMPLETE_GAME_TIMEOUT = 60 * 60
INCOMPLETE_GAME_TIMEOUT = 60 * 60 * 24
GUESS_TIMEOUT = 30

# Word list used for answers, and the prompt template used for riddles.
with open("words.json", "r") as f:
    words = json.load(f)
with open("prompt.txt", "r") as f:
    RIDDLE_PROMPT = f.read()

# One word per line so candidate patterns can be matched with re.MULTILINE.
search_text = "\n".join(words)

# Registry of live games keyed by room name, and the lock guarding it.
games = {}
all_games_lock = threading.Lock()
def get_riddle(answer):
    """Ask the OpenAI completion endpoint for a riddle whose answer is *answer*.

    The prompt is RIDDLE_PROMPT with *answer* substituted into its first
    positional placeholder. Any exception raised while requesting or reading
    the completion is printed and the request is retried after half a second,
    so this function only returns once a completion text was obtained.
    """
    filled_prompt = RIDDLE_PROMPT.format(answer)
    while True:
        try:
            response = openai.Completion.create(
                engine="text-davinci-003",
                prompt=filled_prompt,
                max_tokens=200,
                n=1,
                temperature=0.5,
            )
            # Indexing stays inside the try so a malformed response is retried too.
            return response["choices"][0]["text"]
        except Exception as e:
            print("OpenAI Error", e, "Retrying...")
            time.sleep(0.5)
class Clue:
    """A single crossword entry: its answer, where it sits and its state.

    Attributes:
        answer: the word to be guessed.
        location: (row, column) of the first letter on the grid.
        across: True when the word runs left-to-right, False for top-to-bottom.
        solved: whether a player has guessed the answer.
        riddle: riddle text shown to players (filled in after construction).
        create_time: time.time() timestamp used for the guess timeout.
        solver: name of the player who solved the clue, or None.
        timed_out: set once the clue has outlived its guess timeout.
    """

    def __init__(self, answer, location, across, solved):
        self.answer: str = answer
        self.location: Tuple[int, int] = location
        self.across: bool = across
        self.solved: bool = solved
        self.riddle: str = ""
        # Previously only annotated, so reading it before an external
        # assignment raised AttributeError. Default to the construction time.
        self.create_time: float = time.time()
        self.solver = None
        self.timed_out: bool = False

    def __repr__(self):
        direction = "Across" if self.across else "Down"
        state = "Solved" if self.solved else "Unsolved"
        return f"{self.answer}: {self.location}, {direction}, {state}"
class Game:
    """State of one crossword room: letter grid, active clues and scores.

    Constructing a Game registers it in the module-level ``games`` dict under
    its room name.
    """

    def __init__(self, room_name, competitive, init_word):
        self.room_name = room_name
        self.competitive = competitive
        self.player_scores = {}
        # Start from an empty SIZE x SIZE board and seed it with init_word,
        # written across the middle row and roughly centred.
        self.grid = [[None for _ in range(SIZE)] for _ in range(SIZE)]
        self.grid = place_on_grid(
            self.grid, init_word, (SIZE // 2, SIZE // 2 - len(init_word) // 2), True
        )
        self.clues = [None] * RIDDLE_COUNT  # empty slots are None until filled
        self.previous_clues = []
        self.lock = threading.Lock()
        self.complete = False
        self.last_update_index = 0
        self.last_update_time = time.time()
        self.last_riddle_update_time = None
        self.pending_request = False
        games[room_name] = self

    def update(self):
        """Record that the game state changed (bumps the counter and timestamp)."""
        self.last_update_index += 1
        self.last_update_time = time.time()

    def replace_clue(self, index):
        """Retire the clue in slot *index* and try to fill the slot with a new one.

        The search grid is the solved board plus the answers of every other
        active clue, so a new clue cannot collide with one still in play. If
        no clue can be found the game is marked complete and the slot is left
        as it was.
        """
        clue_grid = copy.deepcopy(self.grid)
        for j, clue in enumerate(self.clues):
            if clue and index != j:
                clue_grid = place_on_grid(clue_grid, clue.answer, clue.location, clue.across)

        outgoing = self.clues[index]
        # The slot keeps its old clue when no replacement is found, and this
        # method is then called again for the same slot. Archive each clue
        # only once, otherwise previous_clues grows without bound.
        if outgoing and not any(prev is outgoing for prev in self.previous_clues):
            self.previous_clues.append(outgoing)

        clue = find_clue(clue_grid)
        if clue is None:
            self.complete = True
            return

        clue.create_time = time.time()
        self.pending_request = True
        try:
            clue.riddle = get_riddle(clue.answer)
        finally:
            # Never leave the flag stuck: player_guess ignores guesses while set.
            self.pending_request = False
        self.last_riddle_update_time = time.time()
        self.clues[index] = clue

    def add_player(self, player_name):
        """Register *player_name* with a score of 0 unless already known."""
        with self.lock:
            self.update()
            if player_name not in self.player_scores:
                self.player_scores[player_name] = 0

    def player_guess(self, player_name, guess):
        """Apply *guess* (case-insensitively) to every unsolved active clue.

        Returns False when no clue matched. Returns None when the guess was
        ignored because a riddle request is pending, and also None after a
        successful match.
        NOTE(review): None on success looks unintended, but callers are not
        visible here, so the return values are left as they were.
        """
        guess = guess.lower()
        if self.pending_request:
            return
        with self.lock:
            self.update()
            matched_clues = [
                clue
                for clue in self.clues
                # Slots start out as None until the game thread fills them.
                if clue is not None and not clue.solved and clue.answer == guess
            ]
            if not matched_clues:
                return False
            for clue in matched_clues:
                clue.solved = True
                clue.solver = player_name
                place_on_grid(self.grid, clue.answer, clue.location, clue.across)
                # Tolerate players that guess without having been added first.
                self.player_scores[player_name] = self.player_scores.get(player_name, 0) + 1
def place_on_grid(grid, word, location, across):
    """Write *word* into *grid* starting at *location* and return the same grid.

    *location* is a (row, column) pair. An across word is written along the
    row with a single slice assignment (note that a slice running past the
    end of the row lengthens the row instead of failing); a down word is
    written one cell per row. The grid is modified in place.
    """
    row, col = location
    if not across:
        for offset, letter in enumerate(word):
            grid[row + offset][col] = letter
        return grid
    end = col + len(word)
    grid[row][col:end] = list(word)
    return grid
def _slot_pattern(grid, loc, length, across):
    """Build the anchored regex for a slot, escaping letters already on the grid."""
    x, y = loc
    parts = []
    for offset in range(length):
        cell = grid[x][y + offset] if across else grid[x + offset][y]
        # Grid letters can come from a user-chosen room name, so they must be
        # matched literally. Empty cells become single-character wildcards.
        parts.append(re.escape(cell) if cell else ".")
    return "^" + "".join(parts) + "$"


def find_clue(grid) -> Clue:
    """Pick a random word that can be added to *grid* crossing an existing letter.

    Filled cells are visited in random order. A cell with no horizontal
    neighbours is used as the crossing point for across slots; otherwise, a
    cell with no vertical neighbours is used for down slots. Each candidate
    slot is turned into a pattern (known letters plus wildcards) and matched
    against the word list in ``search_text``.

    Returns an unsolved Clue for the first slot that yields matches, or None
    when no filled cell produces one.
    """
    all_coordinate_pairs = [
        (i, j) for i in range(SIZE) for j in range(SIZE) if grid[i][j] is not None
    ]
    random.shuffle(all_coordinate_pairs)
    for i, j in all_coordinate_pairs:
        # Candidate slots as (start location, pattern, across?). The pattern
        # string has one character per cell; its length drives the arithmetic.
        regexes = []
        if (j == 0 or grid[i][j - 1] is None) and (
            j + 1 == SIZE or grid[i][j + 1] is None
        ):
            running_regex = ""
            possible_across_regexes = []
            # Grow leftwards from the crossing cell, recording every start.
            for k in range(j, -1, -1):
                # Stop at an empty cell that touches a letter above or below:
                # filling it would run alongside another word.
                if (
                    (i != 0 and grid[i - 1][k] is not None)
                    or (i != SIZE - 1 and grid[i + 1][k] is not None)
                ) and grid[i][k] is None:
                    break
                # A start is usable only if the cell before it is free.
                valid = k == 0 or grid[i][k - 1] is None
                running_regex = (grid[i][k] or ".") + running_regex
                possible_across_regexes.append(((i, k), running_regex, valid))
            possible_across_regexes = [p for p in possible_across_regexes if p[2]]
            # Grow rightwards; from here on the flag says whether the slot may
            # end at column k (next cell free or board edge).
            for k in range(j + 1, SIZE):
                if (
                    (i != 0 and grid[i - 1][k] is not None)
                    or (i != SIZE - 1 and grid[i + 1][k] is not None)
                ) and grid[i][k] is None:
                    break
                valid = k == SIZE - 1 or grid[i][k + 1] is None
                # Iterate over a copy: extended candidates are appended to the
                # list while it is being walked.
                for start, possible_across_regex, _ in possible_across_regexes[:]:
                    # Extend only the candidates that currently end at k - 1.
                    if start[1] + len(possible_across_regex) == k:
                        possible_across_regexes.append(
                            (start, possible_across_regex + (grid[i][k] or "."), valid)
                        )
            possible_across_regexes = [
                (loc, regex, True)
                for loc, regex, valid in possible_across_regexes
                if len(regex) >= MIN_WORD_SIZE and valid
            ]
            regexes.extend(possible_across_regexes)
        elif (i == 0 or grid[i - 1][j] is None) and (
            i + 1 == SIZE or grid[i + 1][j] is None
        ):
            # Mirror image of the across case, walking rows instead of columns.
            running_regex = ""
            possible_down_regexes = []
            for k in range(i, -1, -1):
                if (
                    (j != 0 and grid[k][j - 1] is not None)
                    or (j != SIZE - 1 and grid[k][j + 1] is not None)
                ) and grid[k][j] is None:
                    break
                valid = k == 0 or grid[k - 1][j] is None
                running_regex = (grid[k][j] or ".") + running_regex
                possible_down_regexes.append(((k, j), running_regex, valid))
            possible_down_regexes = [p for p in possible_down_regexes if p[2]]
            for k in range(i + 1, SIZE):
                if (
                    (j != 0 and grid[k][j - 1] is not None)
                    or (j != SIZE - 1 and grid[k][j + 1] is not None)
                ) and grid[k][j] is None:
                    break
                valid = k == SIZE - 1 or grid[k + 1][j] is None
                for start, possible_down_regex, _ in possible_down_regexes[:]:
                    if start[0] + len(possible_down_regex) == k:
                        possible_down_regexes.append(
                            (start, possible_down_regex + (grid[k][j] or "."), valid)
                        )
            possible_down_regexes = [
                (loc, regex, False)
                for loc, regex, valid in possible_down_regexes
                if len(regex) >= MIN_WORD_SIZE and valid
            ]
            regexes.extend(possible_down_regexes)
        random.shuffle(regexes)
        for loc, regex, across in regexes:
            # Rebuild the pattern from the grid so letters are escaped; the
            # raw string above is only used for its length.
            pattern = _slot_pattern(grid, loc, len(regex), across)
            matches = re.findall(pattern, search_text, re.MULTILINE)
            # NOTE(review): slots with exactly one matching word are skipped;
            # kept as-is, confirm whether "> 1" was meant to be ">= 1".
            if len(matches) > 1:
                random.shuffle(matches)
                answer = matches[0]
                clue = Clue(answer, loc, across, False)
                return clue
    return None
def new_game(room_name):
    """Return the game for *room_name*, creating it when it does not exist.

    A non-empty room name gives a competitive game seeded with the room name
    itself as the first word. An empty name gives a non-competitive game under
    a random numeric room name, seeded with a random word from the word list.
    The registry is inspected and extended while holding ``all_games_lock``.
    """
    competitive = room_name != ""
    with all_games_lock:
        try:
            return games[room_name]
        except KeyError:
            pass

        if competitive:
            return Game(room_name, competitive, room_name)

        # Draw numeric room names until an unused one comes up.
        room_name = str(random.randint(0, 999999))
        while room_name in games:
            room_name = str(random.randint(0, 999999))
        seed_word = random.choice(words)
        return Game(room_name, competitive, seed_word)
def game_thread():
    """Background loop that expires idle games and keeps clue slots filled.

    Every pass (about ten per second, plus however long clue generation
    takes) it removes games idle for longer than COMPLETE_GAME_TIMEOUT when
    complete, or INCOMPLETE_GAME_TIMEOUT otherwise. For the remaining games
    it replaces every clue slot that is empty, solved, or older than
    GUESS_TIMEOUT. Never returns.
    """
    while True:
        now = time.time()
        # Iterate over a snapshot: deleting from (or a concurrent new_game
        # inserting into) the dict during iteration raises RuntimeError.
        with all_games_lock:
            snapshot = list(games.items())
        for room_name, game in snapshot:
            idle_time = now - game.last_update_time
            if (game.complete and idle_time > COMPLETE_GAME_TIMEOUT) or (
                idle_time > INCOMPLETE_GAME_TIMEOUT
            ):
                with all_games_lock:
                    # Drop the entry only if it still refers to this game.
                    if games.get(room_name) is game:
                        del games[room_name]
                continue
            for i, clue in enumerate(game.clues):
                timed_out = clue is not None and now - clue.create_time > GUESS_TIMEOUT
                if timed_out:
                    game.clues[i].timed_out = True
                if clue is None or clue.solved or timed_out:
                    game.replace_clue(i)
        time.sleep(0.1)
# Run the maintenance loop in the background as a daemon thread, so it does
# not keep the interpreter alive at shutdown.
thread = threading.Thread(target=game_thread, daemon=True)
thread.start()