
When `seed/all_in_one` is now called, it will update with only that data that exists later than the most-recently-played* game, allowing the upload to be used repeatedly without having to clear the database. \* Actually, "highest-ID game", as we haven't implemented `list_games_by_date`, yet
250 lines
9.9 KiB
Python
250 lines
9.9 KiB
Python
import csv
|
|
import datetime
|
|
import logging
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile
|
|
from fastapi.responses import HTMLResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .decks import list_decks
|
|
from .games import create_game, latest_game, list_games
|
|
from .players import list_players
|
|
from ..templates import jinja_templates
|
|
from ..sql import crud, schemas
|
|
from ..sql.database import get_db
|
|
from ..sql.models import Format, WinType
|
|
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
api_router = APIRouter(prefix="/seed", tags=["seed"])
|
|
html_router = APIRouter(
|
|
prefix="/seed", include_in_schema=False, default_response_class=HTMLResponse
|
|
)
|
|
|
|
|
|
@api_router.post("/players")
|
|
def seed_players(file: UploadFile, db: Session = Depends(get_db)):
|
|
file_contents = file.file.read().decode("utf-8").split("\n")
|
|
reader = csv.reader(file_contents, delimiter=",")
|
|
for row in reader:
|
|
if not row:
|
|
continue
|
|
player_name = row[1]
|
|
crud.create_player(db=db, player=schemas.PlayerCreate(name=player_name))
|
|
return "OK!"
|
|
|
|
|
|
@api_router.post("/decks")
|
|
def seed_decks(file: UploadFile, db: Session = Depends(get_db)):
|
|
file_contents = file.file.read().decode("utf-8").split("\n")
|
|
reader = csv.DictReader(file_contents, delimiter=",")
|
|
for row in reader:
|
|
if not row:
|
|
continue
|
|
crud.create_deck(
|
|
db=db,
|
|
deck=schemas.DeckCreate(
|
|
**{key: row[key] for key in ["name", "description", "owner_id"]}
|
|
),
|
|
)
|
|
return "OK!"
|
|
|
|
|
|
@api_router.post("/games")
|
|
def seed_games(file: UploadFile, db: Session = Depends(get_db)):
|
|
file_contents = file.file.read().decode("utf-8").split("\n")
|
|
reader = csv.DictReader(file_contents, delimiter=",")
|
|
for row in reader:
|
|
if not row:
|
|
continue
|
|
args = {
|
|
key: row[key]
|
|
for key in [
|
|
"deck_id_1",
|
|
"deck_id_2",
|
|
"winning_deck_id",
|
|
"number_of_turns",
|
|
"first_player_out_turn",
|
|
"win_type_id",
|
|
"description",
|
|
]
|
|
}
|
|
args["date"] = datetime.datetime.strptime(row["date"], "%Y-%m-%d")
|
|
|
|
for deck_id_num in ["deck_id_3", "deck_id_4", "deck_id_5", "deck_id_6"]:
|
|
if deck_id := row[deck_id_num]:
|
|
LOGGER.error(f"{deck_id_num} is {deck_id}")
|
|
args[deck_id_num] = deck_id
|
|
crud.create_game(
|
|
db=db,
|
|
game=schemas.GameCreate(**args),
|
|
)
|
|
return "OK!"
|
|
|
|
|
|
@api_router.post("/all_in_one")
|
|
def all_in_one(file: UploadFile, db: Session = Depends(get_db)):
|
|
file_contents = file.file.read().decode("utf-8").split("\n")
|
|
reader = csv.DictReader(file_contents, delimiter=",")
|
|
|
|
# Fetch the currently-known-information so that we can avoid recreating existing data
|
|
current_player_ids_by_name = {
|
|
player.name: player.id for player in list_players(db=db)
|
|
}
|
|
current_deck_ids_by_name = {deck.name: deck.id for deck in list_decks(db=db)}
|
|
try:
|
|
latest_recorded_game = latest_game(db=db)
|
|
date_of_latest_game = latest_recorded_game.date
|
|
except HTTPException:
|
|
# No games have been returned from the db, thus no games should be skipped for downloading
|
|
date_of_latest_game = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)
|
|
current_games = list_games(db=db)
|
|
print(f"{current_games}")
|
|
# Depends on being sorted by date - which is currently _coincidentally_ true of our source data as games have thus
|
|
# far only been added in date order, but is not necessarily the case.
|
|
# TODO - implement sorting (and pagination) of returned data, then update this to take advantage of it.
|
|
if current_games:
|
|
# I.e. if any games have been returned from the db
|
|
date_of_latest_game = current_games[-1].date
|
|
else:
|
|
# No games have been returned from the db, thus no games should be skipped for downloading
|
|
date_of_latest_game = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)
|
|
|
|
# Mapping from name to set-of-owned-decks
|
|
# (Set rather than list so that we can blindly `.add`)
|
|
player_decks = defaultdict(set)
|
|
# I'm hard-coding seeding of win_cons and formats (in `app/sql/__init__.py`), rather than requiring them to be
|
|
# manually seeded - but this would be where we'd track them if we wanted them to be seeded
|
|
# win_types = set()
|
|
# formats = set()
|
|
|
|
for row_idx, row in enumerate(reader):
|
|
if not row:
|
|
continue
|
|
for i in range(6):
|
|
player_id = f"Player {i+1}"
|
|
if row[player_id]:
|
|
player_decks[row[player_id]].add(row[f"Deck {i+1}"])
|
|
|
|
# See above
|
|
# win_types.add(row['Type of win'])
|
|
# formats.add(row['Format'])
|
|
|
|
# If we cared about memory efficiency we could have instead made `player_decks` into an extensible data structure
|
|
# and added this information in there, but I'm hardly going to be dealing with memory-intensive amounts of
|
|
# data in this app.
|
|
player_id_lookup = {}
|
|
deck_id_lookup = {}
|
|
|
|
for player_name, decks in player_decks.items():
|
|
if player_name in current_player_ids_by_name:
|
|
LOGGER.info(f"Looked up {player_name=} from existing database")
|
|
player_id = current_player_ids_by_name[player_name]
|
|
else:
|
|
player = crud.create_player(
|
|
db=db, player=schemas.PlayerCreate(name=player_name)
|
|
)
|
|
LOGGER.info(f"Seeded {player=}")
|
|
player_id = player.id
|
|
|
|
player_id_lookup[player_name] = player_id
|
|
|
|
for deck_name in decks:
|
|
if deck_name in current_deck_ids_by_name:
|
|
LOGGER.info(f"Looked up {deck_name=} from existing database")
|
|
deck_id_lookup[f"{player_name}:{deck_name}"] = current_deck_ids_by_name[
|
|
deck_name
|
|
]
|
|
else:
|
|
deck = crud.create_deck(
|
|
db=db,
|
|
deck=schemas.DeckCreate(
|
|
name=deck_name, description="", owner_id=player_id
|
|
),
|
|
)
|
|
LOGGER.info(f"Seeded {deck=}")
|
|
# We need to look up deck id by `player_name:deck_name` because there could be multiple decks with the same
|
|
# name owned by different people :D
|
|
deck_id_lookup[f"{player_name}:{deck_name}"] = deck.id
|
|
|
|
def parse_date(date_string) -> datetime.datetime:
|
|
month, day, year = date_string.split("/")
|
|
return datetime.datetime.strptime(
|
|
f"{year}-{month.rjust(2, '0')}-{day.rjust(2, '0')}", "%y-%m-%d"
|
|
)
|
|
|
|
win_types = db.query(WinType).all()
|
|
formats = db.query(Format).all()
|
|
|
|
# Recreate the reader to consume the rows again.
|
|
# (Again, if we _really_ cared about efficiency we could have stored this data on the first pass to avoid a
|
|
# retraversal. I suspect that the overhead of O(2*n) vs. O(n) data-reads is going to be insignificant)
|
|
# ((Yes, I know that's an abuse of Big-O notation, shut up - you knew what I meant :P ))
|
|
reader = csv.DictReader(file_contents, delimiter=",")
|
|
for row in reader:
|
|
# Skip any games created before the date of the latest current game
|
|
# (Note that this means that the `all_in_one` method cannot be used to backfill any previously-played games. If
|
|
# there arises a desire for that, instead will have to check each potentially-uploaded game against _every_
|
|
# currently-uploaded one to check for pre-existence (or, make the "create" option idempotent...though that
|
|
# probably shouldn't be the case, as attempting to upload the same game twice is _probably_ an indication of an
|
|
# automated script or summarization going rogue, which should be flagged up _as_ an error rather than blindly
|
|
# continued. For the User-facing UI, just present a "whoops! You submitted a duplicate" screen))
|
|
date_of_current_row = parse_date(row["Date"])
|
|
if date_of_current_row <= date_of_latest_game:
|
|
message = f"Skipped a game on {date_of_current_row} because it is not later than {date_of_latest_game}"
|
|
LOGGER.info(message)
|
|
# TBD - logging does not seem to be showing up as-expected
|
|
print(message)
|
|
continue
|
|
|
|
# Note that we intentionally create via the API, not via direct `crud.create_game`, to trigger ELO calculation.
|
|
|
|
index_of_winning_deck = [
|
|
row[f"Deck {i+1}"] == row["Winning Deck"] for i in range(6)
|
|
].index(True)
|
|
print(f"DEBUG - checking row {row}")
|
|
created_game = create_game(
|
|
schemas.GameCreate(
|
|
date=date_of_current_row,
|
|
**{
|
|
f"deck_id_{i+1}": deck_id_lookup[
|
|
row[f"Player {i+1}"] + ":" + row[f"Deck {i+1}"]
|
|
]
|
|
for i in range(6)
|
|
if row[f"Deck {i+1}"]
|
|
},
|
|
winning_deck_id=deck_id_lookup[
|
|
row[f"Player {index_of_winning_deck+1}"]
|
|
+ ":"
|
|
+ row[f"Deck {index_of_winning_deck+1}"]
|
|
],
|
|
number_of_turns=int(row["# turns"]),
|
|
first_player_out_turn=row["turn 1st player out"],
|
|
win_type_id=[
|
|
win_type.id
|
|
for win_type in win_types
|
|
if win_type.name == row["Type of win"]
|
|
][0],
|
|
format_id=[
|
|
format.id for format in formats if format.name == row["Format"]
|
|
][0],
|
|
description=row["Notes"],
|
|
),
|
|
db,
|
|
)
|
|
LOGGER.info(f"Seeded {created_game=}")
|
|
|
|
return "Ok!"
|
|
|
|
|
|
@html_router.get("/")
|
|
def main(request: Request, db=Depends(get_db)):
|
|
return jinja_templates.TemplateResponse(
|
|
request,
|
|
"/seed.html",
|
|
)
|