Flujo de aire: obtuvo un argumento de palabra clave inesperado 'dag'

Felipe FB

Sé que ya hay una función de flujo de aire que pasa el archivo de Cloud Storage a Big Query, como hice, hice la conexión dentro del script con el GCP de la misma manera que lo haría sin el flujo de aire, llamé al PythonOperator para llamar a la función que Configuré en la secuencia de comandos para leer Cloud Storage e insertar los datos del archivo en Big Query, sin embargo, aparece el mensaje de error: "obtuve un argumento de palabra clave inesperado 'dag'"

Parece ser una cosa bastante simple de resolver, pero realmente no sé lo que eso significa ya que puse los atributos DAG dentro del PythonOperator

import json
import decimal
import airflow
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from tempfile import NamedTemporaryFile
import pymssql  
import logging
import os
# import cloudstorage as gcs
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test_tab1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    dagrun_timeout=timedelta(minutes=60)
)

try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "/usr/local/airflow/key/key.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path 

def insert_bigquery(self):
    bigquery_client = bigquery.Client(project="project-name")
    dataset_ref = bigquery_client.dataset('bucket-name')
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    job_config.clustering_fields = ["id"]
    #job_config.field_delimiter = ";"
    uri = "gs://bucket-name/"+filename
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table('tab1'),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')


json_gcs_to_bq = PythonOperator(
    task_id='json_gcs_to_bq',
    python_callable=insert_bigquery,
    provide_context=True,
    dag=dag)

Mensaje de error:

[2019-06-21 15:45:40,732] {{models.py:1760}} ERROR - insert_bigquery() got an unexpected keyword argument 'dag'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: insert_bigquery() got an unexpected keyword argument 'dag'
Ryan Yuan

No es necesario analizar en sí mismo en yor python_callable. Modifique el parámetro de la función insert_bigquery como def insert_bigquery (ds, ** kwargs) en lugar de def insert_bigquery (self) .

Referencia: https://airflow.apache.org/howto/operator/python.html

Este artículo se recopila de Internet, indique la fuente cuando se vuelva a imprimir.

En caso de infracción, por favor [email protected] Eliminar

Editado en
0

Déjame decir algunas palabras

0Comentarios
Iniciar sesiónRevisión de participación posterior

Artículos relacionados

Flujo de aire: obtuvo un argumento de palabra clave inesperado 'conf'

la tarea obtuvo un argumento inesperado 'dag' en el flujo de aire

to_pandas () obtuvo un argumento de palabra clave inesperado

TypeError: use () obtuvo un argumento de palabra clave inesperado 'advertir'

TypeError: post () obtuvo un argumento de palabra clave inesperado

TypeError: Movie () obtuvo un argumento de palabra clave inesperado 'actores'

TypeError: __init __ () obtuvo un argumento de palabra clave inesperado 'recote'

__init __ () obtuvo un argumento de palabra clave inesperado 'solicitud'

obtuvo un argumento de palabra clave inesperado 'with_labels'

django __init __ () obtuvo un argumento de palabra clave inesperado 'contenido'

TypeError: __init __ () obtuvo un argumento de palabra clave inesperado 'camera'

Error: astype () obtuvo un argumento de palabra clave inesperado 'categorías'

obtuvo un argumento de palabra clave inesperado 'id'

TypeError: attrib () obtuvo un argumento de palabra clave inesperado 'convertir'

request () obtuvo un argumento de palabra clave inesperado 'json'

render () obtuvo un argumento de palabra clave inesperado 'renderizador'

mean () obtuvo un argumento de palabra clave inesperado 'dtype'!

seaborn: lmplot () obtuvo un argumento de palabra clave inesperado 'figsize'

__init __ () obtuvo un argumento de palabra clave inesperado 'usuario'

TypeError: __init __ () obtuvo un argumento de palabra clave inesperado 'dir'

fit () obtuvo un argumento de palabra clave inesperado 'criterio'

Flask - get () obtuvo un argumento de palabra clave inesperado

__init __ () obtuvo un argumento de palabra clave inesperado 'mimetype'

__init __ () obtuvo un argumento de palabra clave inesperado 'entradas'

__init __ () obtuvo un argumento de palabra clave inesperado 'entradas'

TypeError: softmax () obtuvo un argumento de palabra clave inesperado 'eje'

__init __ () obtuvo un argumento de palabra clave inesperado 'conne'

distplot () obtuvo un argumento de palabra clave inesperado 'figsize'

TypeError: __init __ () obtuvo un argumento de palabra clave inesperado 'attrs'

TOP Lista

  1. 1

    ¿Cómo ocultar la aplicación web de los robots de búsqueda? (ASP.NET)

  2. 2

    OAuth 2.0 utilizando Spring Security + WSO2 Identity Server

  3. 3

    Manera correcta de agregar referencias al proyecto C # de modo que sean compatibles con el control de versiones

  4. 4

    Ver todos los comentarios en un video de YouTube

  5. 5

    uitableview delete button image in iOS

  6. 6

    ¿Título del selector de SwiftUI?

  7. 7

    Swift / Firebase : Facebook 사용자가 계정을 만들 때 Firebase 데이터베이스에 제대로 저장하려면 어떻게해야합니까?

  8. 8

    ¿Es posible reemplazar los valores de un archivo config.properties a través de TFS?

  9. 9

    Representación de mapas 3D en juegos

  10. 10

    Golang ListenAndServeTLS devuelve datos cuando no se usa https en el navegador

  11. 11

    Declarar propiedades reactivas (agregar bloques de componentes dinámicamente desde la inserción de matriz)

  12. 12

    Cómo hacer que SwiftUI Text multilineTextAlignment comience desde arriba y centro

  13. 13

    Cómo depurar una aplicación React en Visual Studio 2019 usando la plantilla "Blank Node.js"

  14. 14

    Múltiples relaciones en la misma tabla con dos columnas Laravel

  15. 15

    No existe tal archivo o directorio cuando se inicia el nombre del directorio con /

  16. 16

    Verilog : 입력 신호를 한 클럭 주기로 지연시키는 방법은 무엇입니까?

  17. 17

    ¿Cómo hacer un generador de ruido Perlin más suave?

  18. 18

    Problème avec le dessin d'un élément Qml avec des appels OpenGL bruts

  19. 19

    El nombre 'HttpContext' no existe en el contexto actual en Razor

  20. 20

    WPF pleine largeur DataGridColumn sur la largeur de DataGrid

  21. 21

    Tengo algunos problemas con el syscall de golang cuando llamo a dll en win7-64

CalienteEtiquetas

Archivo