Notificaciones en tiempo real con Python y Postgres
Es 2019 y si eres un desarrollador Backend es muy probable que en alguna etapa de desarrollo fue o sera necesario implementar Websockets.
Hace aproximadamente unos cuatro años en el mundo de Python implementar funciones asíncronas en un backend era algo complicado ya que las opciones disponibles como Tornado o gevent tiene una curva de aprendizaje difícil. Muchos desarrolladores voltearon a ver con buenos ojos implementar backends con NodeJS para evitar los dolores de cabeza.
Después de un tiempo apareció Django Channels, uno de los primeros ASGI frameworks y la implementación de Websockets comenzaba a resultar más fácil. Django Channels nos daba la oportunidad de crear funciones sequenciales junto con funciones asíncronas. Más tarde otros frameworks le siguieron como Quart y FastAPI.
De estos tres el que más se ha ganado la preferencia de la comunidad ha sido FastAPI por su flexibilidad y rapidez. Y es un claro favorito para nosotros en lo personal.
La alineación estelar
- Postgres
- FastAPI
- AIOPG
Crear la base de datos
createuser food_user
createdb -E utf-8 fast_food_db
psql fast_food_db
alter user food_user with encrypted password <password>;
grant all privileges on database fast_food_db to food_user;
Crear un entorno virtual e instalar los paquetes
python3 -m venv my_env
my_env pip install fastapi SQLAlchemy alembic psycopg2 uvicorn aiopg
mkdir FastFoodAPI
cd FastFoodAPI
mkdir fastfoodapi
#iniciamos el entorno de alembic para las migraciones
my_env alembic init alembic
Editar alembic.ini
En este archivo configuramos la url de la base de datos
sqlalchemy.url = postgresql://food_user:<password>@localhost/fast_food_db
Editar env.py en la carpeta de alembic
En este archivo necesitamos importar nuestro modelos para ternerlos disponibles en memoria cuando se ejecuten las migraciones con el arg —autogenerate y de paso vamos a fijar el pythonpath al directorio raíz.
#set pythonpath a root
import sys
sys.path.append(".")
from fastfoodapi.meta import Base
from fastfoodapi.models import *
target_metadata = Base.metadata
Crear nuestro modelo fastfoodapi/models.py
from sqlalchemy import (
Column,
Index,
Integer,
Text,
orm,
MetaData,
Table,
DateTime,
LargeBinary,
ForeignKey,
Table,
Boolean,
func,
String,
BigInteger,
Numeric,
)
from sqlalchemy.orm import relationship
from .meta import Base
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
name = Column(Text)
quantity = Column(Integer)
price = Column(Numeric)
Creamos un archivo meta para Alembic fastfoodapi/meta.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData
#Recommended naming convention used by Alembic, as various different database
#providers will autogenerate vastly different names making migrations more
#difficult. See: http://alembic.zzzcomputing.com/en/latest/naming.html
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)
Base = declarative_base(metadata=metadata)
Ejecutamos la migración
alembic revision --autogenerate -m "mensaje de migración"
alembic upgrade head
Crear triggers
Vamos a crear el trigger con alembic pero ahora solo vamos a crear una migración en blanco para agregar el sql.
alembic revision -m "mensaje de migración"
from sqlalchemy import text
def upgrade():
connection = op.get_bind()
trigger = text(
"""
CREATE OR REPLACE FUNCTION orders_event() RETURNS TRIGGER AS $$
DECLARE
record RECORD;
payload JSON;
BEGIN
IF (TG_OP = 'DELETE') THEN
record = OLD;
ELSE
record = NEW;
END IF;
payload = json_build_object('table', TG_TABLE_NAME,
'action', TG_OP,
'data', row_to_json(record));
PERFORM pg_notify('orders', payload::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER notify_order_event
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE PROCEDURE orders_event();
""")
connection.execute(trigger)
def downgrade():
drop_trigger = text("""
drop trigger notify_order_event on orders;
""")
connection = op.get_bind()
connection.execute(drop_trigger)
alembic upgrade head
Hasta este momento esta seria la estructura de nuestro proyecto.
Crear nuestro backend fastfoodapi/main.py
import json
import asyncio
import aiopg
from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket
from fastapi import Depends, FastAPI, HTTPException
dsn = ‘dbname=fast_food_db user=food_user password=password host=127.0.0.1’
app = FastAPI()
@app.websocket_route(‘/order_events’)
class WebSocketOrders(WebSocketEndpoint):
encoding = ‘json’
def __init__(self, scope, receive, send):
super().__init__(scope, receive, send)
self.connected = False
self.loop = asyncio.get_event_loop()
self.websocket = {}
@asyncio.coroutine
async def listen(self,conn,channel):
async with conn.cursor() as cur:
await cur.execute(“LISTEN {0}”.format(channel))
while True:
msg = await conn.notifies.get()
payload = json.loads(msg.payload)
if payload.get(‘action’) "INSERT":
await self.websocket.send_json({'message': "New order", 'data': payload.get('data')})
elif payload.get('action') “UPDATE”:
await self.websocket.send_json({‘message’: “Order update”, ‘data’: payload.get(‘data’)})
async def db_events(self,data: dict, websocket: WebSocket, channel: str):
async with aiopg.create_pool(dsn) as pool:
async with pool.acquire() as conn:
await asyncio.gather(self.listen(conn, channel))
async def on_receive(self, websocket: WebSocket, data: dict):
channel: str = data.get(‘channel’)
asyncio.ensure_future(self.db_events(data, websocket, channel), loop=self.loop)
async def on_connect(self, websocket: WebSocket):
await websocket.accept()
self.connected = True
self.websocket = websocket
async def on_close(self, websocket):
self.connected = False
self.websocket.close()
Con un backend muy inseguro iniciamos nuestro servicio con uvicorn
my_env uvicorn fastfoodapi.main:app --reload
El cliente
Ahora toca el turno de conectarnos a través del websocket. Lo haremos con Javascript.
<!DOCTYPE html>
<html>
<header>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css">
</header>
<body>
<div class="jumbotron text-center">
<h1>Real Time Notifications With Python And Postgres</h1>
</div>
<div class="container">
<div class="row">
<div id="log" class="col-lg-12"></div>
</div>
</div>
</body>
<script src="http://code.jquery.com/jquery.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js"></script>
<script type="text/javascript">
var connection
$(function () {
log = document.getElementById('log');
window.WebSocket = window.WebSocket || window.MozWebSocket;
connection = new WebSocket('ws://localhost:8000/order_events')
connection.onopen = function () {
connection.send(JSON.stringify({"method":"subscribe","channel": "orders" }))
};
connection.onerror = function (error) {
log.innerHTML = '<li class="them">an error occurred when sending-receiving data</li>' + log.innerHTML;
};
connection.onmessage = function (message) {
try {
var json = JSON.parse(message.data);
let d = $("<div />", {class: 'data', "css": {background: 'white',padding: '0.5em', margin: '10px 0'}})
let h = $("<h1 />",{css:{margin:0},text: json.message})
let s = $("<h3 />",{css:{margin:0},text: json.data.name})
let p = $("<p />", {text: "Price " +json.data.price + " quantity "+ json.data.quantity})
d.append(h)
d.append(s)
d.append(p)
d.append($("<hr />"))
$("#log").append(d.fadeIn())
} catch (e) {
console.log('This doesn\'t look like a valid JSON: ', message);
return;
}
};
});
</script>
</html>
Ahora necesitamos generar inserts en la base de datos, lo haremos con el siguiente script orders-generator.py
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import (
sessionmaker
)
from time import sleep
from sqlalchemy.sql import select, insert
from sqlalchemy.ext.automap import automap_base
import random
db_url = “postgresql://food_user:pac4life@localhost/fast_food_db”
engine = create_engine(db_url,echo=True)
metadata = MetaData(bind=engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
SessionLocal.configure(bind=engine)
Base = automap_base()
class Order(Base):
tablename = ‘orders’
Base.prepare(engine, reflect=True)
db = SessionLocal()
products = [‘French Fries’, ‘Hamburguer’, ‘Nachos’, ‘Soda’, ‘Milkshake’, ‘Burrito’, ‘Hot Dog’, ‘Salad’]
qty = [1,2,3,4,5,6,7,8,9,12]
for num in range(1, 200):
order = Order()
order.name = random.choice(products)
order.price = random.choice(qty) * num
order.quantity = random.choice(qty)
db.add(order)
db.commit()
sleep(20)
Veremos algo así en nuestra pagina orders.html
¿Muy fácil no?. Debajo de toda esta magia esta Starlett que en conjunto con FastAPI nos brindan la oportunidad de crear servicios asyncio de alto rendimiento.
Todo el código esta en Github. En la segunda parte veremos cómo podemos hacer más seguro nuestro servicio con tokens JWT.
Commenting is closed for this article.