Calculer la vitesse moyenne des routes [fermé]

20

Je suis allé à un entretien d'embauche d'ingénieur de données. L'enquêteur m'a posé une question. Il m'a donné une situation et m'a demandé de concevoir le flux de données pour ce système. J'ai résolu cela mais il n'a pas aimé ma solution et j'ai échoué. J'aimerais savoir si vous avez de meilleures idées pour résoudre ce défi.

La question était:

Notre système reçoit quatre flux de données. Les données contiennent un identifiant de véhicule, une vitesse et des coordonnées de géolocalisation. Chaque véhicule envoie ses données une fois par minute. Il n'y a aucun lien entre un flux spécifique et une route ou un véhicule spécifique ou toute autre chose. Il existe une fonction qui accepte les coordinations et renvoie un nom de section de route. Nous devons connaître la vitesse moyenne par tronçon de route toutes les 5 minutes. Enfin, nous voulons écrire les résultats à Kafka.

entrez la description de l'image ici

Ma solution était donc:

Tout d'abord, écrivez toutes les données dans un cluster Kafka, en un seul sujet, partitionné par les 5-6 premiers chiffres de la latitude concaténée aux 5-6 premiers chiffres de la longitude. Ensuite, lisez les données par Streamed Structuré, ajoutez pour chaque ligne le nom du tronçon de route par les coordinations (il y a un udf prédéfini pour cela), puis collez les données par nom de tronçon de route.

Parce que je partitionne les données dans Kafka par les 5-6 premiers chiffres des coordinations, après avoir traduit les coordinations en nom de section, il n'est pas nécessaire de transférer beaucoup de données vers la partition correcte et donc je peux profiter de l'opération colesce () cela ne déclenche pas un shuffle complet.

Calculer ensuite la vitesse moyenne par exécuteur.

L'ensemble du processus se déroulera toutes les 5 minutes et nous écrirons les données en mode ajout dans le récepteur Kafka final.

entrez la description de l'image ici

Encore une fois, l'intervieweur n'a pas aimé ma solution. Quelqu'un pourrait-il suggérer comment l'améliorer ou une idée complètement différente et meilleure?

Alon
la source
Ne serait-il pas préférable de demander à la personne ce qu'elle n'a pas aimé exactement?
Gino Pane
Je pense que c'est une mauvaise idée de partitionner par le lat-long concaténé. Le point de données ne sera-t-il pas signalé pour chaque voie comme une coordonnée légèrement différente?
webber
@webber donc je ne prends que quelques chiffres, donc la position ne sera pas unique mais relativement de la taille d'une section de route.
Alon

Réponses:

6

J'ai trouvé cette question très intéressante et j'ai pensé à essayer.

Comme je l'ai évalué plus avant, votre tentative elle-même est bonne, à l'exception des suivantes:

partitionnée par les 5-6 premiers chiffres de la latitude concaténée aux 5-6 premiers chiffres de la longitude

Si vous avez déjà une méthode pour obtenir l'id / le nom de la section de route en fonction de la latitude et de la longitude, pourquoi ne pas appeler cette méthode en premier et utiliser l'id / nom de la section de route pour partitionner les données en premier lieu?

Et après cela, tout est assez facile, donc la topologie sera

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Des explications plus détaillées peuvent être trouvées dans les commentaires dans le code ci-dessous. Veuillez demander si quelque chose n'est pas clair)

J'ai ajouté le code à la fin de cette réponse, veuillez noter qu'au lieu de la moyenne, j'ai utilisé la somme car c'est plus facile à démontrer. Il est possible de faire la moyenne en stockant des données supplémentaires.

J'ai détaillé la réponse dans les commentaires. Voici un diagramme de topologie généré à partir du code (grâce à https://zz85.github.io/kafka-streams-viz/ )

Topologie:

Diagramme de topologie

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Irshad PI
la source
La fusion de tous les flux n'est-elle pas une mauvaise idée? Cela peut devenir un goulot d'étranglement pour votre flux de données. Que se passe-t-il lorsque vous commencez à recevoir de plus en plus de flux d'entrée à mesure que votre système se développe? Sera-ce évolutif?
wypul
@wypul> la fusion de tous les flux n'est-elle pas une mauvaise idée? -> Je pense que non. Le parallélisme dans Kafka ne se fait pas par le biais de flux, mais par le biais de partitions (et de tâches), de threads, etc. Les flux sont un moyen de regrouper les données. > Cela sera-t-il évolutif? -> oui. Étant donné que nous saisissons par sections de route et en supposant que les sections de route sont assez réparties, nous pouvons augmenter le nombre de partitions pour ces rubriques afin de traiter en parallèle le flux dans différents conteneurs. Nous pouvons utiliser un bon algorithme de partitionnement basé sur le tronçon de route pour répartir la charge entre les répliques.
Irshad PI
1

Le problème en tant que tel semble simple et les solutions proposées ont déjà beaucoup de sens. Je me demande si l'intervieweur était préoccupé par la conception et les performances de la solution sur laquelle vous vous êtes concentré ou par l'exactitude du résultat. Étant donné que d'autres se sont concentrés sur le code, la conception et les performances, je vais peser sur la précision.

Solution de streaming

Au fur et à mesure que les données circulent, nous pouvons fournir une estimation approximative de la vitesse moyenne d'une route. Cette estimation sera utile pour détecter la congestion mais sera désactivée pour déterminer la limite de vitesse.

  1. Combinez les 4 flux de données ensemble.
  2. Créez une fenêtre de 5 minutes pour capturer les données des 4 flux en 5 minutes.
  3. Appliquez l'UDF sur les coordonnées pour obtenir le nom de la rue et le nom de la ville. Les noms de rue sont souvent dupliqués dans les villes, nous utiliserons donc le nom de la ville + le nom de la rue comme clé.
  4. Calculez la vitesse moyenne avec une syntaxe comme -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Solution par lots

Cette estimation sera désactivée car la taille de l'échantillon est petite. Nous aurons besoin d'un traitement par lots sur des données de mois / trimestre / année entières pour déterminer plus précisément la limite de vitesse.

  1. Lire les données d'une année de Data Lake (ou Kafka Topic)

  2. Appliquez l'UDF sur les coordonnées pour obtenir le nom de la rue et le nom de la ville.

  3. Calculez la vitesse moyenne avec une syntaxe comme -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. écrire le résultat dans le lac de données.

Sur la base de cette limite de vitesse plus précise, nous pouvons prédire un trafic lent dans l'application de streaming.

Salim
la source
1

Je vois quelques problèmes avec votre stratégie de partitionnement:

  • Lorsque vous dites que vous allez partitionner vos données en fonction des 5 à 6 premiers chiffres de lat, vous ne pourrez pas déterminer le nombre de partitions kafka à l'avance. Vous aurez des données asymétriques car pour certaines sections de route vous observerez un volume élevé que d'autres.

  • De plus, votre combinaison de touches ne garantit pas les mêmes données de tronçon de route dans la même partition et vous ne pouvez donc pas être sûr qu'il n'y aura pas de mélange.

Les informations fournies par l'OMI ne sont pas suffisantes pour concevoir l'ensemble du pipeline de données. Parce que lors de la conception du pipeline, la façon dont vous partitionnez vos données joue un rôle important. Vous devriez vous renseigner davantage sur les données que vous recevez comme le nombre de véhicules, la taille des flux de données d'entrée, le nombre de flux est-il fixe ou peut-il augmenter à l'avenir? Les flux de données d'entrée que vous recevez sont-ils des flux kafka? Combien de données vous recevez en 5 minutes?

  • Supposons maintenant que vous avez 4 flux écrits dans 4 sujets dans kafka ou 4 partitions et que vous n'avez pas de clé spécifique, mais vos données sont partitionnées en fonction d'une clé de centre de données ou elles sont partitionnées par hachage. Sinon, cela devrait être fait du côté des données plutôt que de dédupliquer les données dans un autre flux kafka et de partitionner.
  • Si vous recevez les données sur différents centres de données, vous devez apporter les données à un cluster et à cette fin, vous pouvez utiliser Kafka mirror maker ou quelque chose de similaire.
  • Une fois que vous avez toutes les données sur un cluster, vous pouvez y exécuter un travail de streaming structuré et avec un intervalle de déclenchement de 5 minutes et un filigrane en fonction de vos besoins.
  • Pour calculer la moyenne et éviter beaucoup de brassage, vous pouvez utiliser une combinaison de mapValueset reduceByKeyau lieu de groupBy. Référez ceci .
  • Vous pouvez écrire les données dans Kafka Sink après le traitement.
wypul
la source
mapValues ​​et ReduceByKey appartiennent au RDD de bas niveau. Catalyst n'est-il pas assez intelligent pour générer le RDD le plus efficace lorsque je fais des regroupements et que je calcule la moyenne?
Alon
@Alon Catalyst sera sûrement en mesure de déterminer le meilleur plan pour exécuter votre requête, mais si vous utilisez groupBy, les données avec la même clé seront d'abord mélangées sur la même partition, puis appliquer une opération d'agrégation sur cela. mapValueset reduceByappartient en effet à un RDD de bas niveau, mais il fonctionnera toujours mieux dans cette situation car il calculera d'abord l'agrégat par partition, puis effectuera le brassage.
wypul
0

Les principaux problèmes que je vois avec cette solution sont:

  • Les sections de route qui sont sur le bord des carrés à 6 chiffres de la carte auront des données dans plusieurs partitions de sujet et auront plusieurs vitesses moyennes.
  • La taille des données d'ingestion pour vos partitions Kafka peut être déséquilibrée (ville vs désert). Le partitionnement par les premiers chiffres de l'ID de voiture pourrait être une bonne idée OMI.
  • Je ne suis pas sûr d'avoir suivi la partie fusion, mais cela semble problématique.

Je dirais que la solution doit faire: lire à partir du flux Kafka -> UDF -> tronçon de route groupby -> moyen -> écrire dans le flux Kafka.

David Taub
la source
0

Ma conception dépendrait de

  1. Nombre de routes
  2. Nombre de véhicules
  3. Coût de calcul de la route à partir des coordonnées

Si je veux évoluer pour un certain nombre de comptes, la conception ressemblerait à ceci entrez la description de l'image ici

Croiser les inquiétudes sur cette conception -

  1. Maintenir un état durable des flux d'entrée (si l'entrée est kafka, nous pouvons stocker des décalages avec Kafka ou en externe)
  2. Périodiquement les états des points de contrôle vers le système externe (je préfère utiliser des barrières de point de contrôle asynchrones dans Flink )

Quelques améliorations pratiques possibles sur cette conception -

  1. Fonction de cartographie de la section de mise en cache de route si possible, basée sur les routes
  2. Gestion des pings manqués (en pratique, tous les ping ne sont pas disponibles)
  3. Prise en compte de la courbure de la route (prise en compte et altitude)
yugandhar
la source