Convert to FastAPI

This commit is contained in:
Jack Jackson 2024-01-26 22:16:35 -08:00
parent ba457170ba
commit 5472dbc8b9
18 changed files with 251 additions and 457 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
instance/
database/

View File

@ -42,10 +42,14 @@ USER appuser
# Copy the source code into the container.
COPY . .
# Create writeable directory for database
USER root
RUN mkdir database
RUN chmod 755 database
RUN chown appuser:appuser database
USER appuser
# Expose the port that the application listens on.
EXPOSE 5000
EXPOSE 8000
# Run the application. Note that this sets a hard-coded secret key, which is not secure - use proper secret generation
# and management in production!
CMD FLASK_APP=app SECRET_KEY=super-secret flask run --host=0.0.0.0
CMD uvicorn app:app --host 0.0.0.0

View File

@ -1,39 +1,12 @@
import os
import sys
from fastapi import FastAPI
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flasgger import Swagger
from .routers import decks, players
from .sql.models import Base
from .sql.database import engine
db = SQLAlchemy()
Base.metadata.create_all(bind=engine)
app = FastAPI()
def create_app():
app = Flask(__name__)
app.config["SWAGGER"] = {"openapi": "3.0.2"}
swagger = Swagger(app)
secret_key = os.environ.get("SECRET_KEY")
if not secret_key:
sys.stderr.write("YOU NEED TO SET AN ENV VARIABLE NAMED SECRET_KEY\n")
sys.exit(1)
app.config["SECRET_KEY"] = secret_key
# TODO - support other database types 🙃
app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
"DATABASE_URI", "sqlite:///db.sqlite"
)
db.init_app(app)
from .main import main as main_blueprint
app.register_blueprint(main_blueprint)
# TODO - understand how this works, since `db.create_all()` requires that the model classes have already been
# imported in order to know what to create. Perhaps `__init__.py` just magically has the context of everything in
# its module? Good opportunity to learn more about the Python import system!
with app.app_context():
db.create_all()
return app
app.include_router(players.router)
app.include_router(decks.router)

View File

@ -1,311 +0,0 @@
from flask import Blueprint, render_template, request
from . import db
from .models import Deck, Player
main = Blueprint("main", __name__)
@main.route("/")
def index():
"""Main Page
---
responses:
200:
description: A friendly greeting
schema:
type: string
"""
return "Hello, World - but new!"
@main.route("/player", methods=["POST"])
def create_player():
"""Create a Player
---
requestBody:
description: Payload describing the player
required: true
content:
application/json:
schema:
type: object
properties:
name:
type: string
example: Jim Bloggs
required:
- name
responses:
201:
description: Payload containing Player Id
schema:
type: object
properties:
id:
type: number
required:
- id
tags:
- player
"""
data = request.json
player = Player(name=data["name"])
db.session.add(player)
db.session.commit()
return {"id": player.id}, 201
@main.route("/player/<player_id>")
def get_player(player_id: str):
"""Get a Player
---
parameters:
- name: player_id
in: path
required: true
schema:
type: integer
minimum: 1
description: The Player Id
requestBody:
content:
application/json:
schema: {}
text/html:
schema: {}
responses:
200:
description: Payload describing player
content:
application/json:
schema:
id: Player
text/html:
schema:
type: string
404:
description: Player not found
content:
application/json: {}
text/html: {}
tags:
- player
"""
# TODO - actually, the schema above doesn't reference the `Player` class as I'd hoped it would.
# The docs at https://github.com/flasgger/flasgger#extracting-definitions are not super-clear.
# _Maybe_ what I'm trying to do is not possible?
player_from_db = db.session.get(Player, int(player_id))
if not player_from_db:
return "Not Found", 404
player_data = _jsonify(player_from_db)
content_type = request.headers.get("Content-Type")
if content_type == "application/json":
return player_data
else: # Assume they want HTML
return render_template("player_detail.html", **player_data)
@main.route("/players")
def list_players():
"""List Players
---
requestBody:
content:
application/json:
schema: {}
responses:
200:
description: Payload describing the players
content:
application/json:
schema:
type: array
items:
type: object
properties:
id: Player
text/html:
schema:
type: string
tags:
- player
"""
players = db.session.query(Player)
return [_jsonify(p) for p in players]
@main.route("/player/<player_id>", methods=["DELETE"])
def delete_player(player_id: str):
"""Delete a Player
---
parameters:
- name: player_id
in: path
required: true
schema:
type: integer
minimum: 1
description: The Player Id
requestBody:
content:
application/json:
schema: {}
responses:
204:
description: Empty
content:
application/json:
schema: {}
tags:
- player
"""
# Note - no checking that the player exists, because HTTP semantics specify
# that `DELETE` should be idempotent.
db.session.query(Player).filter(Player.id == int(player_id)).delete()
db.session.commit()
return "", 204
@main.route("/deck", methods=["POST"])
def create_deck():
"""Create a Deck
---
requestBody:
description: Payload describing the deck
required: true
content:
application/json:
schema:
type: object
properties:
name:
type: string
example: My First Deck
description:
type: string
example: Better than yours!
owner_id:
type: number
example: 1
required:
- name
- owner_id
responses:
201:
description: Payload containing Deck Id
schema:
type: object
properties:
id:
type: number
required:
- id
400:
description: Owner not found
tags:
- deck
"""
data = request.json
owner_id = data["owner_id"]
player_from_db = db.session.get(Player, int(owner_id))
if not player_from_db:
return f"Owner id {owner_id} not found", 400
deck = Deck(
name=data["name"], description=data.get("description"), owner_id=owner_id
)
db.session.add(deck)
db.session.commit()
print("Finished creating the deck!")
return {"id": deck.id}, 201
@main.route("/deck/<deck_id>")
def get_deck(deck_id: str):
"""Get a Deck
---
parameters:
- name: deck_id
in: path
required: true
schema:
type: integer
minimum: 1
description: The Deck Id
requestBody:
content:
application/json:
schema: {}
text/html:
schema: {}
responses:
200:
description: Payload describing deck
content:
application/json:
schema:
id: Deck
text/html:
schema:
type: string
404:
description: Deck not found
content:
application/json: {}
text/html: {}
tags:
- deck
"""
deck_from_db = db.session.get(Deck, int(deck_id))
if not deck_from_db:
return "Not Found", 404
deck_data = _jsonify(deck_from_db)
content_type = request.headers.get("Content-Type")
if content_type == "application/json":
return deck_data
else: # Assume they want HTML
owner_data = db.session.get(Player, int(deck_data["owner_id"]))
return render_template(
"deck_detail.html", deck=deck_data, owner=_jsonify(owner_data)
)
@main.route("/deck/<deck_id>", methods=["DELETE"])
def delete_deck(deck_id: str):
"""Delete a Deck
---
parameters:
- name: deck_id
in: path
required: true
schema:
type: integer
minimum: 1
description: The Deck Id
requestBody:
content:
application/json:
schema: {}
responses:
204:
description: Empty
content:
application/json:
schema: {}
tags:
- deck
"""
db.session.query(Deck).filter(Deck.id == int(deck_id)).delete()
db.session.commit()
return "", 204
# TODO - would this be better as a method on a class extending `db.Model` that the classes in `models.py` could then
# extend?
# (Probably not, as we'd still need to explicitly call it - it wouldn't be implicitly called _by_ Flask)
def _jsonify(o):
return {k: v for (k, v) in o.__dict__.items() if k != "_sa_instance_state"}

View File

@ -1,37 +0,0 @@
from . import db
# Note that a `Player` is "someone who plays in the Pod", whereas `User` (which will be implemented later) is "a user of
# this system". While all Users will _probably_ be Players, they do not have to be - and, it is likely that several
# Players will not be Users (if they don't register to use the system).
class Player(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=False)
class Deck(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(60), nullable=False)
description = db.Column(db.String)
owner_id = db.Column(
db.String, db.ForeignKey(Player.__table__.c.id), nullable=False
)
class Game(db.Model):
id = db.Column(db.Integer, primary_key=True)
# TODO - columns like `location`, `writeups`, etc.
# Not wild about this structure ("fill in non-null columns to indicate decks that were present"), but what can you
# do with a database that doesn't support arrays? (Postgres _does_, but I don't wanna ramp up on a whole other
# database system just for that...)
deck_1 = db.Column(db.Integer)
deck_2 = db.Column(db.Integer)
deck_3 = db.Column(db.Integer)
deck_4 = db.Column(db.Integer)
deck_5 = db.Column(db.Integer)
deck_6 = db.Column(db.Integer)
deck_7 = db.Column(db.Integer)
deck_8 = db.Column(db.Integer)
deck_9 = db.Column(db.Integer)
deck_10 = db.Column(db.Integer)

0
app/routers/__init__.py Normal file
View File

34
app/routers/decks.py Normal file
View File

@ -0,0 +1,34 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from ..sql import crud, schemas
from ..sql.database import get_db
router = APIRouter()
@router.post("/deck", response_model=schemas.Deck, tags=["deck"], status_code=201)
def create_deck(deck: schemas.DeckCreate, db: Session = Depends(get_db)):
db_player = crud.get_player_by_id(db, deck.owner_id)
if db_player is None:
raise HTTPException(status_code=400, detail=f"Owner id {deck.owner_id} not found")
return crud.create_deck(db=db, deck=deck)
@router.get("/deck/{deck_id}", response_model=schemas.Deck, tags=["deck"])
def read_deck(deck_id: str, db = Depends(get_db)):
db_deck = crud.get_deck_by_id(db, deck_id)
if db_deck is None:
raise HTTPException(status_code=404, detail="Deck not found")
return db_deck
@router.get("/decks", response_model=list[schemas.Deck], tags=["deck"])
def list_decks(skip: int = 0, limit: int = 100, db = Depends(get_db)):
return crud.get_decks(db, skip=skip, limit=limit)
@router.delete("/deck/{deck_id}", tags=["deck"], status_code=204)
def delete_deck(deck_id: str, db = Depends(get_db)):
crud.delete_deck_by_id(db, int(deck_id))

30
app/routers/players.py Normal file
View File

@ -0,0 +1,30 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from ..sql import crud, schemas
from ..sql.database import get_db
router = APIRouter()
@router.post("/player", response_model=schemas.Player, tags=["player"], status_code=201)
def create_player(player: schemas.PlayerCreate, db: Session = Depends(get_db)):
return crud.create_player(db=db, player=player)
@router.get("/player/{player_id}", response_model=schemas.Player, tags=["player"])
def read_player(player_id: str, db = Depends(get_db)):
db_player = crud.get_player_by_id(db, player_id)
if db_player is None:
raise HTTPException(status_code=404, detail="Player not found")
return db_player
@router.get("/players", response_model=list[schemas.Player], tags=["player"])
def list_players(skip: int = 0, limit: int = 100, db = Depends(get_db)):
return crud.get_players(db, skip=skip, limit=limit)
@router.delete("/player/{player_id}", tags=["player"], status_code=204)
def delete_player(player_id: str, db = Depends(get_db)):
crud.delete_player_by_id(db, int(player_id))

0
app/sql/__init__.py Normal file
View File

38
app/sql/crud.py Normal file
View File

@ -0,0 +1,38 @@
from sqlalchemy.orm import Session
from . import models, schemas
def get_player_by_id(db: Session, player_id: int):
return db.query(models.Player).filter(models.Player.id == player_id).first()
def get_players(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Player).offset(skip).limit(limit).all()
def create_player(db: Session, player: schemas.PlayerCreate):
db_player = models.Player(**player.model_dump())
db.add(db_player)
db.commit()
db.refresh(db_player)
return db_player
def delete_player_by_id(db: Session, player_id: int):
db.query(models.Player).filter(models.Player.id == player_id).delete()
db.commit()
def get_deck_by_id(db: Session, deck_id: int):
return db.query(models.Deck).filter(models.Deck.id == deck_id).first()
def get_decks(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Deck).offset(skip).limit(limit).all()
def create_deck(db: Session, deck: schemas.DeckCreate):
db_deck = models.Deck(**deck.model_dump())
db.add(db_deck)
db.commit()
db.refresh(db_deck)
return db_deck
def delete_deck_by_id(db: Session, deck_id: int):
db.query(models.Deck).filter(models.Deck.id == deck_id).delete()
db.commit()
return "", 204

20
app/sql/database.py Normal file
View File

@ -0,0 +1,20 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
SQLALCHEMY_DATABASE_URL = os.environ.get('DATABASE_URL', "sqlite:///database/sql_app.db")
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

22
app/sql/models.py Normal file
View File

@ -0,0 +1,22 @@
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class Player(Base):
__tablename__ = "players"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
decks = relationship("Deck", back_populates="owner")
class Deck(Base):
__tablename__ = "decks"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
description = Column(String)
owner_id = Column(Integer, ForeignKey("players.id"))
owner = relationship("Player", back_populates="decks")

36
app/sql/schemas.py Normal file
View File

@ -0,0 +1,36 @@
from typing import Optional
from pydantic import BaseModel
class PlayerBase(BaseModel):
name: str
class PlayerCreate(PlayerBase):
pass
class Player(PlayerBase):
id: int
model_config = {
'from_attributes': True
}
class DeckBase(BaseModel):
name: str
description: Optional[str] = None
owner_id: int
class DeckCreate(DeckBase):
pass
class Deck(DeckBase):
id: int
model_config = {
'from_attributes': True
}

View File

@ -2,5 +2,4 @@
# Idempotent
source .venv/bin/activate
# `--debug` enables live-refresh
FLASK_APP=app SECRET_KEY=super-secret flask --debug run
uvicorn app:app --reload

View File

@ -12,7 +12,7 @@ services:
build:
context: .
ports:
- 5000:5000
- 8000:8000
# The commented out section below is an example of how to define a PostgreSQL
# database that your application can use. `depends_on` tells Docker Compose to

View File

@ -1,6 +1,10 @@
Flask
Flask-SQLAlchemy
flasgger
flask-login
pytest
fastapi
uvicorn
sqlalchemy
# Linting
ruff
# Testing
httpx
pytest

View File

@ -2,45 +2,31 @@ import os
import pathlib
import pytest
from flask import Flask
from flask.testing import FlaskClient
from fastapi.testclient import TestClient
# TODO - this seems to be a "magic path" which makes fixtures available. Learn more about it by reading
# https://stackoverflow.com/questions/34466027/what-is-conftest-py-for-in-pytest
from app import create_app
# https://flask.palletsprojects.com/en/2.3.x/testing/
@pytest.fixture()
def app_fixture() -> Flask:
def prime_database():
# Start afresh!
test_database_name = "testing-db.sqlite"
database_location = pathlib.Path("instance").joinpath(test_database_name)
if database_location.exists():
database_location.unlink()
database_dir = "database"
db_dir_path = pathlib.Path(database_dir)
if not db_dir_path.exists():
db_dir_path.mkdir()
db_dir_path.chmod(0o777)
os.environ["DATABASE_URI"] = f"sqlite:///{test_database_name}"
os.environ["SECRET_KEY"] = "testing-secret-key"
test_database_name = "testing_database.db"
db_path = db_dir_path.joinpath(test_database_name)
app = create_app()
# app.config.update({
# 'TESTING': True
# })
if db_path.exists():
db_path.unlink()
# other setup can go here
print(f'Setting database_url using {db_path}')
os.environ['DATABASE_URL'] = f'sqlite:///{db_path}'
yield app
# clean up / reset resources here
prime_database()
# This must be after `prime_database`, as the database initialization will happen
# during the import, and must do so after the environment-var setting
from app import app # noqa: E402
@pytest.fixture()
def client(app_fixture: Flask) -> FlaskClient:
return app_fixture.test_client()
@pytest.fixture()
def runner(app_fixture):
return app_fixture.test_cli_runner()
def test_client() -> TestClient:
return TestClient(app)

View File

@ -1,22 +1,22 @@
from typing import Mapping
from flask.testing import FlaskClient
from werkzeug.test import TestResponse
import httpx
from fastapi.testclient import TestClient
# These tests expect that the database starts empty.
# TODO: create tests with initialized states
# TODO - these cannot be run in parallel because they involve assumptions about the same database
from app import app
client = TestClient(app)
def test_add_and_retrieve_player(client: FlaskClient):
response = _json_get(client, "/player/1")
def test_add_and_retrieve_player(test_client: TestClient):
response = _json_get(test_client, "/player/1")
assert response.status_code == 404
create_player_response = _json_post(client, "/player", {"name": "jason"})
assert create_player_response.status_code == 201
response_1 = _json_get(client, "/player/1")
assert response_1.json["name"] == "jason"
assert response_1.json()["name"] == "jason"
# Cleanup
# TODO - put this in a finally clause (or similar as provided by pytest)
@ -24,50 +24,45 @@ def test_add_and_retrieve_player(client: FlaskClient):
assert delete_response.status_code == 204
def test_add_and_retrieve_deck(client: FlaskClient):
not_found_response = _json_get(client, "/deck/1")
def test_add_and_retrieve_deck(test_client: TestClient):
not_found_response = _json_get(test_client, "/deck/1")
assert not_found_response.status_code == 404
# Try (and fail) to create a deck owned by a non-existent player
invalid_owner_response = _json_post(
client, "/deck", {"name": "Baby's First Deck", "owner_id": 1}
test_client, "/deck", {"name": "Baby's First Deck", "owner_id": 1}
)
assert invalid_owner_response.status_code == 400
assert invalid_owner_response.text == "Owner id 1 not found"
assert invalid_owner_response.json()['detail'] == "Owner id 1 not found"
create_jim_response = _json_post(client, "/player", {"name": "jim"})
create_jim_response = _json_post(test_client, "/player", {"name": "jim"})
assert create_jim_response.status_code == 201
jim_id = create_jim_response.json["id"]
jim_id = create_jim_response.json()["id"]
create_deck_response = _json_post(
client, "/deck", {"name": "Baby's First Deck", "owner_id": str(jim_id)}
test_client, "/deck", {"name": "Baby's First Deck", "owner_id": str(jim_id)}
)
assert create_deck_response.status_code == 201
# _Should_ always be 1, since we expect to start with an empty database, but why risk it?
deck_id = create_deck_response.json["id"]
deck_id = create_deck_response.json()["id"]
get_deck_response = _json_get(client, f"/deck/{deck_id}")
get_deck_response = _json_get(test_client, f"/deck/{deck_id}")
assert get_deck_response.status_code == 200
assert get_deck_response.json["name"] == "Baby's First Deck"
assert get_deck_response.json()["name"] == "Baby's First Deck"
############
# VERY basic template testing!
############
html_response = client.get(f"/deck/{deck_id}")
assert "owned by jim" in html_response.text
# Cleanup
delete_response = _json_delete(client, f"/deck/{deck_id}")
delete_response = _json_delete(test_client, f"/deck/{deck_id}")
assert delete_response.status_code == 204
def _json_get(c: FlaskClient, path: str) -> TestResponse:
def _json_get(c: TestClient, path: str) -> httpx.Response:
return c.get(path, headers={"Content-Type": "application/json"})
def _json_post(c: FlaskClient, path: str, body: Mapping) -> TestResponse:
def _json_post(c: TestClient, path: str, body: Mapping) -> httpx.Response:
return c.post(path, headers={"Content-Type": "application/json"}, json=body)
def _json_delete(c: FlaskClient, path: str) -> TestResponse:
def _json_delete(c: TestClient, path: str) -> httpx.Response:
return c.delete(path, headers={"Content-Type": "application/json"})