Notificaciones en tiempo real con Python y Postgres

Oct 24

Es 2019 y si eres un desarrollador Backend es muy probable que en alguna etapa de desarrollo fue o sera necesario implementar Websockets.

Hace aproximadamente unos cuatro años en el mundo de Python implementar funciones asíncronas en un backend era algo complicado ya que las opciones disponibles como Tornado o gevent tiene una curva de aprendizaje difícil. Muchos desarrolladores voltearon a ver con buenos ojos implementar backends con NodeJS para evitar los dolores de cabeza.

Después de un tiempo apareció Django Channels, uno de los primeros ASGI frameworks y la implementación de Websockets comenzaba a resultar más fácil. Django Channels nos daba la oportunidad de crear funciones sequenciales junto con funciones asíncronas. Más tarde otros frameworks le siguieron como Quart y FastAPI.

De estos tres el que más se ha ganado la preferencia de la comunidad ha sido FastAPI por su flexibilidad y rapidez. Y es un claro favorito para nosotros en lo personal.

La alineación estelar

  • Postgres
  • FastAPI
  • AIOPG

Crear la base de datos


createuser food_user
createdb -E utf-8 fast_food_db
psql fast_food_db
alter user food_user with encrypted password <password>;
grant all privileges on database fast_food_db to food_user;

Crear un entorno virtual e instalar los paquetes

python3 -m venv my_env
my_env pip install fastapi SQLAlchemy alembic psycopg2 uvicorn aiopg
mkdir FastFoodAPI
cd FastFoodAPI
mkdir fastfoodapi
#iniciamos el entorno de alembic para las migraciones
my_env alembic init alembic

Editar alembic.ini

En este archivo configuramos la url de la base de datos

sqlalchemy.url = postgresql://food_user:<password>@localhost/fast_food_db

Editar env.py en la carpeta de alembic

En este archivo necesitamos importar nuestro modelos para ternerlos disponibles en memoria cuando se ejecuten las migraciones con el arg —autogenerate y de paso vamos a fijar el pythonpath al directorio raíz.


#set pythonpath a root
import sys
sys.path.append(".")
from fastfoodapi.meta import Base
from fastfoodapi.models import *  
target_metadata = Base.metadata

Crear nuestro modelo fastfoodapi/models.py

 
from sqlalchemy import (
    Column,
    Index,
    Integer,
    Text,
    orm,
    MetaData,
    Table,
    DateTime,
    LargeBinary,
    ForeignKey,
    Table,
    Boolean,
    func,
    String,
    BigInteger,
    Numeric,
)
from sqlalchemy.orm import relationship
from .meta import Base
class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(Text)
    quantity = Column(Integer)
    price = Column(Numeric)

Creamos un archivo meta para Alembic fastfoodapi/meta.py

 
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import MetaData
#Recommended naming convention used by Alembic, as various different database
#providers will autogenerate vastly different names making migrations more
#difficult. See: http://alembic.zzzcomputing.com/en/latest/naming.html
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)
Base = declarative_base(metadata=metadata)

Ejecutamos la migración

alembic revision --autogenerate -m "mensaje de migración"
alembic upgrade head

Crear triggers

Vamos a crear el trigger con alembic pero ahora solo vamos a crear una migración en blanco para agregar el sql.

alembic revision -m "mensaje de migración"
 
from sqlalchemy import text
def upgrade():
	connection = op.get_bind()
	trigger = text(
	"""
		CREATE OR REPLACE FUNCTION orders_event() RETURNS TRIGGER AS $$
		DECLARE
		record RECORD;
		payload JSON;
		BEGIN
		IF (TG_OP = 'DELETE') THEN
		record = OLD;
		ELSE
		record = NEW;
		END IF;
		payload = json_build_object('table', TG_TABLE_NAME,
		                        'action', TG_OP,
		                        'data', row_to_json(record));
		PERFORM pg_notify('orders', payload::text);
		RETURN NULL;
		END;
		$$ LANGUAGE plpgsql;
		CREATE TRIGGER notify_order_event
		AFTER INSERT OR UPDATE OR DELETE ON orders
		FOR EACH ROW EXECUTE PROCEDURE orders_event();
  """)
connection.execute(trigger)
def downgrade():
	drop_trigger = text("""
		drop trigger notify_order_event on orders;
		""")
	connection = op.get_bind()
	connection.execute(drop_trigger)
alembic upgrade head

Hasta este momento esta seria la estructura de nuestro proyecto.

Crear nuestro backend fastfoodapi/main.py

 
import json
import asyncio
import aiopg
from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket
from fastapi import Depends, FastAPI, HTTPException

dsn = ‘dbname=fast_food_db user=food_user password=password host=127.0.0.1’

app = FastAPI()

@app.websocket_route(‘/order_events’)
class WebSocketOrders(WebSocketEndpoint): encoding = ‘json’ def __init__(self, scope, receive, send): super().__init__(scope, receive, send) self.connected = False self.loop = asyncio.get_event_loop() self.websocket = {}

@asyncio.coroutine async def listen(self,conn,channel): async with conn.cursor() as cur: await cur.execute(“LISTEN {0}”.format(channel)) while True: msg = await conn.notifies.get() payload = json.loads(msg.payload) if payload.get(‘action’) "INSERT": await self.websocket.send_json({'message': "New order", 'data': payload.get('data')}) elif payload.get('action') “UPDATE”: await self.websocket.send_json({‘message’: “Order update”, ‘data’: payload.get(‘data’)}) async def db_events(self,data: dict, websocket: WebSocket, channel: str): async with aiopg.create_pool(dsn) as pool: async with pool.acquire() as conn: await asyncio.gather(self.listen(conn, channel)) async def on_receive(self, websocket: WebSocket, data: dict): channel: str = data.get(‘channel’) asyncio.ensure_future(self.db_events(data, websocket, channel), loop=self.loop) async def on_connect(self, websocket: WebSocket): await websocket.accept() self.connected = True self.websocket = websocket async def on_close(self, websocket): self.connected = False self.websocket.close()

Con un backend muy inseguro iniciamos nuestro servicio con uvicorn

my_env uvicorn fastfoodapi.main:app --reload

El cliente

Ahora toca el turno de conectarnos a través del websocket. Lo haremos con Javascript.

 
<!DOCTYPE html>
<html>
<header>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css">
</header>
<body>
    <div class="jumbotron text-center">
      <h1>Real Time Notifications With Python And Postgres</h1>
    </div>
    <div class="container">
      <div class="row">
        <div id="log" class="col-lg-12"></div>
    </div>
</div>
</body>
<script src="http://code.jquery.com/jquery.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js"></script>
<script type="text/javascript">
    var connection
    $(function () {
        log = document.getElementById('log');
        window.WebSocket = window.WebSocket || window.MozWebSocket;
        connection = new WebSocket('ws://localhost:8000/order_events')
        connection.onopen = function () {
            connection.send(JSON.stringify({"method":"subscribe","channel": "orders" }))
        };
        connection.onerror = function (error) {
        log.innerHTML = '<li class="them">an error occurred when sending-receiving data</li>' + log.innerHTML;
        };
        connection.onmessage = function (message) {
        try {
        var json = JSON.parse(message.data);
        let d = $("<div />", {class: 'data', "css": {background: 'white',padding: '0.5em', margin: '10px 0'}})
        let h = $("<h1 />",{css:{margin:0},text: json.message})
        let s = $("<h3 />",{css:{margin:0},text: json.data.name})
        let p = $("<p />", {text: "Price " +json.data.price + " quantity "+ json.data.quantity})
        d.append(h)
        d.append(s)
        d.append(p)
        d.append($("<hr />"))
        $("#log").append(d.fadeIn())
        } catch (e) {
        console.log('This doesn\'t look like a valid JSON: ', message);
        return;
        }
        };
    });
</script>
</html>

Ahora necesitamos generar inserts en la base de datos, lo haremos con el siguiente script orders-generator.py

 
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import (
   sessionmaker
   )
from time import sleep
from sqlalchemy.sql import select, insert
from sqlalchemy.ext.automap import automap_base
import random

db_url = “postgresql://food_user:pac4life@localhost/fast_food_db”
engine = create_engine(db_url,echo=True)
metadata = MetaData(bind=engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
SessionLocal.configure(bind=engine)
Base = automap_base()

class Order(Base): tablename = ‘orders’

Base.prepare(engine, reflect=True)

db = SessionLocal() products = [‘French Fries’, ‘Hamburguer’, ‘Nachos’, ‘Soda’, ‘Milkshake’, ‘Burrito’, ‘Hot Dog’, ‘Salad’] qty = [1,2,3,4,5,6,7,8,9,12] for num in range(1, 200): order = Order() order.name = random.choice(products) order.price = random.choice(qty) * num order.quantity = random.choice(qty) db.add(order) db.commit() sleep(20)

Veremos algo así en nuestra pagina orders.html

¿Muy fácil no?. Debajo de toda esta magia esta Starlett que en conjunto con FastAPI nos brindan la oportunidad de crear servicios asyncio de alto rendimiento.

Todo el código esta en Github. En la segunda parte veremos cómo podemos hacer más seguro nuestro servicio con tokens JWT.

Comentarios (0)
Publicar comentario

Enter your comment below. Fields marked * are required. You must preview your comment before submitting it.