【问题标题】:kedro nodes input accept kwargs?kedro 节点输入接受 kwargs?
【发布时间】:2020-08-31 10:37:18
【问题描述】:

https://kedro.readthedocs.io/en/stable/kedro.pipeline.node.Node.html#kedro.pipeline.node.Node.inputs

我有一个函数

def 函数(**kwargs): 返回

如何将变量作为节点输入传递给它?

**inputs**
Return node inputs as a list, in the order required to bind them properly to the node’s function. If the node’s function contains kwargs, then kwarg inputs are sorted alphabetically (for python 3.5 deterministic behavior).

【问题讨论】:

    标签: kedro


    【解决方案1】:

    使用装饰器工厂函数可以解决此问题。本例使用python3。

    您可以从 funtools 包中导入 wraps 装饰器

    
        from functools import wraps
    
    
        def deco_factory_df_params(
            *deco_args,
            **deco_kwargs,
        ) -> Callable:
            def deco_fn_df_params(
                    func: Callable,
            ) -> Callable:
    
                @wraps(func)
                def add_df_params(*args, **kwargs):
                    kwargs = {**kwargs, **deco_kwargs}
                    return func(*args, *deco_args, **kwargs)
    
                return add_df_params
    
            return deco_fn_df_params
    

    并在调用 kedro 管道中的节点时将您选择的关键字参数传递给此装饰工厂函数。 例如。

    
        node(
            func=deco_factory_df_params(
                parameter1="value1",
                parameter2="value2",
                parameter3="value3",
            )(your_function_name),
            inputs="data_input1",
            outputs=None,
            name="name_of_node",
        ),
    

    您的节点(函数)定义可能如下所示

    
        def your_function_name(
            df_input: pyspark.sql.DataFrame,
            parameter1: str,
            parameter2: str,
            parameter3: str,
            **kwargs,  # this is important to be here to avoid error
        ) -> None:
            function_code_here
            
            return None
    

    @mediumnok:我希望这对你的问题有用。

    【讨论】:

      【解决方案2】:

      您需要使用kedro.pipeline 内部的node 函数。您可以使用字典将 kwargs 传递给函数。

      from kedro.pipeline input Pipeline, node
      
      def my_kwarg_fun(**kwargs):
          print(kwargs)
      
      def create_pipeline():
          return Pipeline([
              node(
                  my_kwarg_fun,
                  inputs={
                      "my_kwarg": "example_my_kwarg"
                  },
                  outputs=None
              )
          ])
      
      # This should print
      # {'example_my_kwarg': ...}
      

      【讨论】:

      • 我收到此错误,而不是文件“c:\programdata\anaconda3\lib\site-packages\kedro\runner\runner.py”,第 89 行,运行“DataCatalog”.format(不满意) ValueError:在 DataCatalog 中找不到管道输入 {'example_my_kwarg'}
      • 所以如果“example_my_kwargs”是一个数据框,它就可以工作。似乎节点总是将字符串评估为变量,如果我实际上需要将字符串作为变量传递它不起作用,我必须在 parameters.yml 中定义它并使用 params:key 来引用。
      【解决方案3】:

      发送参数时使用密钥。

      例如,如果要发送STRINGTOPASS,请使用前缀为input_ 的可拆分键:

      ...node(
      func=function_to_send_parameter,
      inputs={"input_STRINGTOPASS": "valid_data_catalog_eg_dataframe_item", ...},
      outputs="another_valid_data_catalog_item",
      name="pass_parameter_node"), ...
      

      然后,在您的函数中,从 input_ 中提取 STRINGTOPASS,如下所示:

      def function_to_send_parameter(**kwargs):
      
          # parse input
          for key, value in kwargs.items():
              # parse dataframe and string
              if key.startswith("input_"):
                  df = value
                  input_string = key.split("input_")[1]
                  print("Received string: ", input_string)
          # parse other parameters
          ...
      

      【讨论】:

        猜你喜欢
        • 2016-06-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多