J'ai un scénario comme ci-dessous:
- Déclenchez a
Task 1
etTask 2
uniquement lorsque de nouvelles données sont disponibles pour eux dans la table source (Athena). Le déclenchement de Task1 et Task2 devrait se produire lors d'une nouvelle partition de données dans une journée. - Déclencher
Task 3
uniquement à la finTask 1
etTask 2
- Déclencher
Task 4
uniquement la fin deTask 3
Mon code
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
Quelle est la meilleure façon optimale d'y parvenir?
Task1
etTask2
va en boucle. Pour moi, les données sont chargées dans la table source Athena à 10 h 00 CET.Réponses:
Je crois que votre question aborde deux problèmes majeurs:
schedule_interval
de manière explicite afin que @daily configure quelque chose que vous n'attendez pas.la réponse courte: définissez explicitement votre planning_interval avec un format de tâche cron et utilisez des opérateurs de capteur pour vérifier de temps en temps
où
startime
est l'heure à laquelle votre tâche quotidienne commencera,endtime
quelle est la dernière heure de la journée, vous devez vérifier si un événement a été effectué avant de signaler l'échec etpoke_time
l'intervallesensor_operator
auquel vous vérifierez si l'événement s'est produit.Comment traiter explicitement le travail cron chaque fois que vous définissez votre dag
@daily
comme vous l'avez fait:à partir des documents , vous pouvez voir que vous êtes en train de faire:
@daily - Run once a day at midnight
Ce qui explique maintenant pourquoi vous obtenez une erreur de dépassement de délai et échoue après 5 minutes car vous avez défini
'retries': 1
et'retry_delay': timedelta(minutes=5)
. Donc, il essaie d'exécuter le poignard à minuit, il échoue. réessaye 5 minutes après et échoue à nouveau, il est donc signalé comme ayant échoué.Donc, fondamentalement, @daily run définit un travail cron implicite de:
Le format du travail cron est du format ci-dessous et vous définissez la valeur à
*
chaque fois que vous voulez dire "tous".Minute Hour Day_of_Month Month Day_of_Week
Donc, @daily dit simplement de l'exécuter toutes les: minute 0 heure 0 de tous les jours_de_mois de tous les mois de tous les jours_de_week
Votre requête est donc exécutée toutes les: minute 0 heure 10 de tous les jours_de_mois de tous_les mois de tous les jours_de_week. Cela se traduit au format cron job en:
Comment déclencher et réessayer correctement l'exécution du dag lorsque vous dépendez d'un événement externe pour terminer l'exécution
vous pouvez déclencher un dag dans le flux d'air à partir d'un événement externe en utilisant la commande
airflow trigger_dag
. cela serait possible si certains comment vous pourriez déclencher une fonction lambda / script python pour cibler votre instance de flux d'air.Si vous ne pouvez pas déclencher le dag en externe, utilisez un opérateur de capteur comme OP l'a fait, définissez-y un poke_time et définissez un nombre raisonnablement élevé de nouvelles tentatives.
la source
airflow trigger_dag
. cela serait possible si certains comment vous pourriez déclencher une fonction lambda / script python pour cibler votre instance de flux d'air.AwsGlueCatalogPartitionSensor
avec la commande airflow{{ds_nodash}}
pour les sorties de partition. Ma question alors comment planifier cela.