Spark, divisant de manière optimale un seul RDD en deux

10

J'ai un grand ensemble de données que je dois diviser en groupes selon des paramètres spécifiques. Je veux que le travail soit traité aussi efficacement que possible. Je peux imaginer deux façons de le faire

Option 1 - Créer une carte à partir du RDD et du filtre d'origine

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Option 2 - Filtrez directement le RDD d'origine

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

La première méthode doit répéter 3 fois tous les enregistrements de l'ensemble de données d'origine, alors que la seconde ne doit le faire que deux fois, dans des circonstances normales, cependant, spark fait un peu de construction de graphique en arrière-plan, donc je peux imaginer qu'ils sont efficacement de la même manière. Mes questions sont: a.) Est-ce qu'une méthode est plus efficace que l'autre, ou est-ce que la construction du graphique à étincelles les rend équivalentes b.) Est-il possible de faire cette division en une seule passe

jagartner
la source
Je me suis également retrouvé avec un problème très similaire et je n'ai pas vraiment trouvé de solution. Mais ce qui se passe réellement n'est pas clair à partir de ce code, car spark a une `` évaluation paresseuse '' et est censé être capable d'exécuter uniquement ce dont il a vraiment besoin pour exécuter, et également de combiner des cartes, des filtres et tout ce qui peut être fait ensemble. Donc, ce que vous décrivez peut se produire en une seule fois. Pas assez familier avec les mécanismes d'évaluation paresseux pour le dire, cependant. En fait, je viens de remarquer le .cache (). Peut-être qu'il existe un moyen de ne faire qu'un seul .cache () et d'obtenir les résultats complets?
user3780968

Réponses:

9

Tout d'abord, laissez-moi vous dire que je ne suis pas un expert Spark; Je l'utilise beaucoup depuis quelques mois et je crois que je le comprends maintenant, mais je me trompe peut-être.

Donc, répondant à vos questions:

a.) ils sont équivalents, mais pas de la façon dont vous le voyez; Spark n'optimisera pas le graphique si vous vous posez la question, mais le customMappersera toujours exécuté deux fois dans les deux cas; cela est dû au fait que pour spark, rdd1et rdd2sont deux RDD complètement différents, et il construira le graphique de transformation de bas en haut à partir des feuilles; donc l'option 1 se traduira par:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Comme vous l'avez dit, customMapperest exécuté deux fois (de plus, il rddInsera également lu deux fois, ce qui signifie que s'il provient d'une base de données, il peut être encore plus lent).

b.) il y a un moyen, il suffit de se déplacer cache()au bon endroit:

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

En faisant cela, nous disons à spark qu'il peut stocker les résultats partiels de mappedRdd; il utilisera ensuite ces résultats partiels pour rdd1et rdd2. Du point de vue de l'étincelle, cela équivaut à:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
StefanoP
la source