
Also adds rudimentary testing framework for seeding a database from a given `.db` SQLite file. Probably extract this for general use!
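For reference, a minimal sketch of what that seeding helper could look like once extracted - a pytest fixture; the fixture name, the `tests/fixtures/seed.db` path, and the `sessionmaker` wiring below are illustrative assumptions, not the actual implementation:

```python
import shutil

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


@pytest.fixture
def seeded_db(tmp_path):
    # Copy the checked-in SQLite file so tests can't mutate the original
    db_file = tmp_path / "test.db"
    shutil.copy("tests/fixtures/seed.db", db_file)
    engine = create_engine(f"sqlite:///{db_file}")
    session = sessionmaker(bind=engine)()
    try:
        yield session
    finally:
        session.close()
```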
from collections import defaultdict
from datetime import datetime, timedelta
from heapq import nlargest, nsmallest
from typing import Optional

from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from sqlalchemy.sql.expression import func

from app.sql import models

from ..templates import jinja_templates
from ..sql.database import get_db

api_router = APIRouter(prefix="/stats", tags=["stats"])
html_router = APIRouter(
    prefix="/stats", include_in_schema=False, default_response_class=HTMLResponse
)


@api_router.get("/graph")
def stats_graph_api(
    deck_ids: Optional[str] = None,
    normalize_final_datapoint: bool = False,
    db=Depends(get_db),
):
    # TODO - parallelize? (Probably not worth it :P )

    # SO Answer on row_number: https://stackoverflow.com/a/38160409/1040915
    # Docs: https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.over
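    # For illustration, the window function below corresponds to SQL roughly like
    # this (assuming default table names; exact output varies by dialect):
    #
    #   ROW_NUMBER() OVER (
    #       PARTITION BY deck.name, game.date
    #       ORDER BY elo_score.id DESC
    #   ) AS row_number
    #
    # Filtering on row_number == 1 then keeps only the last-recorded EloScore per
    # deck per game date.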
    row_number_column = (
        func.row_number()
        .over(
            partition_by=[models.Deck.name, models.Game.date],
            order_by=models.EloScore.id.desc(),
        )
        .label("row_number")
    )
    sub_query = (
        db.query(models.Deck.name, models.EloScore.score, models.Game.date)
        .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
        .join(models.Game, models.EloScore.after_game_id == models.Game.id)
        .add_columns(row_number_column)
    )
    if deck_ids is not None:
        # Parse the comma-separated ids to ints so the IN-filter compares like
        # types on any backend
        deck_id_list = [int(deck_id) for deck_id in deck_ids.split(",")]
        sub_query = sub_query.filter(models.Deck.id.in_(deck_id_list))

    sub_query = sub_query.subquery()
    query = db.query(sub_query).filter(sub_query.c.row_number == 1)
    results = query.all()

    data_grouped_by_deck = defaultdict(list)
    latest_date_so_far = datetime.min
    for result in results:
        # Rows from `db.query(sub_query)` expose the subquery's labelled columns
        # as attributes, so we can index by name rather than by tuple position
        latest_date_so_far = max(latest_date_so_far, result.date)
        data_grouped_by_deck[result.name].append(
            {"score": result.score, "date": result.date.strftime("%Y-%m-%d")}
        )

    if normalize_final_datapoint:
        # Add a fake final datapoint to the series for any decks that weren't played
        # in the latest game, so that lines continue all the way to the end of the graph
        latest_date_formatted = latest_date_so_far.strftime("%Y-%m-%d")
        for games in data_grouped_by_deck.values():
            if games[-1]["date"] != latest_date_formatted:
                games.append(
                    {"score": games[-1]["score"], "date": latest_date_formatted}
                )

    return {
        "datasets": [
            {"label": key, "data": data_grouped_by_deck[key]}
            for key in data_grouped_by_deck
        ]
    }


# As with many APIs, this is a candidate for parallelization if desired -
# could key by deck_id, then in parallel get scores over the time period for that deck.
# But performance isn't likely to be a big issue!
@api_router.get("/top_movers")
def top_movers(
    lookback_in_days: int = 7,
    number_of_movers: int = 3,
    db=Depends(get_db),
):
    date_of_latest_game = (
        db.query(models.Game.date).order_by(models.Game.date.desc()).limit(1).scalar()
    )
    if date_of_latest_game is None:
        # Empty database - no games have been recorded, so there are no movers
        return {"positive": [], "negative": []}
    beginning_of_lookback = date_of_latest_game - timedelta(days=lookback_in_days)

    # TODO - this mostly duplicates logic from `stats_graph_api`. Extract?
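    # One possible extraction, sketched here as a comment (the helper name is
    # hypothetical and this isn't wired in):
    #
    #     def latest_daily_scores_subquery(db, *columns):
    #         """Subquery of `columns` plus a row_number that equals 1 for the
    #         latest EloScore per deck per game date."""
    #         row_number_column = (
    #             func.row_number()
    #             .over(
    #                 partition_by=[models.Deck.name, models.Game.date],
    #                 order_by=models.EloScore.id.desc(),
    #             )
    #             .label("row_number")
    #         )
    #         return (
    #             db.query(*columns)
    #             .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
    #             .join(models.Game, models.EloScore.after_game_id == models.Game.id)
    #             .add_columns(row_number_column)
    #             .subquery()
    #         )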
    row_number_column = (
        func.row_number()
        .over(
            partition_by=[models.Deck.name, models.Game.date],
            order_by=models.EloScore.id.desc(),
        )
        .label("row_number")
    )
    sub_query = (
        db.query(
            models.Deck.id, models.Deck.name, models.EloScore.score, models.Game.date
        )
        .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
        .join(models.Game, models.EloScore.after_game_id == models.Game.id)
        .add_columns(row_number_column)
        .subquery()
    )
    scores = (
        db.query(sub_query)
        .filter(sub_query.c.row_number == 1)
        .order_by(sub_query.c.date)
        .all()
    )
    score_tracker = defaultdict(dict)
    # First, get the score-per-deck at the start and end of the time period
    for score in scores:
        if score.date <= beginning_of_lookback:
            score_tracker[score.id]["start_score"] = score.score
        # Decks first played inside the lookback window have no score before it;
        # fall back to treating their earliest recorded score as the starting point
        score_tracker[score.id].setdefault("start_score", score.score)
        score_tracker[score.id]["latest_score"] = score.score
        # Technically we don't need to _keep_ adding this (as it won't change for a
        # given deck_id) - but, until/unless this logic is parallelized, there's no
        # efficient way for the algorithm to know that it's operating on a deck
        # that's already been seen once before
        score_tracker[score.id]["name"] = score.name
    # Then, find the biggest movers
    calculateds = [
        {
            "deck_id": deck_id,
            "name": score_tracker[deck_id]["name"],
            "start": score_tracker[deck_id]["start_score"],
            "end": score_tracker[deck_id]["latest_score"],
            "diff": score_tracker[deck_id]["latest_score"]
            - score_tracker[deck_id]["start_score"],
        }
        for deck_id in score_tracker
    ]
    return {
        "positive": nlargest(number_of_movers, calculateds, key=lambda x: x["diff"]),
        "negative": nsmallest(number_of_movers, calculateds, key=lambda x: x["diff"]),
    }


@html_router.get("/graph")
def stats_graph(request: Request):
    # No database access needed here; the data is fetched by the page itself
    return jinja_templates.TemplateResponse(request, "stats/graph.html")
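For a quick manual check of the new endpoints, something along these lines should work (the `app.main` import path is an assumption about the project layout; the commented response shape follows from the code above):

```python
from fastapi.testclient import TestClient

from app.main import app  # hypothetical module exposing the FastAPI app

client = TestClient(app)
response = client.get(
    "/stats/top_movers", params={"lookback_in_days": 7, "number_of_movers": 3}
)
# response.json() is shaped like:
# {
#     "positive": [{"deck_id": ..., "name": ..., "start": ..., "end": ..., "diff": ...}],
#     "negative": [...],
# }
```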