Problemas de conexión con SQLAlchemy y múltiples procesos

Florian Brucker:

Estoy usando PostgreSQL y SQLAlchemy en un proyecto que consiste en un proceso principal que inicia procesos secundarios. Todos estos procesos acceden a la base de datos a través de SQLAlchemy.

Estoy experimentando fallas de conexión repetibles: los primeros procesos secundarios funcionan correctamente, pero después de un tiempo se genera un error de conexión. Aquí hay un MWCE:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

engine = None
Session = None
session = None

def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

def cleanup():
    session.close()
    engine.dispose()

def target(id):
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()

def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

En mi sistema (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04) esto produce

1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
  File "./fork_test.py", line 64, in <module>
    main()
  File "./fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]

Esto es repetible y siempre se bloquea en la misma iteración.

Estoy creando un nuevo motor y sesión después de la bifurcación, según lo recomendado por la documentación de SQLAlchemy y en otros lugares . Curiosamente, el siguiente enfoque ligeramente diferente no falla:

import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()

class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)

@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()

def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()

def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2

if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1

Dado que el código original es más complejo y no se puede cambiar simplemente a la última versión, me gustaría entender por qué uno de estos funciona y el otro no.

La única diferencia obvia es que el código de bloqueo utiliza variables globales para el motor y la sesión, que se comparten mediante copia en escritura con los procesos secundarios. Sin embargo, dado que los reinicio directamente después de la bifurcación, no entiendo cómo podría ser un problema.

Actualizar

Volví a ejecutar las dos piezas de código con la última SQLAlchemy (1.1.5) usando Python 2.7 y Python 3.4. En ambos, los resultados son básicamente los descritos anteriormente. Sin embargo, en Python 2.7, el bloqueo de la primera pieza de código ahora ocurre en la 13ª iteración (reproducible) mientras que en 3.4 ya ocurre en la tercera iteración (también reproducible). El segundo código se ejecuta sin problemas en ambas versiones. Aquí está el rastreo de 3.4:

1
2
3
Traceback (most recent call last):
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "fork_test.py", line 64, in <module>
    main()
  File "fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]

Aquí está el registro de PostgreSQL (es el mismo para 2.7 y 3.4):

2017-01-18 10:59:36 UTC [22429-1] LOG:  database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG:  MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG:  database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG:  autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG:  incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG:  SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG:  could not receive data from client: Connection reset by peer

(Tenga en cuenta que el mensaje sobre el paquete de inicio incompleto es inofensivo )

Ilja Everilä:

Citando "¿Cómo uso motores / conexiones / sesiones con multiprocesamiento de Python u os.fork ()?" con énfasis adicional:

El objeto SQLAlchemy Engine se refiere a un grupo de conexiones de conexiones de bases de datos existentes. Entonces, cuando este objeto se replica en un proceso secundario, el objetivo es garantizar que no se transfieran conexiones a la base de datos .

y

Sin embargo, para el caso de una sesión o conexión activa de transacción que se comparte, no hay una solución automática para esto; una aplicación debe garantizar que un nuevo proceso secundario solo inicie nuevos objetos y transacciones de Connection, así como objetos de sesión ORM.

El problema surge del proceso secundario bifurcado que hereda el live global session, que se está aferrando a a Connection. Cuando targetllama init, sobrescribe las referencias globales enginey session, por lo tanto, disminuye sus recuentos a 0 en el elemento secundario, obligándolos a finalizar. Si, por ejemplo, de una forma u otra crea otra referencia a la sesión heredada en el elemento secundario, evita que se limpie, pero no lo haga. Después de mainunirse y volver al negocio como siempre, está tratando de usar la conexión ahora potencialmente finalizada, o fuera de sincronización. En cuanto a por qué esto causa un error solo después de una cierta cantidad de iteraciones, no estoy seguro.

La única forma de manejar esta situación usando globales de la manera que lo haces es

  1. Cerrar todas las sesiones
  2. Llamada engine.dispose()

antes de bifurcar. Esto evitará que las conexiones se filtren al niño. Por ejemplo:

def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id
        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same
        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()
        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()

Su segundo ejemplo no desencadena la finalización en el elemento secundario, por lo que solo parece funcionar, aunque podría estar tan roto como el primero, ya que todavía está heredando una copia de la sesión y su conexión definida localmente main.

Este artículo se recopila de Internet, indique la fuente cuando se vuelva a imprimir.

En caso de infracción, por favor [email protected] Eliminar

Editado en
0

Déjame decir algunas palabras

0Comentarios
Iniciar sesiónRevisión de participación posterior

Artículos relacionados

Lista de procesos de elementos 'N' con múltiples subprocesos

¿Cómo crear una conexión sql alchemy para pandas read_sql con sqlalchemy + pyodbc y múltiples bases de datos en MS SQL Server?

Producir múltiples combinaciones con declaración sqlalchemy y api de expresión

Producir múltiples combinaciones con declaración sqlalchemy y api de expresión

Uso de Flask-SQLAlchemy en múltiples procesos uWSGI

Problemas de conexión de SQLAlchemy y Oracle 12c

EF, ASP MVC + inyección de dependencia. Problemas con múltiples solicitudes concurrentes y conectividad de base de datos

QThread con problemas de conexión QTimer

¿Actualización correlacionada de SQLAlchemy con múltiples columnas?

Workmanager con múltiples procesos

Workmanager con múltiples procesos

múltiples procesos con ProcessBuilder

Múltiples procesos con vfork ()

Comunicación entre múltiples procesos con zmq

Multiprocesamiento de Python, problemas de terminación de procesos al reiniciar y prevención de zombis

Problemas con la conexión de Node.js y MySQL

Problemas de leyenda con múltiples puntos

Problemas anidados y múltiples de <marquee>

SQL a Linq, tiene problemas en SUM y el grupo de columnas múltiples con uniones

Problemas con la columna desplazable en y con el ejemplo de listas de conexión ordenables de JQuery UI

Procesos de ejecución de múltiples subprocesos simultáneamente

Problemas con la creación de subparcelas para múltiples conjuntos de datos con matplotlib

¿Algún enfoque factible para usar múltiples GPU, múltiples procesos con tensorflow?

¿Cadena de conexión para MySQL con múltiples parámetros de conexión?

C # - "La conexión debe ser válida y abierta" con múltiples consultas y métodos | ¿Conexión no cerrada?

Variable de configuración compartida entre múltiples procesos

Cómo controlar la depuración de múltiples procesos

Resolución de sobrecarga con múltiples funciones y múltiples operadores de conversión

SQL Server 2014, SSDT: implementación de paquetes .dtsx en vivo, problemas con administradores de conexión y variables

TOP Lista

CalienteEtiquetas

Archivo