设计DAG-串联任务-《Python教程》

admin 2025-10-19 23:25:10 编程 来源:ZONE.CI 全球网 0 阅读模式

日常任务中,我们会遇到一些串联任务,比如下方这个 run_case_sql1run_case_sql2...run_case_sql9,其实每个 task 运行的功能都是类似的,就是运行文件中的一段 SQL。这里只有 9 个还好,但是如果有 100 多个,一个一个写 task 会让代码显得很冗余,而且非常容易出粗,一点也不 Pythonic。image.png

下面来简化一段代码,来让程序更简单一点。

1. SQL文件

我们有一个 case.sql 文件,里面存放了我们需要跑的 SQL 代码,内容如下(中间省去 sql2 到 sql8 代码):

  1. --[sql1]
  2. DROP TABLE IF EXISTS test.tb1
  3. ;
  4. CREATE TABLE IF NOT EXISTS test.tb1
  5. STORED AS PARQUET AS
  6. SELECT * FROM test.tb
  7. ;
  8. COMPUTE STATS test.tb1
  9. ;
  10. --[end]
  11. --[sql9]
  12. DROP TABLE IF EXISTS test.tb9
  13. ;
  14. CREATE TABLE IF NOT EXISTS test.tb9
  15. STORED AS PARQUET AS
  16. SELECT * FROM test.tb8
  17. ;
  18. COMPUTE STATS test.tb9
  19. ;
  20. --[end]

2. 复用函数

这是一段复用函数,主要用于对应文件中的 SQL 代码:

  1. def run_case_sql(i=0):
  2. """运行sql函数"""
  3. batch_date = airflow_get_date(today_key, 0)
  4. context = {"BATCH_DATE": batch_date}
  5. sql_file = os.path.join(sql_path, "case.sql")
  6. runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
  7. runner.run_sql_block(sql_name="sql" + str(i))
  8. runner.close()
  9. def run_phone_sql(i=0):
  10. """运行sql函数"""
  11. batch_date = airflow_get_date(today_key, 0)
  12. context = {"BATCH_DATE": batch_date}
  13. sql_file = os.path.join(sql_path, "phone.sql")
  14. runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
  15. runner.run_sql_block(sql_name="sql" + str(i))
  16. runner.close()

3. DAG文件

下面我们编写 DAG 文件

  1. default_args = {
  2. "owner": "yumingmin",
  3. "depends_on_past": False,
  4. "start_date": days_ago(1),
  5. "email": ["[email protected]"],
  6. "email_on_failure": False,
  7. "email_on_retry": False,
  8. "retries": 60,
  9. "retry_delay": timedelta(minutes=5),
  10. "catchup": False
  11. }
  12. dag = DAG(
  13. project_name,
  14. default_args=default_args,
  15. description=project_name,
  16. schedule_interval='30 8 * * *')
  17. taskflows = []
  18. for j in range(0, 9):
  19. taskflows.append(PythonOperator(
  20. task_id="run_case_sql" + str(j + 1),
  21. pyttaskflowshon_callable=run_case_sql,
  22. op_kwargs={"i": j + 1},
  23. dag=dag)
  24. )
  25. if j != 0:
  26. taskflows[-2] >> taskflows[-1]
  27. for j in range(0, 2):
  28. order.append(PythonOperator(
  29. task_id="run_phone_sql" + str(j + 1),
  30. python_callable=run_case_sql,
  31. op_kwargs={"i": j + 1},
  32. dag=dag)
  33. )
  34. taskflows[-2] >> taskflows[-1]

这样我们就代码量极大地进行了缩短,Perfect 🤟

以太坊cppgolang区别 编程

以太坊cppgolang区别

以太坊是一种去中心化的开源平台,它采用智能合约技术,旨在构建和运行不受干扰的分布式应用程序。作为目前最受欢迎的区块链平台之一,以太坊提供了多种编程语言的支持,其
progolang 编程

progolang

Go语言(Golang)是由Google开发的一门静态类型编程语言。作为一名专业的Golang开发者,我深知这门语言的优势和特点。在本文中,我将介绍Golang
golangn个发送者 编程

golangn个发送者

Golang是一种开源的编程语言,由Google团队开发,旨在提高程序的并发性和简化软件开发过程。在Go语言中,有时需要向多个接收者发送信息。本文将介绍如何在G
golang技能图谱 编程

golang技能图谱

从互联网行业的快速发展到人工智能技术的日益成熟,各种编程语言也应运而生。而在这众多的编程语言中,Golang(即Go)作为一门强大且高效的开发语言备受关注。Go
评论:0   参与:  8