83 lines
2.9 KiB
Python
83 lines
2.9 KiB
Python
from collections import defaultdict
|
|
from datetime import datetime, MINYEAR
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, Request
|
|
from fastapi.responses import HTMLResponse
|
|
from sqlalchemy.sql.expression import func
|
|
|
|
from app.sql import models
|
|
|
|
from ..templates import jinja_templates
|
|
from ..sql.database import get_db
|
|
|
|
# JSON endpoints under /stats; these are included in the generated API schema
# and grouped under the "stats" tag.
api_router = APIRouter(
    prefix="/stats",
    tags=["stats"],
)

# HTML pages share the same /stats prefix, but are kept out of the API schema
# and respond with HTMLResponse by default.
html_router = APIRouter(
    prefix="/stats",
    include_in_schema=False,
    default_response_class=HTMLResponse,
)
|
|
|
|
|
|
def _build_graph_datasets(rows, normalize_final_datapoint=False):
    """Group score rows by deck and shape them for the stats graph.

    Args:
        rows: Iterable of tuple-like rows whose first three items are
            ``(deck_name, score, game_date)``; any further items are ignored.
            ``game_date`` must support ``strftime`` and ordering comparisons.
        normalize_final_datapoint: When true, every deck whose last datapoint
            is not on the latest date seen across all rows gets one extra
            datapoint on that date, repeating the deck's last score, so that
            every series extends to the end of the graph.

    Returns:
        ``{"datasets": [{"label": deck_name, "data": [{"score": ...,
        "date": "YYYY-MM-DD"}, ...]}, ...]}`` with each deck's data sorted by
        date. Decks appear in the order they are first seen in ``rows``.
    """
    date_format = "%Y-%m-%d"

    points_by_deck = defaultdict(list)
    latest_date = None
    for deck_name, score, game_date, *_ in rows:
        points_by_deck[deck_name].append((game_date, score))
        # Track the maximum without a sentinel value, so an empty result set
        # never formats a placeholder date.
        if latest_date is None or game_date > latest_date:
            latest_date = game_date

    latest_date_formatted = (
        latest_date.strftime(date_format) if latest_date is not None else None
    )

    datasets = []
    for deck_name, points in points_by_deck.items():
        # The query gives no ordering guarantee, so sort explicitly: the
        # normalization below relies on the final element being the deck's
        # most recent datapoint. Sort on the date only; scores are never
        # compared.
        points.sort(key=lambda point: point[0])
        data = [
            {"score": score, "date": game_date.strftime(date_format)}
            for game_date, score in points
        ]
        if normalize_final_datapoint and data[-1]["date"] != latest_date_formatted:
            # Add a fake final datapoint so this deck's line continues all
            # the way to the end of the graph.
            data.append({"score": data[-1]["score"], "date": latest_date_formatted})
        datasets.append({"label": deck_name, "data": data})

    return {"datasets": datasets}


@api_router.get("/graph")
def stats_graph_api(
    deck_ids: Optional[str] = None,
    normalize_final_datapoint: bool = False,
    db=Depends(get_db),
):
    """Return per-deck score series for the stats graph.

    For every (deck name, game date) pair, only the score row with the
    highest ``EloScore.id`` is kept (presumably the most recently recorded
    score for that date - confirm against how scores are inserted).

    Args:
        deck_ids: Optional comma-separated list of deck ids to restrict the
            query to. Surrounding whitespace and empty entries are ignored.
            ``None`` means all decks.
        normalize_final_datapoint: When true, pad every deck's series with a
            copy of its last score on the latest date present in the results.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A dict of the form ``{"datasets": [{"label": <deck name>,
        "data": [{"score": ..., "date": "YYYY-MM-DD"}, ...]}, ...]}``.
    """
    # TODO - parallelize? (Probably not worth it :P )

    # SO Answer on row_number: https://stackoverflow.com/a/38160409/1040915
    # Docs: https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.over
    row_number_column = (
        func.row_number()
        .over(
            partition_by=[models.Deck.name, models.Game.date],
            order_by=models.EloScore.id.desc(),
        )
        .label("row_number")
    )
    sub_query = (
        db.query(models.Deck.name, models.EloScore.score, models.Game.date)
        .outerjoin(models.EloScore, models.Deck.id == models.EloScore.deck_id)
        .join(models.Game, models.EloScore.after_game_id == models.Game.id)
        .add_column(row_number_column)
    )
    if deck_ids is not None:
        # Tolerate inputs such as "1, 2" or a trailing comma.
        requested_ids = [
            part.strip() for part in deck_ids.split(",") if part.strip()
        ]
        sub_query = sub_query.filter(models.Deck.id.in_(requested_ids))

    sub_query = sub_query.subquery()
    # row_number == 1 selects the top-ranked score row of each partition.
    results = db.query(sub_query).filter(sub_query.c.row_number == 1).all()

    return _build_graph_datasets(results, normalize_final_datapoint)
|
|
|
|
|
|
@html_router.get("/graph")
def stats_graph(request: Request, db=Depends(get_db)):
    """Render the ``stats/graph.html`` template for the incoming request.

    ``db`` is declared as a dependency but is not used by this handler.
    """
    template_name = "stats/graph.html"
    return jinja_templates.TemplateResponse(request, template_name)
|