edh-elo/app/routers/seed.py
Jack Jackson 9b4e6c3b4d
Introduce ability to seed data directly from GSheet
Reasonably hacky, in that I introduce a facade to reuse the data format
previously provided by the `csv` module, rather than using the
`list[list[str]]` directly.

Next I want to introduce something like Celery to refresh the data continually.

Note that this will require changes to the deployment repo in order to
provide the required secrets.
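
For reference, a minimal sketch of what the periodic refresh might look
like (assuming Celery beat with a Redis broker, and a hypothetical
`app.tasks` module - none of which exists in the repo yet):

    from celery import Celery

    celery_app = Celery("edh_elo", broker="redis://localhost:6379/0")
    celery_app.conf.beat_schedule = {
        "refresh-from-google-sheets": {
            # Hypothetical task that would re-run the same logic as the
            # `/seed/from_google_sheets` endpoint below.
            "task": "app.tasks.refresh_from_google_sheets",
            "schedule": 15 * 60,  # every 15 minutes, in seconds
        },
    }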
2025-04-17 22:22:21 -07:00

import csv
import datetime
import logging
from collections import defaultdict
from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from app.services import google_sheets
from .decks import list_decks
from .games import create_game, list_games
from .players import list_players
from ..templates import jinja_templates
from ..sql import crud, schemas
from ..sql.database import get_db
from ..sql.models import Format, WinType

LOGGER = logging.getLogger(__name__)

api_router = APIRouter(prefix="/seed", tags=["seed"])
html_router = APIRouter(
    prefix="/seed", include_in_schema=False, default_response_class=HTMLResponse
)


@api_router.post("/players")
def seed_players(file: UploadFile, db: Session = Depends(get_db)):
    file_contents = file.file.read().decode("utf-8").splitlines()
reader = csv.reader(file_contents, delimiter=",")
for row in reader:
if not row:
continue
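        # The player name is assumed to live in the second column of the uploaded CSV.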
player_name = row[1]
crud.create_player(db=db, player=schemas.PlayerCreate(name=player_name))
return "OK!"
@api_router.post("/decks")
def seed_decks(file: UploadFile, db: Session = Depends(get_db)):
    file_contents = file.file.read().decode("utf-8").splitlines()
reader = csv.DictReader(file_contents, delimiter=",")
for row in reader:
if not row:
continue
crud.create_deck(
db=db,
deck=schemas.DeckCreate(
**{key: row[key] for key in ["name", "description", "owner_id"]}
),
)
return "OK!"
@api_router.post("/games")
def seed_games(file: UploadFile, db: Session = Depends(get_db)):
    file_contents = file.file.read().decode("utf-8").splitlines()
reader = csv.DictReader(file_contents, delimiter=",")
for row in reader:
if not row:
continue
args = {
key: row[key]
for key in [
"deck_id_1",
"deck_id_2",
"winning_deck_id",
"number_of_turns",
"first_player_out_turn",
"win_type_id",
"description",
]
}
args["date"] = datetime.datetime.strptime(row["date"], "%Y-%m-%d")
for deck_id_num in ["deck_id_3", "deck_id_4", "deck_id_5", "deck_id_6"]:
if deck_id := row[deck_id_num]:
                LOGGER.debug(f"{deck_id_num} is {deck_id}")
args[deck_id_num] = deck_id
crud.create_game(
db=db,
game=schemas.GameCreate(**args),
)
return "OK!"
@api_router.post("/from_google_sheets")
def from_google_sheets(db: Session = Depends(get_db)):
data = google_sheets.get_data()
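    # `CSVFacade` wraps the raw `list[list[str]]` from the Sheets API so that it can
    # be consumed like `csv.DictReader` output (see the class at the bottom of this module).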
reader = CSVFacade(data)
    # Fetch the currently-known players and decks so that we can avoid recreating existing data
current_player_ids_by_name = {
player.name: player.id for player in list_players(db=db)
}
current_deck_ids_by_name = {deck.name: deck.id for deck in list_decks(db=db)}
    current_games = list_games(db=db)
    # Depends on the games being sorted by date - which is currently _coincidentally_ true of our source data, as
    # games have thus far only been added in date order, but is not guaranteed.
# TODO - implement sorting (and pagination) of returned data, then update this to take advantage of it.
if current_games:
# I.e. if any games have been returned from the db
date_of_latest_game = current_games[-1].date
else:
# No games have been returned from the db, thus no games should be skipped for downloading
date_of_latest_game = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0, 0)
# Mapping from name to set-of-owned-decks
# (Set rather than list so that we can blindly `.add`)
player_decks = defaultdict(set)
# I'm hard-coding seeding of win_cons and formats (in `app/sql/__init__.py`), rather than requiring them to be
# manually seeded - but this would be where we'd track them if we wanted them to be seeded
# win_types = set()
# formats = set()
    for row in reader:
if not row:
continue
        for i in range(6):
            player_column = f"Player {i+1}"
            if row[player_column]:
                player_decks[row[player_column]].add(row[f"Deck {i+1}"])
# See above
# win_types.add(row['Type of win'])
# formats.add(row['Format'])
# If we cared about memory efficiency we could have instead made `player_decks` into an extensible data structure
# and added this information in there, but I'm hardly going to be dealing with memory-intensive amounts of
# data in this app.
player_id_lookup = {}
deck_id_lookup = {}
for player_name, decks in player_decks.items():
if player_name in current_player_ids_by_name:
LOGGER.info(f"Looked up {player_name=} from existing database")
player_id = current_player_ids_by_name[player_name]
else:
player = crud.create_player(
db=db, player=schemas.PlayerCreate(name=player_name)
)
LOGGER.info(f"Seeded {player=}")
player_id = player.id
player_id_lookup[player_name] = player_id
for deck_name in decks:
if deck_name in current_deck_ids_by_name:
LOGGER.info(f"Looked up {deck_name=} from existing database")
deck_id_lookup[f"{player_name}:{deck_name}"] = current_deck_ids_by_name[
deck_name
]
else:
deck = crud.create_deck(
db=db,
deck=schemas.DeckCreate(
name=deck_name, description="", owner_id=player_id
),
)
LOGGER.info(f"Seeded {deck=}")
# We need to look up deck id by `player_name:deck_name` because there could be multiple decks with the same
# name owned by different people :D
deck_id_lookup[f"{player_name}:{deck_name}"] = deck.id
    def parse_date(date_string: str) -> datetime.datetime:
        # Sheet dates look like `M/D/YY` (e.g. `4/17/23`); strptime copes with
        # un-padded month and day, so no manual zero-padding is needed.
        return datetime.datetime.strptime(date_string, "%m/%d/%y")
win_types = db.query(WinType).all()
formats = db.query(Format).all()
# Recreate the reader to consume the rows again.
# (Again, if we _really_ cared about efficiency we could have stored this data on the first pass to avoid a
# retraversal. I suspect that the overhead of O(2*n) vs. O(n) data-reads is going to be insignificant)
# ((Yes, I know that's an abuse of Big-O notation, shut up - you knew what I meant :P ))
reader = CSVFacade(data)
    for row_idx, row in enumerate(reader):
# Skip any games created before the date of the latest current game
        # (Note that this means that this endpoint cannot be used to backfill any previously-played games. If
        # there arises a desire for that, we will instead have to check each potentially-uploaded game against _every_
        # currently-uploaded one to check for pre-existence (or make the "create" operation idempotent...though it
        # probably shouldn't be, as attempting to upload the same game twice is _probably_ an indication of an
        # automated script or summarization gone rogue, which should be flagged up _as_ an error rather than blindly
        # continued. For the user-facing UI, just present a "whoops! You submitted a duplicate" screen))
date_of_current_row = parse_date(row["Date"])
if date_of_current_row <= date_of_latest_game:
message = f"Skipped a game on {date_of_current_row} because it is not later than {date_of_latest_game}"
LOGGER.info(message)
            # TODO: logging does not seem to be showing up as expected, hence the duplicate print
            print(message)
continue
# Note that we intentionally create via the API, not via direct `crud.create_game`, to trigger ELO calculation.
if not row["Winning Deck"].startswith("Tie"):
print(f"DEBUG - checking row {row}")
try:
index_of_winning_deck = [
row[f"Deck {i+1}"] == row["Winning Deck"] for i in range(6)
].index(True)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Error when processing row {row_idx}, game on {row['Date']}: Winning Deck is named {row['Winning Deck']}, but no deck with that name was found",
)
created_game = create_game(
schemas.GameCreate(
date=date_of_current_row,
**{
f"deck_id_{i+1}": deck_id_lookup[
row[f"Player {i+1}"] + ":" + row[f"Deck {i+1}"]
]
for i in range(6)
if row[f"Deck {i+1}"]
},
winning_deck_id=deck_id_lookup[
row[f"Player {index_of_winning_deck+1}"]
+ ":"
+ row[f"Deck {index_of_winning_deck+1}"]
],
number_of_turns=int(row["# turns"]),
first_player_out_turn=row["turn 1st player out"],
win_type_id=[
win_type.id
for win_type in win_types
if win_type.name == row["Type of win"]
][0],
format_id=[
format.id for format in formats if format.name == row["Format"]
][0],
description=row["Notes"],
),
db,
)
LOGGER.info(f"Seeded {created_game=}")
else:
# "Winning Deck" starts with the string `Tie` => the game was a tie
print(f"DEBUG - checking row {row}")
LOGGER.info("Checking a game with a tie!")
winning_deck_names = row["Winning Deck"][5:-1].split("; ")
print(f"DEBUG - {winning_deck_names=}")
            try:
                indices_of_winning_decks = [
                    [row[f"Deck {i+1}"] == name for i in range(6)].index(True)
                    for name in winning_deck_names
                ]
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Error when processing row {row_idx}, game on {row['Date']}: a deck named in {row['Winning Deck']} was not found among the game's decks",
                )
created_game = create_game(
schemas.GameCreate(
date=date_of_current_row,
**{
f"deck_id_{i+1}": deck_id_lookup[
row[f"Player {i+1}"] + ":" + row[f"Deck {i+1}"]
]
for i in range(6)
if row[f"Deck {i+1}"]
},
winning_deck_id=deck_id_lookup[
row[f"Player {indices_of_winning_decks[0]+1}"]
+ ":"
+ row[f"Deck {indices_of_winning_decks[0]+1}"]
],
other_winning_deck_ids=",".join(
[
str(
deck_id_lookup[
row[f"Player {i+1}"] + ":" + row[f"Deck {i+1}"]
]
)
for i in indices_of_winning_decks[1:]
]
),
number_of_turns=int(row["# turns"]),
first_player_out_turn=row["turn 1st player out"],
win_type_id=[
win_type.id
for win_type in win_types
if win_type.name == row["Type of win"]
][0],
format_id=[
format.id for format in formats if format.name == row["Format"]
][0],
description=row["Notes"],
),
db,
)
LOGGER.info(f"Seeded {created_game=}")
return "Ok!"
# Facade class to imitate the CSV-reader-interface when passed a list[list[str]]
# as we get from the Google Sheets API
# dict[str, str], where the keys are the header names and the values are the row values
class CSVFacade:
def __init__(self, data: list[list[str]]):
self.headers = data[0]
self.data = data[1:]
logging.critical(f"Headers: {self.headers}")
logging.critical(f"First row: {self.data[0]}")

    def __iter__(self):
        self.index = 0
        return self

    def __next__(self):
        if self.index >= len(self.data):
            raise StopIteration
        result = self.data[self.index]
        self.index += 1
if len(self.headers) != len(result):
# If the last column - "notes" - is empty, fill it in with an empty string.
if len(result) == len(self.headers) - 1:
result.append("")
# If there's any other kind of discrepancy, though, that's probably an error
else:
raise ValueError(
f"Row {self.index} has {len(result)} columns, but {len(self.headers)} are expected"
)
return dict(zip(self.headers, result))
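
# For example, `CSVFacade([["Date", "Player 1"], ["4/17/23", "Alice"]])` yields
# `{"Date": "4/17/23", "Player 1": "Alice"}` when iterated - the same shape that
# `csv.DictReader` produces for the equivalent CSV.

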
@html_router.get("/")
def main(request: Request):
    return jinja_templates.TemplateResponse(
        request,
        "/seed.html",
    )