【问题标题】:airflow 1.10.5 scheduler connection errors to postgres到 postgres 的气流 1.10.5 调度程序连接错误
【发布时间】:2019-09-25 15:38:48
【问题描述】:

气流 1.10.5 芹菜执行者

Airflow scheduler crashing with postgres sqlalchemy connection error(错误详情如下) 此错误存在于一个环境中,其他类似环境有效。是否有任何版本不兼容,感谢任何帮助解决。

为了隔离问题,手动测试与 postgres 的连接没有问题

示例连接代码(sqlalchemy postgres 功能)

from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://username:password@servername.postgres.database.azure.com/airflow", echo=True, pool_size=6, max_overflow=10, encoding='latin1')

engine.connect()

print(engine)

错误

Celery Task ID: ('example_dag', 'bash_task', datetime.datetime(2018, 10, 8, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1)
Traceback (most recent call last):
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2275, in _wrap_pool_connect
    return fn()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
    return _ConnectionFairy._checkout(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 760, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
    rec = pool._do_get()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
    self._dec_overflow()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
    return self._create_connection()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
    return _ConnectionRecord(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
    self.__connect(first_connect_check=True)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 639, in __connect
    connection = pool._invoke_creator(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
    return dialect.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 453, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/psycopg2/__init__.py", line 126, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: could not connect to server: No such file or directory
    Is the server running locally and accepting
    connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5432"?


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/airflow/executors/celery_executor.py", line 106, in fetch_celery_task_state
    res = (celery_task[0], celery_task[1].state)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/result.py", line 473, in state
    return self._get_task_meta()['status']
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/result.py", line 412, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/base.py", line 386, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 53, in _inner
    return fun(*args, **kwargs)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 122, in _get_task_meta_for
    session = self.ResultSession()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/__init__.py", line 99, in ResultSession
    **self.engine_options)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/session.py", line 59, in session_factory
    self.prepare_models(engine)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/celery/backends/database/session.py", line 54, in prepare_models
    ResultModelBase.metadata.create_all(engine)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/sql/schema.py", line 4294, in create_all
    ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2045, in _run_visitor
    with self._optional_conn_ctx_manager(connection) as conn:
  File "/usr/lib64/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2037, in _optional_conn_ctx_manager
    with self._contextual_connect() as conn:
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2239, in _contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2279, in _wrap_pool_connect
    e, dialect, self
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1544, in _handle_dbapi_exception_noconnection
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 2275, in _wrap_pool_connect
    return fn()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
    return _ConnectionFairy._checkout(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 760, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
    rec = pool._do_get()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
    self._dec_overflow()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
    return self._create_connection()
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
    return _ConnectionRecord(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
    self.__connect(first_connect_check=True)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/pool/base.py", line 639, in __connect
    connection = pool._invoke_creator(self)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
    return dialect.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 453, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/kaiser2/airflow_venv/lib64/python3.6/site-packages/psycopg2/__init__.py", line 126, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not connect to server: No such file or directory
    Is the server running locally and accepting
    connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5432"?

(Background on this error at: http://sqlalche.me/e/e3q8)

【问题讨论】:

  • 关于这个问题的任何更新?目前在我这边看到它。

标签: postgresql sqlalchemy airflow


【解决方案1】:

我确实遇到了类似的问题。请检查您的 postgres 配置以查看配置文件是否在正确的位置。上次我遇到类似的错误是由于 postgresql 配置文件的文件错位......奇怪的是,我几乎没有在 posgresql 日志中看到任何错误,但连接到 postgresql 失败......并且端口转发正在工作.. 但是集群中的 pod 无法连接到 postgresql。

【讨论】:

  • 不确定您是否还有问题。您开始大量看到这些。 1. 设置 pgbouncer。 2. 低迷时重启worker进程。 3.如果你使用的是azure posgres sql,请切换到灵活服务器。
猜你喜欢
  • 2018-04-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多