Operators-☂自定义ImpalaOperator-《Python教程》

admin 2025-10-19 23:22:09 编程 来源:ZONE.CI 全球网 0 阅读模式
    1. # coding: utf8
    2. import os
    3. import re
    4. from typing import Any, Dict, Optional
    5. from airflow.configuration import conf
    6. from airflow.exceptions import AirflowException
    7. from airflow.models import BaseOperator
    8. from custom_hooks._impala import ImpalaHook
    9. from airflow.utils import operator_helpers
    10. from airflow.utils.operator_helpers import context_to_airflow_vars
    class ImpalaOperator(BaseOperator):
        """Run an HQL statement against Impala through the project's ``ImpalaHook``.

        The statement comes from one of two places:

        * ``hql``: inline text, or a ``.hql``/``.sql`` path that Airflow resolves
          through ``template_ext``.
        * One ``--[name] ... --[end]`` section of ``hql_file``. When given, this
          replaces ``hql`` in ``prepare_template``.

        :param hql: inline statement, or a templated ``.hql``/``.sql`` file path.
        :param impala_conn_id: templated connection id.
            NOTE(review): ``get_hook`` does not pass it to ``ImpalaHook``;
            confirm whether the hook should receive it.
        :param db: stored on the operator; not read anywhere in this class.
        :param hql_file: path of a UTF-8 file holding ``--[name]`` ... ``--[end]``
            sections.
        :param hql_name: section of ``hql_file`` to run. It may be omitted when
            the file contains exactly one non-empty section.
        :param impalaconf_jinja_translate: rewrite ``${...}`` placeholders to
            ``{{ ds }}`` so that Jinja rendering fills in the run date.
        :param run_as_owner: copy the DAG owner into ``self.run_as``. That
            attribute is not read anywhere in this class.
        """

        template_fields = ('hql', 'impala_conn_id')
        template_ext = ('.hql', '.sql')
        ui_color = '#f0e4ec'

        # Date placeholders translated inside `hql_file` sections, for example
        # `${batch_date}` or `${ RUN_DATE }`.
        _FILE_DATE_PLACEHOLDER = r"\$\{ *(?:batch_date|run_date|BATCH_DATE|RUN_DATE) *\}"
        # Any `${...}` placeholder in inline `hql` whose content consists of
        # spaces, ASCII letters/digits, `_` or `|`.
        _INLINE_PLACEHOLDER = r"\$\{[ a-zA-Z0-9_|]*\}"
        # `--[name]` header, section body, then a `--[end]` line.
        _SECTION_PATTERN = r"--\[(.*?)\](.*?)\n--\[end\]"

        def __init__(
            self,
            *,
            hql: Optional[str] = None,
            impala_conn_id: str = 'impala_default',
            db: str = 'default',
            hql_file: Optional[str] = None,
            hql_name: Optional[str] = None,
            impalaconf_jinja_translate: bool = False,
            run_as_owner: bool = False,
            **kwargs: Any,
        ) -> None:
            super().__init__(**kwargs)
            self.hql = hql
            self.impala_conn_id = impala_conn_id
            self.db = db
            self.hql_file = hql_file
            self.hql_name = hql_name
            self.impalaconf_jinja_translate = impalaconf_jinja_translate
            self.run_as = None
            if run_as_owner:
                # Requires the operator to already be attached to a DAG.
                self.run_as = self.dag.owner
            # Created by `execute` / `dry_run`. Initialising it to None keeps
            # `on_kill` safe when it runs before the hook exists.
            self.hook: Optional[ImpalaHook] = None

        def get_hook(self) -> ImpalaHook:
            """Return a new ``ImpalaHook``.

            NOTE(review): neither ``impala_conn_id`` nor ``db`` is passed here;
            confirm against ``ImpalaHook``'s constructor.
            """
            return ImpalaHook()

        def _parse_sqlfile(self) -> str:
            """Return the selected ``--[name]`` ... ``--[end]`` section of ``hql_file``.

            When ``impalaconf_jinja_translate`` is set, the placeholders
            ``${batch_date}``, ``${run_date}``, ``${BATCH_DATE}`` and
            ``${RUN_DATE}`` (optionally padded with spaces) are first rewritten
            to ``{{ ds }}``.

            :raises KeyError: ``hql_name`` is set but no section has that name.
            :raises AirflowException: the file has no non-empty section, or it
                has several sections and ``hql_name`` is not set.
            """
            with open(self.hql_file, 'r', encoding="utf8") as f:
                file_content = f.read()
            if self.impalaconf_jinja_translate:
                # Use a real alternation. A character class of the same letters
                # would also rewrite unrelated placeholders such as `${data}`.
                file_content = re.sub(self._FILE_DATE_PLACEHOLDER, r"{{ ds }}", file_content)
            # Empty sections are skipped. If a name repeats, the last section wins.
            sections = {
                name: body
                for name, body in re.findall(self._SECTION_PATTERN, file_content, re.S)
                if body != ""
            }
            if self.hql_name:
                return sections[self.hql_name]
            if not sections:
                raise AirflowException(
                    f"No non-empty `--[name] ... --[end]` section found in {self.hql_file!r}!"
                )
            if len(sections) == 1:
                return next(iter(sections.values()))
            raise AirflowException("You must specify `hql_name` when `hql_file` is defined!")

        def prepare_template(self) -> None:
            """Load ``hql`` from ``hql_file``, or translate inline ``${...}`` placeholders.

            Airflow documents this hook as running after templated file fields
            have been loaded and before Jinja rendering. The ``{{ ds }}`` inserted
            here is therefore still rendered.
            """
            if self.hql_file is not None:
                # `_parse_sqlfile` applies the placeholder translation itself.
                self.hql = self._parse_sqlfile()
            elif self.impalaconf_jinja_translate and self.hql is not None:
                # `hql` may be None when no statement source was configured.
                self.hql = re.sub(self._INLINE_PLACEHOLDER, r"{{ ds }}", self.hql)

        def execute(self, context: Dict[str, Any]) -> None:
            """Run ``self.hql``. The hook is kept on ``self.hook`` so ``on_kill`` can reach it."""
            self.hook = self.get_hook()
            self.hook.run_hql(self.hql)
            self.log.info("Executed successfully!")

        def dry_run(self) -> None:
            """Validate ``self.hql`` with the hook's ``test_hql`` after blanking Airflow env vars."""
            # Reset Airflow environment variables so that values already in the
            # environment cannot change the behaviour.
            self.clear_airflow_vars()
            self.hook = self.get_hook()
            self.hook.test_hql(hql=self.hql)

        def on_kill(self) -> None:
            """Ask the hook to kill the running query, if a hook has been created."""
            if self.hook:
                self.hook.kill()

        def clear_airflow_vars(self) -> None:
            """Reset Airflow environment variables so existing ones cannot affect behaviour.

            Every ``env_var_format`` name in
            ``operator_helpers.AIRFLOW_VAR_NAME_FORMAT_MAPPING`` is set to ``''``
            in ``os.environ``.
            """
            blank_env_vars = {
                value['env_var_format']: '' for value in operator_helpers.AIRFLOW_VAR_NAME_FORMAT_MAPPING.values()
            }
            os.environ.update(blank_env_vars)
    以太坊 cpp 与 golang 的区别 编程

    以太坊 cpp 与 golang 的区别

    以太坊是一种去中心化的开源平台,它采用智能合约技术,旨在构建和运行不受干扰的分布式应用程序。作为目前最受欢迎的区块链平台之一,以太坊提供了多种编程语言的支持,其
    progolang 编程

    progolang

    Go语言(Golang)是由Google开发的一门静态类型编程语言。作为一名专业的Golang开发者,我深知这门语言的优势和特点。在本文中,我将介绍Golang
    golang n个发送者 编程

    golang n个发送者

    Golang是一种开源的编程语言,由Google团队开发,旨在提高程序的并发性和简化软件开发过程。在Go语言中,有时需要向多个接收者发送信息。本文将介绍如何在G
    golang技能图谱 编程

    golang技能图谱

    从互联网行业的快速发展到人工智能技术的日益成熟,各种编程语言也应运而生。而在这众多的编程语言中,Golang(即Go)作为一门强大且高效的开发语言备受关注。Go
    评论:0   参与:  3