Comment déclencher une tâche Airflow uniquement lorsque de nouvelles partitions / données sont disponibles dans la table AWS athena en utilisant DAG en python?

9

J'ai un scénario comme ci-dessous:

  1. Déclenchez a Task 1et Task 2uniquement lorsque de nouvelles données sont disponibles pour eux dans la table source (Athena). Le déclenchement de Task1 et Task2 devrait se produire lors d'une nouvelle partition de données dans une journée.
  2. Déclencher Task 3uniquement à la fin Task 1etTask 2
  3. Déclencher Task 4uniquement la fin deTask 3

entrez la description de l'image ici

Mon code

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Quelle est la meilleure façon optimale d'y parvenir?

pankaj
la source
avez-vous des problèmes avec cette solution?
Bernardo stearns reisen
@ Bernardostearnsreisen, Parfois, le Task1et Task2va en boucle. Pour moi, les données sont chargées dans la table source Athena à 10 h 00 CET.
pankaj
faire une boucle que vous voulez dire, airflow réessaye plusieurs fois la tâche 1 et la tâche 2 jusqu'à ce qu'elle réussisse?
Bernardo stearns reisen
@Bernardostearnsreisen, yup exactement
pankaj
1
@Bernardostearnsreisen, je ne savais pas comment attribuer la prime :)
pankaj

Réponses:

1

Je crois que votre question aborde deux problèmes majeurs:

  1. oubliant de configurer le schedule_intervalde manière explicite afin que @daily configure quelque chose que vous n'attendez pas.
  2. Comment déclencher et réessayer correctement l'exécution du dag lorsque vous dépendez d'un événement externe pour terminer l'exécution

la réponse courte: définissez explicitement votre planning_interval avec un format de tâche cron et utilisez des opérateurs de capteur pour vérifier de temps en temps

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

startimeest l'heure à laquelle votre tâche quotidienne commencera, endtimequelle est la dernière heure de la journée, vous devez vérifier si un événement a été effectué avant de signaler l'échec et poke_timel'intervalle sensor_operatorauquel vous vérifierez si l'événement s'est produit.

Comment traiter explicitement le travail cron chaque fois que vous définissez votre dag@dailycomme vous l'avez fait:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

à partir des documents , vous pouvez voir que vous êtes en train de faire: @daily - Run once a day at midnight

Ce qui explique maintenant pourquoi vous obtenez une erreur de dépassement de délai et échoue après 5 minutes car vous avez défini 'retries': 1et 'retry_delay': timedelta(minutes=5). Donc, il essaie d'exécuter le poignard à minuit, il échoue. réessaye 5 minutes après et échoue à nouveau, il est donc signalé comme ayant échoué.

Donc, fondamentalement, @daily run définit un travail cron implicite de:

@daily -> Run once a day at midnight -> 0 0 * * *

Le format du travail cron est du format ci-dessous et vous définissez la valeur à *chaque fois que vous voulez dire "tous".

Minute Hour Day_of_Month Month Day_of_Week

Donc, @daily dit simplement de l'exécuter toutes les: minute 0 heure 0 de tous les jours_de_mois de tous les mois de tous les jours_de_week

Votre requête est donc exécutée toutes les: minute 0 heure 10 de tous les jours_de_mois de tous_les mois de tous les jours_de_week. Cela se traduit au format cron job en:

0 10 * * *

Comment déclencher et réessayer correctement l'exécution du dag lorsque vous dépendez d'un événement externe pour terminer l'exécution

  1. vous pouvez déclencher un dag dans le flux d'air à partir d'un événement externe en utilisant la commande airflow trigger_dag. cela serait possible si certains comment vous pourriez déclencher une fonction lambda / script python pour cibler votre instance de flux d'air.

  2. Si vous ne pouvez pas déclencher le dag en externe, utilisez un opérateur de capteur comme OP l'a fait, définissez-y un poke_time et définissez un nombre raisonnablement élevé de nouvelles tentatives.

Bernardo stearns reisen
la source
Merci pour cela. Aussi, si je veux déclencher les tâches en fonction de l'événement plutôt que du temps, c'est-à-dire dès que la nouvelle partition de données est disponible dans la source, la tâche suivante `AWS Athena Tables` doit être déclenchée. Alors comment planifier. Mon code actuel est-il assez adapté?
pankaj
@pankaj, je ne vois que deux alternatives. Je ne sais pas grand-chose sur aws athena, mais vous pouvez déclencher un dag dans le flux d'air d'un événement externe en utilisant la commande airflow trigger_dag. cela serait possible si certains comment vous pourriez déclencher une fonction lambda / script python pour cibler votre instance de flux d'air.
Bernardo stearns reisen
L'autre alternative est plus ou moins ce que vous faites, car vous n'avez pas de déclencheur basé sur un événement, vous devez vérifier périodiquement si cet événement s'est produit. Donc, l'utilisation de cette solution actuelle serait définie comme une tâche cron pour une plage d'heures de fonctionnement du dag dans une fréquence élevée de minutes ... beaucoup échoueront mais elle sera capable de rattraper assez rapidement après l'événement
Bernardo stearns reisen
@Bernado, j'ai trouvé un package dans Airflow appelé AwsGlueCatalogPartitionSensoravec la commande airflow {{ds_nodash}}pour les sorties de partition. Ma question alors comment planifier cela.
pankaj
@Benado, pouvez-vous regarder mon code où j'ai mis en œuvre la vérification mentionnée ci-dessus et donner vos entrées
pankaj