| import threading | |
| import openai | |
| import os | |
| import time | |
| import random | |
| openai.api_key = os.environ["OPENAI_KEY"] | |
| games = {} | |
| all_games_lock = threading.Lock() | |
| TIME_TO_SUBMIT = 60 | |
| TIME_TO_VOTE = 30 | |
| TIME_TO_REVEAL = 18 | |
| GUESS_AI_PTS = 2 | |
| GUESSED_AS_AI_PTS = 1 | |
| with open("prompt.txt", "r") as f: | |
| PROMPT = f.read() | |
| def new_minigame(): | |
| return "Write a poem about mice!" | |
| def ai_response(question): | |
| prompt = PROMPT.format( | |
| question, | |
| ) | |
| completions = None | |
| while completions is None: | |
| try: | |
| completions = openai.Completion.create( | |
| engine="text-davinci-003", prompt=prompt, max_tokens=200, n=1, temperature=0.5 | |
| ) | |
| except: | |
| print("OpenAI connection error") | |
| time.sleep(1) | |
| response = completions["choices"][0]["text"].strip() | |
| return response | |
| class Game: | |
| def __init__(self, room_name): | |
| self.room_name = room_name | |
| self.command = None | |
| self.players = [] | |
| self.prompts = [] | |
| self.prompts_completed = 0 | |
| self.responses = {} | |
| self.response_list = [] | |
| self.ai_response_index = None | |
| self.lock = threading.Lock() | |
| self.started = False | |
| self.stage = None | |
| self.stage_start_time = None | |
| self.stage_time_remaining = None | |
| self.total_stage_time = None | |
| self.complete = False | |
| self.last_update_index = 0 | |
| self.input_allowed = False | |
| self.last_update_time = time.time() | |
| self.chat = [] | |
| self.player_status = {} | |
| self.player_scores = {} | |
| self.player_guess = {} | |
| self.ai_answer = None | |
| games[room_name] = self | |
| def update(self): | |
| self.last_update_index += 1 | |
| self.last_update_time = time.time() | |
| def start(self): | |
| self.started = True | |
| self.stage = "START" | |
| self.update() | |
| def get_player_state(self): | |
| return [ | |
| [player, self.player_status[player], self.player_scores[player]] | |
| for player in self.players | |
| ] | |
| def format_chat(self, player_name): | |
| formatted_chat = [[None, self.command]] | |
| if self.stage == "COLLECTING_RESPONSSES": | |
| if player_name in self.responses: | |
| formatted_chat.append([self.responses[player_name], None]) | |
| elif self.stage == "COLLECTING_VOTES": | |
| for msg_player, msg in self.response_list: | |
| if msg_player == player_name: | |
| formatted_chat.append([msg, None]) | |
| else: | |
| formatted_chat.append([None, msg]) | |
| elif self.stage == "REVEALING": | |
| for msg_player, msg in self.response_list: | |
| output_msg = f"**{msg_player or 'GPT'}**: {msg}\nGuessed by: {', '.join(player for player, guess in self.player_guess.items() if guess == msg_player)}" | |
| if msg_player == player_name: | |
| formatted_chat.append([output_msg, None]) | |
| else: | |
| formatted_chat.append([None, output_msg]) | |
| return formatted_chat | |
| def add_player(self, player_name, player_prompt): | |
| with self.lock: | |
| self.players.append(player_name) | |
| self.prompts.append(player_prompt) | |
| self.player_status[player_name] = None | |
| self.player_scores[player_name] = 0 | |
| if len(self.players) == 1: | |
| self.command = f"You have entered the game." | |
| else: | |
| self.command = f"{', '.join(self.players[:-1])} and {self.players[-1]} are in this room." | |
| self.update() | |
| def add_chat(self, player, message): | |
| with self.lock: | |
| self.player_status[player] = "Submitted" | |
| self.responses[player] = message | |
| def select_chat(self, player, index): | |
| with self.lock: | |
| if self.stage != "COLLECTING_VOTES": | |
| return | |
| index = index - 1 | |
| selected_player = self.response_list[index][0] | |
| self.player_guess[player] = selected_player | |
| self.player_status[player] = "Voted" | |
| def new_game(room_name): | |
| with all_games_lock: | |
| if room_name in games: | |
| game = games[room_name] | |
| return game | |
| return Game(room_name) | |
| def game_thread(): | |
| while True: | |
| now = time.time() | |
| for room_name, game in games.items(): | |
| if not game.started or game.complete: | |
| continue | |
| with game.lock: | |
| if game.stage == "START": | |
| prompt = game.prompts[game.prompts_completed] | |
| game.command = ( | |
| f"**{game.players[game.prompts_completed]}**: {prompt}" | |
| ) | |
| game.input_allowed = True | |
| game.player_status = {player: None for player in game.players} | |
| game.player_guess = {player: None for player in game.players} | |
| game.ai_answer = ai_response(prompt) | |
| game.stage = "COLLECTING_RESPONSES" | |
| game.responses = {} | |
| game.stage_start_time = now | |
| game.total_stage_time = TIME_TO_SUBMIT | |
| game.stage_time_remaining = TIME_TO_SUBMIT | |
| game.update() | |
| stage_time_remaining = int( | |
| game.total_stage_time - (now - game.stage_start_time) | |
| ) | |
| if stage_time_remaining != game.stage_time_remaining: | |
| game.stage_time_remaining = stage_time_remaining | |
| game.update() | |
| out_of_time = stage_time_remaining <= 0 | |
| if game.stage == "COLLECTING_RESPONSES": | |
| if out_of_time or all( | |
| val == "Submitted" for val in game.player_status.values() | |
| ): | |
| game.stage = "COLLECTING_VOTES" | |
| game.stage_start_time = now | |
| game.total_stage_time = TIME_TO_VOTE | |
| game.command = "Select the AI response!" | |
| game.player_status = {player: None for player in game.players} | |
| game.response_list = list(game.responses.items()) | |
| game.input_allowed = False | |
| ai_answer = (None, game.ai_answer) | |
| game.response_list.append(ai_answer) | |
| random.shuffle(game.response_list) | |
| game.ai_response_index = game.response_list.index(ai_answer) | |
| game.update() | |
| elif game.stage == "COLLECTING_VOTES": | |
| if out_of_time or all( | |
| val == "Voted" for val in game.player_status.values() | |
| ): | |
| game.stage = "REVEALING" | |
| game.command = "Here's the results!" | |
| game.stage_start_time = now | |
| game.total_stage_time = TIME_TO_REVEAL | |
| for player, guess in game.player_guess.items(): | |
| if guess == None: | |
| game.player_scores[player] += GUESS_AI_PTS | |
| else: | |
| game.player_scores[guess] += GUESSED_AS_AI_PTS | |
| game.update() | |
| elif game.stage == "REVEALING": | |
| if out_of_time: | |
| game.stage = "START" | |
| game.prompts_completed += 1 | |
| if game.prompts_completed == len(game.players): | |
| game.complete = True | |
| game.command = f"Game complete! Winners: {', '.join(player for player, score in game.player_scores.items() if score == max(game.player_scores.values()))}" | |
| game.update() | |
| time.sleep(0.1) | |
| thread = threading.Thread(target=game_thread) | |
| thread.daemon = True | |
| thread.start() | |