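【Question title】: Can someone provide me with the schema to recreate the dag_run table in airflow-db?
【Posted】: 2019-03-20 14:00:42
【Question description】:

I have a Google Cloud Composer environment on GCP, and I accidentally deleted the dag_run table. As a result, the Airflow scheduler kept crashing and the Airflow webserver would not start. I was able to recreate the dag_run table in airflow-db, which stopped the crashes, but I don't think I got the schema right, because I get the following error when I manually trigger a DAG from the Airflow webserver.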

Ooops.

                      ____/ (  (    )   )  \___
                     /( (  (  )   _    ))  )   )\
                   ((     (   )(    )  )   (   )  )
                 ((/  ( _(   )   (   _) ) (  () )  )
                ( (  ( (_)   ((    (   )  .((_ ) .  )_
               ( (  )    (      (  )    )   ) . ) (   )
              (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
              ( (  (   ) (  )   (  ))     ) _)(   )  )  )
             ( (  ( \ ) (    (_  ( ) ( )  )   ) )  )) ( )
              (  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
             ( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
              ((  (   )(    (     _    )   _) _(_ (  (_ )
               (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
               ((__)        \\||lll|l||///          \_))
                        (   /(/ (  )  ) )\   )
                      (    ( ( ( | | ) ) )\   )
                       (   /(| / ( )) ) ) )) )
                     (     ( ((((_(|)_)))))     )
                      (      ||\(|(|)|/||     )
                    (        |(||(||)||||        )
                      (     //|/l|||)|\\ \     )

(/ / // /|//||||\ \ \ \ _)

Node: 38b47b3e06a1

Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1988, in wsgi_app
    response = self.full_dispatch_request()
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1641, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1544, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/opt/python3.6/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1639, in full_dispatch_request
    rv = self.dispatch_request()
  File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1625, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/flask_login.py", line 755, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/www/utils.py", line 262, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/www/utils.py", line 309, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/www/views.py", line 929, in trigger
    external_trigger=True
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 3781, in create_dagrun
    run.refresh_from_db()
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 4439, in refresh_from_db
    DR.run_id == self.run_id
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3077, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

【Comments】:

  • The dag_run schema I used to recreate the table: CREATE TABLE dag_run ( id int NOT NULL, dag_id varchar(250), execution_date datetime, state varchar(50), run_id varchar(250), external_trigger boolean, conf BLOB, end_date datetime, start_date datetime, PRIMARY KEY (id), UNIQUE (dag_id, execution_date), UNIQUE (dag_id, run_id), CHECK (external_trigger IN (0,1)) );

Tags: google-cloud-platform airflow google-cloud-composer


【Solution 1】:

The DagRun SQLAlchemy model can help a bit:

id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(UtcDateTime, default=timezone.utcnow)
start_date = Column(UtcDateTime, default=timezone.utcnow)
end_date = Column(UtcDateTime)
_state = Column('state', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=True)
conf = Column(PickleType)

That said, here is the MySQL DDL statement:

mysql> SHOW CREATE TABLE `dag_run`;
...
CREATE TABLE `dag_run` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `dag_id` varchar(250) DEFAULT NULL,
  `execution_date` timestamp(6) NULL DEFAULT NULL,
  `state` varchar(50) DEFAULT NULL,
  `run_id` varchar(250) DEFAULT NULL,
  `external_trigger` tinyint(1) DEFAULT NULL,
  `conf` blob,
  `end_date` timestamp(6) NULL DEFAULT NULL,
  `start_date` timestamp(6) NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `dag_id` (`dag_id`,`execution_date`),
  UNIQUE KEY `dag_id_2` (`dag_id`,`run_id`),
  KEY `dag_id_state` (`dag_id`,`state`)
)
ENGINE=InnoDB
AUTO_INCREMENT=177
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_0900_ai_ci

And here is the table description:

mysql> DESC dag_run;
+------------------+--------------+------+-----+---------+----------------+
| Field            | Type         | Null | Key | Default | Extra          |
+------------------+--------------+------+-----+---------+----------------+
| id               | int(11)      | NO   | PRI | NULL    | auto_increment |
| dag_id           | varchar(250) | YES  | MUL | NULL    |                |
| execution_date   | timestamp(6) | YES  |     | NULL    |                |
| state            | varchar(50)  | YES  |     | NULL    |                |
| run_id           | varchar(250) | YES  |     | NULL    |                |
| external_trigger | tinyint(1)   | YES  |     | NULL    |                |
| conf             | blob         | YES  |     | NULL    |                |
| end_date         | timestamp(6) | YES  |     | NULL    |                |
| start_date       | timestamp(6) | YES  |     | NULL    |                |
+------------------+--------------+------+-----+---------+----------------+
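
To make the gap between the two schemas in this thread easier to see, here is a minimal Python sketch that compares the column definitions from the question's comment with the SHOW CREATE TABLE output above. The dictionaries are copied by hand from this page; the helper names (normalize, diff_columns) and the normalization rules are assumptions made for illustration, not part of Airflow or MySQL. It treats BOOLEAN and TINYINT(1) as equivalent (MySQL aliases one to the other) and ignores the integer display width and the implicit DEFAULT NULL.

```python
import re

# Column definitions copied from the CREATE TABLE in the question's comment.
ASKER = {
    "id": "int NOT NULL",
    "dag_id": "varchar(250)",
    "execution_date": "datetime",
    "state": "varchar(50)",
    "run_id": "varchar(250)",
    "external_trigger": "boolean",
    "conf": "BLOB",
    "end_date": "datetime",
    "start_date": "datetime",
}

# Column definitions copied from the SHOW CREATE TABLE output in this answer.
REFERENCE = {
    "id": "int(11) NOT NULL AUTO_INCREMENT",
    "dag_id": "varchar(250) DEFAULT NULL",
    "execution_date": "timestamp(6) NULL DEFAULT NULL",
    "state": "varchar(50) DEFAULT NULL",
    "run_id": "varchar(250) DEFAULT NULL",
    "external_trigger": "tinyint(1) DEFAULT NULL",
    "conf": "blob",
    "end_date": "timestamp(6) NULL DEFAULT NULL",
    "start_date": "timestamp(6) NULL DEFAULT NULL",
}

# Indexed column groups from each schema (uniqueness is not compared here).
ASKER_INDEXES = {("dag_id", "execution_date"), ("dag_id", "run_id")}
REFERENCE_INDEXES = ASKER_INDEXES | {("dag_id", "state")}


def normalize(definition):
    """Reduce a column definition to the parts that matter for comparison."""
    text = definition.lower()
    text = text.replace("boolean", "tinyint(1)")  # BOOLEAN is an alias in MySQL
    text = re.sub(r"\bint\(\d+\)", "int", text)  # display width is cosmetic
    text = re.sub(r"\s*(null\s+)?default null", "", text)  # implicit default
    return " ".join(text.split())


def diff_columns(asker, reference):
    """Return {column: (asker_def, reference_def)} for columns that differ."""
    return {
        name: (asker.get(name), reference[name])
        for name in reference
        if normalize(asker.get(name, "")) != normalize(reference[name])
    }


differences = diff_columns(ASKER, REFERENCE)
missing_indexes = REFERENCE_INDEXES - ASKER_INDEXES

for name, (mine, theirs) in differences.items():
    print(f"{name}: {mine!r} vs {theirs!r}")
print("indexes missing from the recreated table:", sorted(missing_indexes))
```

The sketch only lists the differences; it does not establish which of them produced the NoResultFound error. One plausible reading, not confirmed anywhere in this thread, is that a datetime column without fractional seconds cannot store the microsecond part of execution_date, so the lookup in refresh_from_db finds no matching row.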

UPDATE-1

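Courtesy of @AyushChauhan: if you are trying to fix this for a playground environment (one where you don't particularly care about the historical DagRuns, TaskInstances, etc. in Airflow's backend database)

  • then the airflow resetdb CLI command can also be used to fix this

  • but in case you haven't noticed, beware:

    It will delete all entries from the metadata database. This includes all DAG runs, Variables and Connections.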

【Comments】:
