essayer de créer des sous-dags dynamiques à partir du dag parent basé sur un tableau de noms de fichiers

10

J'essaie de déplacer des fichiers s3 d'un compartiment "sans suppression" (ce qui signifie que je ne peux pas supprimer les fichiers) vers GCS en utilisant airflow. Je ne peux pas garantir que de nouveaux fichiers seront présents tous les jours, mais je dois vérifier chaque jour de nouveaux fichiers.

mon problème est la création dynamique de sous-balises. S'il y a des fichiers, j'ai besoin de sous-dags. S'il n'y a PAS de fichiers, je n'ai pas besoin de sous-dags. Mon problème est les paramètres en amont / en aval. Dans mon code, il détecte les fichiers, mais ne déclenche pas les sous-balises comme ils sont censés le faire. Il me manque quelque chose.

voici mon code:

from airflow import models
from  airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ##proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
              default_args=args,
              schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)


    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished
arcee123
la source
Quel type de travail s'exécute à l'arrière de ces DAGS sont ces sparktravaux ou un pythonscript et qu'est-ce que vous utilisez pour l'exécuter comme livyou une autre méthode
ashwin agrawal
Je suis désolé, je ne comprends pas la question. pouvez-vous s'il vous plaît reformuler?
arcee123
Je veux dire que vous n'utilisez que des scripts python simples et que vous n'utilisez aucun travail spark, n'est-ce pas?
ashwin agrawal
Oui. opérateurs simples qui sont par défaut dans le flux d'air. Je veux ajouter des opérateurs existants à un taux dynamique basé sur des fichiers marqués dans S3 Je veux ingérer dans GCS.
arcee123
Pourquoi filesune liste vide?
Oluwafemi Sule

Réponses:

3

Vous trouverez ci-dessous la méthode recommandée pour créer un DAG ou un sous-DAG dynamique dans le flux d'air, bien qu'il existe également d'autres moyens, mais je suppose que cela serait largement applicable à votre problème.

Créez d'abord un fichier (yaml/csv) qui comprend la liste de tous les s3fichiers et emplacements, dans votre cas, vous avez écrit une fonction pour les stocker dans la liste, je dirais les stocker dans un yamlfichier séparé et le charger au moment de l'exécution dans airflow env puis créer DAG.

Voici un exemple yaml fichier: dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'

Vous pouvez modifier votre Check_For_Files fonction pour les stocker dans un yamlfichier.

Maintenant, nous pouvons passer à la création dynamique de dag:

Définissez d'abord deux tâches à l'aide d'opérateurs factices, à savoir la tâche de début et la tâche de fin. De telles tâches sont celles dans lesquelles nous allons construire sur notre DAGen créant dynamiquement des tâches entre elles:

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

Dynamic DAG: Nous utiliserons PythonOperatorsdans le flux d'air. La fonction doit recevoir comme arguments l'id de la tâche; une fonction python à exécuter, c'est-à-dire le python_callable pour l'opérateur Python; et un ensemble d'arguments à utiliser pendant l'exécution.

Incluez un argument le task id. Ainsi, nous pouvons échanger des données entre les tâches générées de manière dynamique, par exemple via XCOM.

Vous pouvez spécifier votre fonction d'opération dans ce dag dynamique comme s3_to_gcs_op.

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Enfin, en fonction de l'emplacement présent dans le fichier yaml, vous pouvez créer des dags dynamiques, d'abord lire le yamlfichier comme ci-dessous et créer un dag dynamique:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.

Définition finale du DAG:

L'idée est que

#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks. 
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

Code de flux d'air complet dans l'ordre:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)



with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.


start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
ashwin agrawal
la source
Merci beaucoup. donc l'un des problèmes que j'ai eu était ce qui se passe s'il n'y a pas de nouveaux fichiers? l'un des problèmes auxquels je suis confronté, c'est qu'il y aura toujours des fichiers à cet endroit, mais que de nouveaux fichiers upload_s3_toGCSne seront pas garantis, ce qui signifie que la section n'existera pas et qu'il y aura une erreur dans le flux d'air.
arcee123
Vous pouvez résoudre le problème en supprimant les fichiers du yamlfichier une fois que tous ces fichiers sont téléchargés sur GCS, de cette façon, seuls les nouveaux fichiers seront présents dans le yamlfichier. Et s'il n'y a pas de nouveaux fichiers, le yamlfichier sera vide et aucun fichier dynamique ne sera créé. C'est pourquoi le yamlfichier est une bien meilleure option que de stocker des fichiers dans une liste.
ashwin agrawal
Le yamlfichier aidera également à maintenir la journalisation des fichiers s3 d'une manière, si supposons qu'une partie du fichier s3 ne soit pas téléchargée dans GCS, vous pouvez également conserver un indicateur correspondant à ce fichier, puis réessayer lors de la prochaine exécution du DAG.
ashwin agrawal
Et s'il n'y a pas de nouveaux fichiers, vous pouvez mettre une ifcondition avant le DAG qui vérifiera les nouveaux fichiers dans les yamlfichiers s'il y a de nouveaux fichiers, exécutez-le sinon sautez-le.
ashwin agrawal
le problème ici est que les aval sont définis. si les aval sont définis sans les travaux réels (car aucun fichier n'existe), cela entraînera une erreur.
arcee123