J'essaie de déplacer des fichiers s3 d'un compartiment "sans suppression" (ce qui signifie que je ne peux pas supprimer les fichiers) vers GCS en utilisant airflow. Je ne peux pas garantir que de nouveaux fichiers seront présents tous les jours, mais je dois vérifier chaque jour de nouveaux fichiers.
mon problème est la création dynamique de sous-balises. S'il y a des fichiers, j'ai besoin de sous-dags. S'il n'y a PAS de fichiers, je n'ai pas besoin de sous-dags. Mon problème est les paramètres en amont / en aval. Dans mon code, il détecte les fichiers, mais ne déclenche pas les sous-balises comme ils sont censés le faire. Il me manque quelque chose.
voici mon code:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
la source
la source
spark
travaux ou unpython
script et qu'est-ce que vous utilisez pour l'exécuter commelivy
ou une autre méthodefiles
une liste vide?Réponses:
Vous trouverez ci-dessous la méthode recommandée pour créer un DAG ou un sous-DAG dynamique dans le flux d'air, bien qu'il existe également d'autres moyens, mais je suppose que cela serait largement applicable à votre problème.
Créez d'abord un fichier
(yaml/csv)
qui comprend la liste de tous less3
fichiers et emplacements, dans votre cas, vous avez écrit une fonction pour les stocker dans la liste, je dirais les stocker dans unyaml
fichier séparé et le charger au moment de l'exécution dans airflow env puis créer DAG.Voici un exemple
yaml
fichier:dynamicDagConfigFile.yaml
Vous pouvez modifier votre
Check_For_Files
fonction pour les stocker dans unyaml
fichier.Maintenant, nous pouvons passer à la création dynamique de dag:
Définissez d'abord deux tâches à l'aide d'opérateurs factices, à savoir la tâche de début et la tâche de fin. De telles tâches sont celles dans lesquelles nous allons construire sur notre
DAG
en créant dynamiquement des tâches entre elles:Dynamic DAG: Nous utiliserons
PythonOperators
dans le flux d'air. La fonction doit recevoir comme arguments l'id de la tâche; une fonction python à exécuter, c'est-à-dire le python_callable pour l'opérateur Python; et un ensemble d'arguments à utiliser pendant l'exécution.Incluez un argument le
task id
. Ainsi, nous pouvons échanger des données entre les tâches générées de manière dynamique, par exemple viaXCOM
.Vous pouvez spécifier votre fonction d'opération dans ce dag dynamique comme
s3_to_gcs_op
.Enfin, en fonction de l'emplacement présent dans le fichier yaml, vous pouvez créer des dags dynamiques, d'abord lire le
yaml
fichier comme ci-dessous et créer un dag dynamique:Définition finale du DAG:
L'idée est que
Code de flux d'air complet dans l'ordre:
la source
upload_s3_toGCS
ne seront pas garantis, ce qui signifie que la section n'existera pas et qu'il y aura une erreur dans le flux d'air.yaml
fichier une fois que tous ces fichiers sont téléchargés sur GCS, de cette façon, seuls les nouveaux fichiers seront présents dans leyaml
fichier. Et s'il n'y a pas de nouveaux fichiers, leyaml
fichier sera vide et aucun fichier dynamique ne sera créé. C'est pourquoi leyaml
fichier est une bien meilleure option que de stocker des fichiers dans une liste.yaml
fichier aidera également à maintenir la journalisation des fichiers s3 d'une manière, si supposons qu'une partie du fichier s3 ne soit pas téléchargée dans GCS, vous pouvez également conserver un indicateur correspondant à ce fichier, puis réessayer lors de la prochaine exécution du DAG.if
condition avant le DAG qui vérifiera les nouveaux fichiers dans lesyaml
fichiers s'il y a de nouveaux fichiers, exécutez-le sinon sautez-le.