import csv
import datetime
import logging
from collections import defaultdict

from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session

from .decks import list_decks
from .games import create_game, latest_game, list_games
from .players import list_players
from ..templates import jinja_templates
from ..sql import crud, schemas
from ..sql.database import get_db
from ..sql.models import Format, WinType

LOGGER = logging.getLogger(__name__)

api_router = APIRouter(prefix="/seed", tags=["seed"])
html_router = APIRouter(
    prefix="/seed", include_in_schema=False, default_response_class=HTMLResponse
)


@api_router.post("/players")
def seed_players(file: UploadFile, db: Session = Depends(get_db)):
    """Create players from an uploaded CSV. The name is read from the second column; other columns are ignored."""
    file_contents = file.file.read().decode("utf-8").split("\n")
    reader = csv.reader(file_contents, delimiter=",")
    for row in reader:
        if not row:
            continue
        player_name = row[1]
        crud.create_player(db=db, player=schemas.PlayerCreate(name=player_name))
    return "OK!"


@api_router.post("/decks")
def seed_decks(file: UploadFile, db: Session = Depends(get_db)):
    """Create decks from an uploaded CSV with `name`, `description` and `owner_id` columns."""
    file_contents = file.file.read().decode("utf-8").split("\n")
    reader = csv.DictReader(file_contents, delimiter=",")
    for row in reader:
        if not row:
            continue
        crud.create_deck(
            db=db,
            deck=schemas.DeckCreate(
                **{key: row[key] for key in ["name", "description", "owner_id"]}
            ),
        )
    return "OK!"


@api_router.post("/games")
def seed_games(file: UploadFile, db: Session = Depends(get_db)):
    """Create games from an uploaded CSV whose columns mirror the `GameCreate` schema fields."""
    file_contents = file.file.read().decode("utf-8").split("\n")
    reader = csv.DictReader(file_contents, delimiter=",")
    for row in reader:
        if not row:
            continue
        args = {
            key: row[key]
            for key in [
                "deck_id_1",
                "deck_id_2",
                "winning_deck_id",
                "number_of_turns",
                "first_player_out_turn",
                "win_type_id",
                "description",
            ]
        }
        args["date"] = datetime.datetime.strptime(row["date"], "%Y-%m-%d")
        # Decks 3-6 are optional, so only pass them along when the column is non-empty.
        for deck_id_num in ["deck_id_3", "deck_id_4", "deck_id_5", "deck_id_6"]:
            if deck_id := row[deck_id_num]:
                LOGGER.debug(f"{deck_id_num} is {deck_id}")
                args[deck_id_num] = deck_id
        crud.create_game(
            db=db,
            game=schemas.GameCreate(**args),
        )
    return "OK!"
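
# A sketch of the header row `/seed/games` expects, inferred from the keys read in `seed_games` above. The
# sample values (and the choice of ids) are illustrative assumptions, not taken from real data:
#
#     date,deck_id_1,deck_id_2,deck_id_3,deck_id_4,deck_id_5,deck_id_6,winning_deck_id,number_of_turns,first_player_out_turn,win_type_id,description
#     2023-04-27,1,2,3,,,,1,9,7,2,Close game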


@api_router.post("/all_in_one")
def all_in_one(file: UploadFile, db: Session = Depends(get_db)):
    """Seed players, decks and games in one pass from a single uploaded CSV export.

    Players and decks that already exist are looked up rather than recreated, and games dated on or before
    the latest recorded game are skipped.
    """
    file_contents = file.file.read().decode("utf-8").split("\n")
    reader = csv.DictReader(file_contents, delimiter=",")

    # Fetch the currently-known information so that we can avoid recreating existing data.
    current_player_ids_by_name = {
        player.name: player.id for player in list_players(db=db)
    }
    current_deck_ids_by_name = {deck.name: deck.id for deck in list_decks(db=db)}

    try:
        latest_recorded_game = latest_game(db=db)
        date_of_latest_game = latest_recorded_game.date
    except HTTPException:
        # No games have been returned from the db, so no incoming games should be skipped.
        date_of_latest_game = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)

    current_games = list_games(db=db)
    # Depends on being sorted by date - which is currently _coincidentally_ true of our source data, as games
    # have thus far only been added in date order, but that is not necessarily the case.
    # TODO - implement sorting (and pagination) of returned data, then update this to take advantage of it.
    if current_games:  # I.e. if any games have been returned from the db
        date_of_latest_game = current_games[-1].date
    else:
        # No games have been returned from the db, so no incoming games should be skipped.
        date_of_latest_game = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)

    # Mapping from player name to set-of-owned-decks.
    # (Set rather than list so that we can blindly `.add`)
    player_decks = defaultdict(set)
    # I'm hard-coding seeding of win_cons and formats (in `app/sql/__init__.py`), rather than requiring them to
    # be manually seeded - but this would be where we'd track them if we wanted them to be seeded.
    # win_types = set()
    # formats = set()
    for row in reader:
        if not row:
            continue
        for i in range(6):
            player_column = f"Player {i+1}"
            if row[player_column]:
                player_decks[row[player_column]].add(row[f"Deck {i+1}"])
        # See above
        # win_types.add(row['Type of win'])
        # formats.add(row['Format'])

    # If we cared about memory efficiency we could have instead made `player_decks` into an extensible data
    # structure and added this information in there, but I'm hardly going to be dealing with memory-intensive
    # amounts of data in this app.
    player_id_lookup = {}
    deck_id_lookup = {}
    for player_name, decks in player_decks.items():
        if player_name in current_player_ids_by_name:
            LOGGER.info(f"Looked up {player_name=} from existing database")
            player_id = current_player_ids_by_name[player_name]
        else:
            player = crud.create_player(
                db=db, player=schemas.PlayerCreate(name=player_name)
            )
            LOGGER.info(f"Seeded {player=}")
            player_id = player.id
        player_id_lookup[player_name] = player_id

        for deck_name in decks:
            if deck_name in current_deck_ids_by_name:
                LOGGER.info(f"Looked up {deck_name=} from existing database")
                deck_id_lookup[f"{player_name}:{deck_name}"] = current_deck_ids_by_name[
                    deck_name
                ]
            else:
                deck = crud.create_deck(
                    db=db,
                    deck=schemas.DeckCreate(
                        name=deck_name, description="", owner_id=player_id
                    ),
                )
                LOGGER.info(f"Seeded {deck=}")
                # We need to look up deck id by `player_name:deck_name` because there could be multiple decks
                # with the same name owned by different people :D
                deck_id_lookup[f"{player_name}:{deck_name}"] = deck.id

    def parse_date(date_string: str) -> datetime.datetime:
        # Source dates look like "M/D/YY" (month/day and a two-digit year).
        return datetime.datetime.strptime(date_string, "%m/%d/%y")
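
    # A doctest-style sketch of the assumed input format (the sample value is illustrative, not taken from
    # real data):
    #
    #     >>> parse_date("4/27/23")
    #     datetime.datetime(2023, 4, 27, 0, 0)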

    win_types = db.query(WinType).all()
    formats = db.query(Format).all()

    # Recreate the reader to consume the rows again.
    # (Again, if we _really_ cared about efficiency we could have stored this data on the first pass to avoid a
    # retraversal. I suspect that the overhead of O(2*n) vs. O(n) data-reads is going to be insignificant.)
    # ((Yes, I know that's an abuse of Big-O notation, shut up - you knew what I meant :P ))
    reader = csv.DictReader(file_contents, delimiter=",")
    for row in reader:
        # Skip any games created before the date of the latest current game.
        # (Note that this means that the `all_in_one` method cannot be used to backfill any previously-played
        # games. If there arises a desire for that, we will instead have to check each potentially-uploaded game
        # against _every_ currently-uploaded one to check for pre-existence (or make the "create" operation
        # idempotent...though that probably shouldn't be the case, as attempting to upload the same game twice
        # is _probably_ an indication of an automated script or summarization going rogue, which should be
        # flagged up _as_ an error rather than blindly continued. For the user-facing UI, just present a
        # "whoops! You submitted a duplicate" screen.))
        date_of_current_row = parse_date(row["Date"])
        if date_of_current_row <= date_of_latest_game:
            message = f"Skipped a game on {date_of_current_row} because it is not later than {date_of_latest_game}"
            LOGGER.info(message)
            # TBD - logging does not seem to be showing up as expected
            print(message)
            continue

        # Note that we intentionally create via the API, not via direct `crud.create_game`, to trigger ELO
        # calculation.
        index_of_winning_deck = [
            row[f"Deck {i+1}"] == row["Winning Deck"] for i in range(6)
        ].index(True)
        created_game = create_game(
            schemas.GameCreate(
                date=date_of_current_row,
                **{
                    f"deck_id_{i+1}": deck_id_lookup[
                        row[f"Player {i+1}"] + ":" + row[f"Deck {i+1}"]
                    ]
                    for i in range(6)
                    if row[f"Deck {i+1}"]
                },
                winning_deck_id=deck_id_lookup[
                    row[f"Player {index_of_winning_deck+1}"]
                    + ":"
                    + row[f"Deck {index_of_winning_deck+1}"]
                ],
                number_of_turns=int(row["# turns"]),
                first_player_out_turn=row["turn 1st player out"],
                win_type_id=[
                    win_type.id
                    for win_type in win_types
                    if win_type.name == row["Type of win"]
                ][0],
                format_id=[fmt.id for fmt in formats if fmt.name == row["Format"]][0],
                description=row["Notes"],
            ),
            db,
        )
        LOGGER.info(f"Seeded {created_game=}")

    return "OK!"


@html_router.get("/")
def main(request: Request, db=Depends(get_db)):
    """Render the seed page."""
    return jinja_templates.TemplateResponse(
        request,
        "/seed.html",
    )
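

# A minimal sketch of exercising the seed endpoints with FastAPI's `TestClient`, assuming the routers are
# mounted on an `app` object in a top-level `app.main` module (that module path, and the CSV file name, are
# assumptions rather than part of this project):
#
#     from fastapi.testclient import TestClient
#     from app.main import app
#
#     client = TestClient(app)
#     with open("players.csv", "rb") as f:
#         response = client.post("/seed/players", files={"file": ("players.csv", f, "text/csv")})
#     assert response.json() == "OK!"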