Sé que ya hay una función de flujo de aire que pasa el archivo de Cloud Storage a Big Query, como hice, hice la conexión dentro del script con el GCP de la misma manera que lo haría sin el flujo de aire, llamé al PythonOperator para llamar a la función que Configuré en la secuencia de comandos para leer Cloud Storage e insertar los datos del archivo en Big Query, sin embargo, aparece el mensaje de error: "obtuve un argumento de palabra clave inesperado 'dag'"
Parece ser una cosa bastante simple de resolver, pero realmente no sé lo que eso significa ya que puse los atributos DAG dentro del PythonOperator
import json
import decimal
import airflow
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from tempfile import NamedTemporaryFile
import pymssql
import logging
import os
# import cloudstorage as gcs
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'depends_on_past': False,
# If a task fails, retry it once after waiting
# at least 5 minutes
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='test_tab1',
default_args=default_args,
schedule_interval=timedelta(days=1),
dagrun_timeout=timedelta(minutes=60)
)
try:
script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
script_path = "/usr/local/airflow/key/key.json"
#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path
def insert_bigquery(self):
bigquery_client = bigquery.Client(project="project-name")
dataset_ref = bigquery_client.dataset('bucket-name')
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
time_partitioning = bigquery.table.TimePartitioning()
job_config.time_partitioning = time_partitioning
job_config.clustering_fields = ["id"]
#job_config.field_delimiter = ";"
uri = "gs://bucket-name/"+filename
load_job = bigquery_client.load_table_from_uri(
uri,
dataset_ref.table('tab1'),
job_config=job_config
)
print('Starting job {}'.format(load_job.job_id))
load_job.result()
print('Job finished.')
json_gcs_to_bq = PythonOperator(
task_id='json_gcs_to_bq',
python_callable=insert_bigquery,
provide_context=True,
dag=dag)
Mensaje de error:
[2019-06-21 15:45:40,732] {{models.py:1760}} ERROR - insert_bigquery() got an unexpected keyword argument 'dag'
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: insert_bigquery() got an unexpected keyword argument 'dag'
No es necesario analizar en sí mismo en yor python_callable. Modifique el parámetro de la función insert_bigquery como def insert_bigquery (ds, ** kwargs) en lugar de def insert_bigquery (self) .
Referencia: https://airflow.apache.org/howto/operator/python.html
Este artículo se recopila de Internet, indique la fuente cuando se vuelva a imprimir.
En caso de infracción, por favor [email protected] Eliminar
Déjame decir algunas palabras