from collections import defaultdict
from datetime import datetime, timedelta, MINYEAR
from heapq import nlargest, nsmallest
from typing import Optional

from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from sqlalchemy.sql.expression import func

from app.sql import models

from ..templates import jinja_templates
from ..sql.database import get_db

api_router = APIRouter(prefix="/stats", tags=["stats"])
html_router = APIRouter(
    prefix="/stats", include_in_schema=False, default_response_class=HTMLResponse
)


@api_router.get("/graph")
def stats_graph_api(
    deck_ids: Optional[str] = None,
    normalize_final_datapoint: bool = False,
    db=Depends(get_db),
):
    # TODO - parallelize? (Probably not worth it :P )
    # SO Answer on row_number: https://stackoverflow.com/a/38160409/1040915
    # Docs: https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.over
    row_number_column = (
        func.row_number()
        .over(
            partition_by=[models.Deck.name, models.Game.date],
            order_by=models.EloScore.id.desc(),
        )
        .label("row_number")
    )
    sub_query = (
        db.query(models.Deck.name, models.EloScore.score, models.Game.date)
        .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
        .join(models.Game, models.EloScore.after_game_id == models.Game.id)
        .add_column(row_number_column)
    )
    if deck_ids is not None:
        sub_query = sub_query.filter(models.Deck.id.in_(deck_ids.split(",")))
    sub_query = sub_query.subquery()
    query = db.query(sub_query).filter(sub_query.c.row_number == 1)
    results = query.all()

    data_grouped_by_deck = defaultdict(list)
    latest_date_so_far = datetime(MINYEAR, 1, 1, 0, 0, 0, 0)
    for result in results:
        # TODO - how to index results by name instead of tuple-number
        date = result[2]
        latest_date_so_far = max(latest_date_so_far, date)
        data_grouped_by_deck[result[0]].append(
            {"score": result[1], "date": date.strftime("%Y-%m-%d")}
        )

    if normalize_final_datapoint:
        # Add a fake final datapoint to the series for any decks that weren't played
        # in the latest game, so that lines continue all the way to the end of the graph
        latest_date_formatted = latest_date_so_far.strftime("%Y-%m-%d")
        for games in data_grouped_by_deck.values():
            if games[-1]["date"] != latest_date_formatted:
                games.append(
                    {"score": games[-1]["score"], "date": latest_date_formatted}
                )

    return {
        "datasets": [
            {"label": key, "data": data_grouped_by_deck[key]}
            for key in data_grouped_by_deck
        ]
    }


# As with many APIs, this is a candidate for parallelization if desired -
# could key by deck_id, then in parallel get scores over the time period for that deck.
# But performance isn't likely to be a big issue!
@api_router.get("/top_movers")
def top_movers(
    lookback_in_days: int = 7,
    number_of_movers: int = 3,
    db=Depends(get_db),
):
    # TODO - this will error-out on an empty database
    date_of_latest_game = (
        db.query(models.Game.date)
        .order_by(models.Game.date.desc())
        .limit(1)
        .first()
        ._tuple()[0]
    )
    beginning_of_lookback = date_of_latest_game - timedelta(days=lookback_in_days)

    # TODO - this mostly duplicates logic from `stats_graph_api`. Extract?
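    # (A sketch of one possible extraction is at the bottom of this module.)
    # As in `stats_graph_api`, the window function ranks each deck's EloScore rows
    # within a (deck name, game date) partition, newest id first, so filtering on
    # row_number == 1 keeps only the final score a deck reached on each date.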
    row_number_column = (
        func.row_number()
        .over(
            partition_by=[models.Deck.name, models.Game.date],
            order_by=models.EloScore.id.desc(),
        )
        .label("row_number")
    )
    sub_query = (
        db.query(
            models.Deck.id, models.Deck.name, models.EloScore.score, models.Game.date
        )
        .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
        .join(models.Game, models.EloScore.after_game_id == models.Game.id)
        .add_column(row_number_column)
        .subquery()
    )
    scores = (
        db.query(sub_query)
        .filter(sub_query.c.row_number == 1)
        .order_by(sub_query.c.date)
        .all()
    )

    score_tracker = defaultdict(dict)
    # First, get the score-per-deck at the start and end of the time period
    for score in scores:
        if score.date <= beginning_of_lookback:
            score_tracker[score.id]["start_score"] = score.score
        score_tracker[score.id]["latest_score"] = score.score
        # Technically we don't need to _keep_ adding this (as it won't change for a
        # given deck_id) - but, until/unless this logic is parallelized, there's no
        # efficient way for the algorithm to know that it's operating on a deck
        # that's already been seen once before
        score_tracker[score.id]["name"] = score.name

    # Then, find biggest movers
    calculateds = [
        {
            "deck_id": deck_id,
            "name": score_tracker[deck_id]["name"],
            "start": score_tracker[deck_id]["start_score"],
            "end": score_tracker[deck_id]["latest_score"],
            "diff": score_tracker[deck_id]["latest_score"]
            - score_tracker[deck_id]["start_score"],
        }
        for deck_id in score_tracker
        if "start_score" in score_tracker[deck_id]
    ]

    return {
        "positive": nlargest(number_of_movers, calculateds, key=lambda x: x["diff"]),
        "negative": nsmallest(number_of_movers, calculateds, key=lambda x: x["diff"]),
    }


@html_router.get("/graph")
def stats_graph(request: Request, db=Depends(get_db)):
    return jinja_templates.TemplateResponse(request, "stats/graph.html")
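

# A minimal sketch of the extraction suggested by the TODO in `top_movers` above.
# `_latest_scores_query` is a hypothetical helper (nothing calls it yet): it builds
# the shared ranked query, and each endpoint would then apply its own filters
# before calling `.subquery()`.
def _latest_scores_query(db, *columns):
    """Query the given columns joined to EloScore/Game, plus a `row_number`
    column that equals 1 for the most recent score per (deck name, game date)."""
    row_number_column = (
        func.row_number()
        .over(
            partition_by=[models.Deck.name, models.Game.date],
            order_by=models.EloScore.id.desc(),
        )
        .label("row_number")
    )
    return (
        db.query(*columns)
        .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
        .join(models.Game, models.EloScore.after_game_id == models.Game.id)
        .add_column(row_number_column)
    )

# Example (hypothetical) usage in `stats_graph_api`:
#   sub_query = _latest_scores_query(
#       db, models.Deck.name, models.EloScore.score, models.Game.date
#   )
#   ...apply the optional deck_ids filter, then call .subquery()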