设计DAG-并联任务-《Python教程》

admin 2025-10-19 23:25:40 编程 来源:ZONE.CI 全球网 0 阅读模式

    之前我们简单总结过串联任务的 DAG 脚本如何编写,具体可以参考一下 串联任务。下面我们改写一下脚本,来让脚本并行跑任务。将任务处理成并行 task,最大的好处就是可以极大地节省时间,下面直接贴一下并联代码:

    1. def run_case_sql(i=0):
    2. """运行sql函数"""
    3. batch_date = airflow_get_date(today_key, 0)
    4. context = {"BATCH_DATE": batch_date}
    5. sql_file = os.path.join(sql_path, "case.sql")
    6. runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
    7. runner.run_sql_block(sql_name="sql" + str(i))
    8. runner.close()
    9. def run_phone_sql(i=0):
    10. """运行sql函数"""
    11. batch_date = airflow_get_date(today_key, 0)
    12. context = {"BATCH_DATE": batch_date}
    13. sql_file = os.path.join(sql_path, "phone.sql")
    14. runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
    15. runner.run_sql_block(sql_name="sql" + str(i))
    16. runner.close()
    17. default_args = {
    18. "owner": "yumingmin",
    19. "depends_on_past": False,
    20. "start_date": days_ago(1),
    21. "email": ["[email protected]"],
    22. "email_on_failure": False,
    23. "email_on_retry": False,
    24. "retries": 60,
    25. "retry_delay": timedelta(minutes=5),
    26. "catchup": False
    27. }
    28. dag = DAG(
    29. project_name,
    30. default_args=default_args,
    31. description=project_name,
    32. schedule_interval='30 8 * * *')
    33. init_op = PythonOperator(
    34. task_id="init",
    35. python_callable=init,
    36. dag=dag)
    37. for j in range(0, 9):
    38. parallel_op = PythonOperator(
    39. task_id="run_case_sql" + str(j + 1),
    40. pyttaskflowshon_callable=run_case_sql,
    41. op_kwargs={"i": j + 1},
    42. dag=dag)
    43. init_variable_op >> parallel_op
    44. for j in range(0, 2):
    45. parallel_op = PythonOperator(
    46. task_id="run_phone_sql" + str(j + 1),
    47. python_callable=run_case_sql,
    48. op_kwargs={"i": j + 1},
    49. dag=dag)
    50. init_variable_op >> parallel_op

    💡 注意:如果是并行任务,一定是要有一个并行的分支点才可以。

    以太坊cppgolang区别 编程

    以太坊cppgolang区别

    以太坊是一种去中心化的开源平台,它采用智能合约技术,旨在构建和运行不受干扰的分布式应用程序。作为目前最受欢迎的区块链平台之一,以太坊提供了多种编程语言的支持,其
    progolang 编程

    progolang

    Go语言(Golang)是由Google开发的一门静态类型编程语言。作为一名专业的Golang开发者,我深知这门语言的优势和特点。在本文中,我将介绍Golang
    golangn个发送者 编程

    golangn个发送者

    Golang是一种开源的编程语言,由Google团队开发,旨在提高程序的并发性和简化软件开发过程。在Go语言中,有时需要向多个接收者发送信息。本文将介绍如何在G
    golang技能图谱 编程

    golang技能图谱

    从互联网行业的快速发展到人工智能技术的日益成熟,各种编程语言也应运而生。而在这众多的编程语言中,Golang(即Go)作为一门强大且高效的开发语言备受关注。Go
    评论:0   参与:  7