edh-elo/app/routers/stats.py
Jack Jackson 5d2183bbf0 Fix to top movers
Don't try processing decks that don't have a score before the cut-off date.
2024-08-23 09:58:58 -07:00

from collections import defaultdict
from datetime import datetime, timedelta, MINYEAR
from heapq import nlargest, nsmallest
from typing import Optional
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from sqlalchemy.sql.expression import func
from app.sql import models
from ..templates import jinja_templates
from ..sql.database import get_db

api_router = APIRouter(prefix="/stats", tags=["stats"])
html_router = APIRouter(
prefix="/stats", include_in_schema=False, default_response_class=HTMLResponse
)


@api_router.get("/graph")
def stats_graph_api(
deck_ids: Optional[str] = None,
normalize_final_datapoint: bool = False,
db=Depends(get_db),
):
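    """Return per-deck Elo score series for plotting.

    Response shape: {"datasets": [{"label": <deck name>, "data": [{"score": ..., "date": "YYYY-MM-DD"}, ...]}]}
    with one datapoint per date a deck was played.
    Example: GET /stats/graph?deck_ids=1,2&normalize_final_datapoint=true
    """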
# TODO - parallelize? (Probably not worth it :P )
# SO Answer on row_number: https://stackoverflow.com/a/38160409/1040915
# Docs: https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.over
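    # A deck can record several Elo scores on a single date (one per game), so
    # number the rows within each (deck, date) group from newest score to
    # oldest; filtering on row_number == 1 below keeps only the last score a
    # deck held on each date.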
row_number_column = (
func.row_number()
.over(
partition_by=[models.Deck.name, models.Game.date],
order_by=models.EloScore.id.desc(),
)
.label("row_number")
)
sub_query = (
db.query(models.Deck.name, models.EloScore.score, models.Game.date)
.outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
.join(models.Game, models.EloScore.after_game_id == models.Game.id)
.add_column(row_number_column)
)
    if deck_ids is not None:
        # The ids arrive as a comma-separated string; cast to int so the
        # comparison against the integer primary key is type-correct on all
        # database backends
        sub_query = sub_query.filter(
            models.Deck.id.in_([int(deck_id) for deck_id in deck_ids.split(",")])
        )
sub_query = sub_query.subquery()
query = db.query(sub_query).filter(sub_query.c.row_number == 1)
results = query.all()
data_grouped_by_deck = defaultdict(list)
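    # Sentinel earlier than any real game date; raised as results are scanned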
latest_date_so_far = datetime(MINYEAR, 1, 1, 0, 0, 0, 0)
    for result in results:
        # Rows from the subquery support attribute access by column name
        date = result.date
        latest_date_so_far = max(latest_date_so_far, date)
        data_grouped_by_deck[result.name].append(
            {"score": result.score, "date": date.strftime("%Y-%m-%d")}
        )
if normalize_final_datapoint:
# Add a fake final datapoint to the series for any decks that weren't played in the latest game, so that lines
# continue all the way to the end of the graph
latest_date_formatted = latest_date_so_far.strftime("%Y-%m-%d")
for games in data_grouped_by_deck.values():
if games[-1]["date"] != latest_date_formatted:
games.append(
{"score": games[-1]["score"], "date": latest_date_formatted}
)
    return {
        "datasets": [
            {"label": name, "data": data}
            for name, data in data_grouped_by_deck.items()
        ]
    }


# As with many APIs, this is a candidate for parallelization if desired -
# could key by deck_id, then in parallel get scores over the time period for
# that deck. But performance isn't likely to be a big issue!
@api_router.get("/top_movers")
def top_movers(
lookback_in_days: int = 7,
number_of_movers: int = 3,
db=Depends(get_db),
):
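    """Return the decks whose Elo scores rose and fell the most.

    Movement is measured from the last score each deck held on or before the
    cut-off (`lookback_in_days` days before the most recent game) to its
    latest score. Example: GET /stats/top_movers?lookback_in_days=30
    """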
    latest_game = (
        db.query(models.Game.date)
        .order_by(models.Game.date.desc())
        .limit(1)
        .first()
    )
    if latest_game is None:
        # An empty database has no games, and therefore no movers
        return {"positive": [], "negative": []}
    date_of_latest_game = latest_game.date
beginning_of_lookback = date_of_latest_game - timedelta(days=lookback_in_days)
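    # Scores dated on or before this cut-off define each deck's baseline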
# TODO - this mostly duplicates logic from `stats_graph_api`. Extract?
row_number_column = (
func.row_number()
.over(
partition_by=[models.Deck.name, models.Game.date],
order_by=models.EloScore.id.desc(),
)
.label("row_number")
)
sub_query = (
db.query(
models.Deck.id, models.Deck.name, models.EloScore.score, models.Game.date
)
.outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
.join(models.Game, models.EloScore.after_game_id == models.Game.id)
.add_column(row_number_column)
.subquery()
)
scores = (
db.query(sub_query)
.filter(sub_query.c.row_number == 1)
.order_by(sub_query.c.date)
.all()
)
score_tracker = defaultdict(dict)
# First, get the score-per-deck at the start and end of the time period
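    # Scores are ordered by date ascending, so later assignments overwrite
    # earlier ones: the last row at or before the cut-off wins as start_score,
    # and the final row for a deck wins as latest_score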
for score in scores:
if score.date <= beginning_of_lookback:
score_tracker[score.id]["start_score"] = score.score
score_tracker[score.id]["latest_score"] = score.score
        # Technically we don't need to keep re-assigning the name (it never
        # changes for a given deck_id), but checking whether the deck has
        # already been seen would cost about as much as the assignment itself
        score_tracker[score.id]["name"] = score.name
# Then, find biggest movers
calculateds = [
{
"deck_id": deck_id,
"name": score_tracker[deck_id]["name"],
"start": score_tracker[deck_id]["start_score"],
"end": score_tracker[deck_id]["latest_score"],
"diff": score_tracker[deck_id]["latest_score"]
- score_tracker[deck_id]["start_score"],
}
for deck_id in score_tracker
if "start_score" in score_tracker[deck_id]
]
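    # Decks with no score on or before the cut-off were skipped above - they
    # have no baseline to measure movement against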
return {
"positive": nlargest(number_of_movers, calculateds, key=lambda x: x["diff"]),
"negative": nsmallest(number_of_movers, calculateds, key=lambda x: x["diff"]),
}


@html_router.get("/graph")
def stats_graph(request: Request, db=Depends(get_db)):
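    """Serve the HTML page that renders the stats graph."""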
return jinja_templates.TemplateResponse(request, "stats/graph.html")