Problème
Existe-t-il un moyen dans Airflow de créer un flux de travail tel que le nombre de tâches B. * soit inconnu jusqu'à la fin de la tâche A? J'ai regardé les subdags mais il semble que cela ne puisse fonctionner qu'avec un ensemble statique de tâches qui doivent être déterminées lors de la création de Dag.
Les déclencheurs de dag fonctionneraient-ils? Et si oui, pourriez-vous donner un exemple.
J'ai un problème où il est impossible de connaître le nombre de tâches B qui seront nécessaires pour calculer la tâche C jusqu'à ce que la tâche A soit terminée. Chaque tâche B. * prendra plusieurs heures à calculer et ne peut pas être combinée.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Idée n ° 1
Je n'aime pas cette solution car je dois créer un ExternalTaskSensor bloquant et toute la tâche B. * prendra entre 2 et 24 heures. Je ne considère donc pas cela comme une solution viable. Il existe sûrement un moyen plus simple? Ou Airflow n'a-t-il pas été conçu pour cela?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Modifier 1:
Pour le moment, cette question n'a toujours pas de bonne réponse . J'ai été contacté par plusieurs personnes à la recherche d'une solution.
Réponses:
Voici comment je l'ai fait avec une demande similaire sans aucun sous-marqueur:
Commencez par créer une méthode qui renvoie les valeurs souhaitées
Ensuite, créez la méthode qui générera les travaux de manière dynamique:
Et puis combinez-les:
la source
for i in values_function()
j'attendrais quelque chose commefor i in push_func_output
. Le problème est que je ne trouve pas de moyen d'obtenir cette sortie de manière dynamique. La sortie de PythonOperator sera dans le Xcom après l'exécution mais je ne sais pas si je peux le référencer à partir de la définition du DAG.group
fonction?J'ai trouvé un moyen de créer des flux de travail basés sur le résultat des tâches précédentes.
Fondamentalement, ce que vous voulez faire est d'avoir deux sous-balises avec les éléments suivants:
def return_list()
)parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), on pourrait probablement ajouter plus de filtres ici.dag_id='%s.%s' % (parent_dag_name, 'test1')
Maintenant, j'ai testé cela dans mon installation locale de flux d'air et cela fonctionne très bien. Je ne sais pas si la partie pull xcom aura des problèmes s'il y a plus d'une instance du dag en cours d'exécution en même temps, mais alors vous utiliseriez probablement une clé unique ou quelque chose comme ça pour identifier de manière unique le xcom valeur que vous voulez. On pourrait probablement optimiser l'étape 3. pour être sûr à 100% d'obtenir une tâche spécifique du dag principal actuel, mais pour mon utilisation, cela fonctionne assez bien, je pense qu'il suffit d'un seul objet task_instance pour utiliser xcom_pull.
De plus, je nettoie les xcom pour le premier sous-dag avant chaque exécution, juste pour m'assurer que je n'obtiens pas accidentellement une valeur erronée.
Je suis assez mauvais pour expliquer, donc j'espère que le code suivant clarifiera tout:
test1.py
test2.py
et le flux de travail principal:
test.py
la source
_ _init_ _.py
au dossier subdags. rookie errorOui, c'est possible, j'ai créé un exemple de DAG qui le démontre.
Avant d'exécuter le DAG, créez ces trois variables de flux d'air
Vous verrez que le DAG part de ça
À ceci après qu'il ait couru
Vous pouvez voir plus d'informations sur ce DAG dans mon article sur la création de flux de travail dynamiques sur Airflow .
la source
OA: "Est-il possible dans Airflow de créer un flux de travail tel que le nombre de tâches B. * soit inconnu jusqu'à la fin de la tâche A?"
La réponse courte est non. Airflow créera le flux DAG avant de commencer à l'exécuter.
Cela dit, nous sommes arrivés à une conclusion simple, à savoir que nous n'avons pas un tel besoin. Lorsque vous souhaitez paralléliser certains travaux, vous devez évaluer les ressources dont vous disposez et non le nombre d'éléments à traiter.
Nous l'avons fait comme ceci: nous générons dynamiquement un nombre fixe de tâches, disons 10, qui diviseront le travail. Par exemple, si nous devons traiter 100 fichiers, chaque tâche en traitera 10. Je publierai le code plus tard aujourd'hui.
Mettre à jour
Voici le code, désolé pour le retard.
Explication du code:
Ici, nous avons une seule tâche de début et une seule tâche de fin (toutes deux factices).
Ensuite, à partir de la tâche de démarrage avec la boucle for, nous créons 10 tâches avec le même appel python. Les tâches sont créées dans la fonction create_dynamic_task.
À chaque appelable en python, nous passons en arguments le nombre total de tâches parallèles et l'index de la tâche en cours.
Supposons que vous ayez 1000 éléments à élaborer: la première tâche recevra en entrée qu'elle doit élaborer le premier morceau sur 10 morceaux. Il divisera les 1000 éléments en 10 morceaux et élaborera le premier.
la source
parallelTask
n'est pas définie: est-ce que je manque quelque chose?Ce que je pense que vous cherchez, c'est de créer DAG dynamiquement J'ai rencontré ce type de situation il y a quelques jours après quelques recherches, j'ai trouvé ce blog .
Génération de tâches dynamiques
Définition du flux de travail DAG
Voici à quoi ressemble notre DAG après avoir assemblé le code
C'était très utile, j'espère que cela aidera aussi quelqu'un d'autre
la source
Je pense avoir trouvé une meilleure solution à cela sur https://github.com/mastak/airflow_multi_dagrun , qui utilise une simple mise en file d'attente de DagRuns en déclenchant plusieurs dagruns, similaires à TriggerDagRuns . La plupart des crédits vont à https://github.com/mastak , même si j'ai dû corriger certains détails pour que cela fonctionne avec le flux d'air le plus récent.
La solution utilise un opérateur personnalisé qui déclenche plusieurs DagRuns :
Vous pouvez ensuite soumettre plusieurs dagruns à partir de la fonction appelable dans votre PythonOperator, par exemple:
J'ai créé un fork avec le code sur https://github.com/flinz/airflow_multi_dagrun
la source
Le graphique des travaux n'est pas généré au moment de l'exécution. Le graphique est plutôt créé lorsqu'il est récupéré par Airflow à partir de votre dossier dags. Par conséquent, il ne sera pas vraiment possible d'avoir un graphique différent pour le travail à chaque fois qu'il s'exécute. Vous pouvez configurer un travail pour créer un graphique basé sur une requête au moment du chargement . Ce graphique restera le même pour chaque exécution après cela, ce qui n'est probablement pas très utile.
Vous pouvez concevoir un graphique qui exécute différentes tâches à chaque exécution en fonction des résultats de la requête à l'aide d'un opérateur de branche.
Ce que j'ai fait, c'est de préconfigurer un ensemble de tâches, puis de prendre les résultats de la requête et de les répartir entre les tâches. C'est probablement mieux de toute façon parce que si votre requête renvoie beaucoup de résultats, vous ne voudrez probablement pas inonder le planificateur avec beaucoup de tâches simultanées de toute façon. Pour être encore plus sûr, j'ai également utilisé un pool pour m'assurer que ma concurrence ne devienne pas incontrôlable avec une requête d'une taille inattendue.
la source
for tasks in tasks
boucle de mon exemple, je supprime l'objet sur lequel je suis en train d'itérer. C'est une mauvaise idée. Au lieu de cela, obtenez une liste des clés et répétez-y - ou ignorez les suppressions. De même, si xcom_pull renvoie None (au lieu d'une liste ou d'une liste vide), la boucle for échoue également. On peut vouloir exécuter xcom_pull avant le 'for', puis vérifier s'il est None - ou s'assurer qu'il y a au moins une liste vide. YMMV. Bonne chance!open_order_task
?Vous ne comprenez pas quel est le problème?
Voici un exemple standard. Maintenant, si dans la fonction subdag remplacez
for i in range(5):
parfor i in range(random.randint(0, 10)):
alors tout fonctionnera. Imaginez maintenant que l'opérateur 'start' place les données dans un fichier, et au lieu d'une valeur aléatoire, la fonction lira ces données. Ensuite, l'opérateur «start» affectera le nombre de tâches.Le problème ne sera que dans l'affichage dans l'interface utilisateur car lors de la saisie du sous-dag, le nombre de tâches sera égal à la dernière lecture du fichier / base de données / XCom pour le moment. Ce qui donne automatiquement une restriction sur plusieurs lancements d'un dag à la fois.
la source
J'ai trouvé ce post Medium qui est très similaire à cette question. Cependant, il est plein de fautes de frappe et ne fonctionne pas lorsque j'ai essayé de l'implémenter.
Ma réponse à ce qui précède est la suivante:
Si vous créez des tâches de manière dynamique, vous devez le faire en itérant sur quelque chose qui n'est pas créé par une tâche en amont ou qui peut être défini indépendamment de cette tâche. J'ai appris que vous ne pouvez pas passer des dates d'exécution ou d'autres variables de flux d'air à quelque chose en dehors d'un modèle (par exemple, une tâche) comme beaucoup d'autres l'ont souligné auparavant. Voir aussi cet article .
la source