Comment rendre une construction universelle plus efficace?

16

Une "construction universelle" est une classe wrapper pour un objet séquentiel qui permet de le linéariser (une condition de cohérence forte pour les objets concurrents). Par exemple, voici une construction adaptée sans attente, en Java, de [1], qui suppose l'existence d'une file d'attente sans attente qui satisfait l'interface WFQ(qui ne nécessite qu'un consensus unique entre les threads) et suppose une Sequentialinterface:

public interface WFQ<T> // "FIFO" iteration
{
    int enqueue(T t); // returns the sequence number of t
    Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
    // Apply an invocation (method + arguments)
    // and get a response (return value + state)
    Response apply(Invocation i); 
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}

public class SlowUniversal implements Universal
{
    Factory<? extends Sequential> generator;
    WFQ<Invocation> wfq = new WFQ<Invocation>();
    Universal(Factory<? extends Sequential> g) { generator = g; } 
    public Response apply(Invocation i)
    {
        int max = wfq.enqueue(i);
        Sequential s = generator.generate();
        for(Invocation invoc : wfq.iterateUntil(max))
            s.apply(invoc);
        return s.apply(i);
    }
}

Cette implémentation n'est pas très satisfaisante car elle est vraiment lente (vous vous souvenez de chaque appel, et vous devez la rejouer à chaque application - nous avons un temps d'exécution linéaire dans la taille de l'historique). Existe-t-il un moyen d'étendre les interfaces WFQet Sequential(de manière raisonnable) pour nous permettre d'enregistrer certaines étapes lors de l'application d'un nouvel appel?

Pouvons-nous rendre cela plus efficace (pas d'exécution linéaire dans la taille de l'historique, de préférence l'utilisation de la mémoire diminue également) sans perdre la propriété sans attente?

Clarification

Une "construction universelle" est un terme qui, j'en suis sûr, a été inventé par [1] qui accepte un objet thread-unsafe mais compatible thread, qui est généralisé par l' Sequentialinterface. À l'aide d'une file d'attente sans attente, la première construction offre une version thread-safe, linéarisable de l'objet qui est également sans attente (cela suppose un déterminisme et des applyopérations d' arrêt ).

Cela est inefficace, car la méthode consiste à faire démarrer chaque thread local à partir d'une table blanche et à lui appliquer toutes les opérations jamais enregistrées. Dans tous les cas, cela fonctionne car il réalise la synchronisation efficacement en utilisant le WFQpour déterminer l'ordre dans lequel toutes les opérations doivent être appliquées: chaque thread appelant applyverra le même Sequentialobjet local , avec la même séquence de Invocations qui lui sera appliquée.

Ma question est de savoir si nous pouvons (par exemple) introduire un processus de nettoyage en arrière-plan qui met à jour "l'état de départ" afin que nous n'ayons pas à redémarrer à partir de zéro. Ce n'est pas aussi simple que d'avoir un pointeur atomique avec un pointeur de départ - ces types d'approches perdent facilement la garantie sans attente. Je soupçonne qu'une autre approche basée sur la file d'attente pourrait fonctionner ici.

Jargon:

  1. sans attente - quel que soit le nombre de threads ou la prise de décision du planificateur, applyse terminera par un nombre limité d'instructions exécutables pour ce thread.
  2. sans verrou - comme ci-dessus, mais admet la possibilité d'un temps d'exécution illimité, uniquement dans le cas où un nombre illimité d' applyopérations sont effectuées dans d'autres threads. En règle générale, les schémas de synchronisation optimistes entrent dans cette catégorie.
  3. blocage - efficacité à la merci de l'ordonnanceur.

Un exemple de travail, comme demandé (maintenant sur une page qui n'expirera pas)

[1] Herlihy et Shavit, L'art de la programmation multiprocesseur .

VF1
la source
La question 1 ne peut être répondue que si nous savons ce que «fonctionne» signifie pour vous.
Robert Harvey
@RobertHarvey Je l'ai corrigé - tout ce qu'il faut pour "fonctionner" est que l'encapsuleur soit sans attente et que toutes les opérations CopyableSequentialsoient valides - la linéarisation devrait alors découler du fait qu'elle l'est Sequential.
VF1
Il y a beaucoup de mots significatifs dans cette question, mais j'ai du mal à les rassembler pour comprendre exactement ce que vous essayez d'accomplir. Pouvez-vous expliquer le problème que vous essayez de résoudre et peut-être éclaircir un peu le jargon?
JimmyJames
@JimmyJames J'ai développé dans un "commentaire étendu" à l'intérieur de la question. Veuillez me faire savoir s'il y a un autre jargon à clarifier.
VF1
dans le premier paragraphe du commentaire, vous dites "objet non sûr pour les threads mais compatible pour les threads" et "version linéarisable de l'objet". On ne sait pas ce que vous entendez par là car les thread-safe et linéarisables ne sont vraiment pertinents que pour les instructions exécutables, mais vous les utilisez pour décrire des objets, qui sont des données. Je suppose que l' invocation (qui n'est pas définie) est en fait un pointeur de méthode et c'est cette méthode qui n'est pas thread-safe. Je ne sais pas ce que signifie compatible avec les threads .
JimmyJames

Réponses:

1

Voici une explication et un exemple de la façon dont cela est accompli. Faites-moi savoir s'il y a des parties qui ne sont pas claires.

Gist avec source

Universel

Initialisation:

Les index de threads sont appliqués de manière atomiquement incrémentée. Ceci est géré à l'aide d'un AtomicIntegernommé nextIndex. Ces index sont affectés aux threads via une ThreadLocalinstance qui s'initialise en récupérant l'index suivant nextIndexet en l'incrémentant. Cela se produit la première fois que l'index de chaque thread est récupéré la première fois. Un ThreadLocalest créé pour suivre la dernière séquence créée par ce thread. Il est initialisé à 0. La référence d'objet de fabrique séquentielle est transmise et stockée. Deux AtomicReferenceArrayinstances sont créées de taille n. L'objet de queue est affecté à chaque référence, après avoir été initialisé avec l'état initial fourni par l' Sequentialusine. nest le nombre maximal de threads autorisés. Chaque élément de ces tableaux «appartient» à l'index de thread correspondant.

Appliquer la méthode:

C'est la méthode qui fait le travail intéressant. Il fait ce qui suit:

  • Créez un nouveau nœud pour cette invocation: le mien
  • Définissez ce nouveau nœud dans le tableau d'annonce à l'index du thread actuel

Ensuite, la boucle de séquençage commence. Elle se poursuivra jusqu'à ce que l'invocation en cours soit séquencée:

  1. rechercher un nœud dans le tableau d'annonce en utilisant la séquence du dernier nœud créé par ce thread. Plus d'informations à ce sujet plus tard.
  2. si un nœud est trouvé à l'étape 2, il n'est pas encore séquencé, continuez-le, sinon, concentrez-vous simplement sur l'invocation en cours. Cela essaiera seulement d'aider un autre nœud par appel.
  3. Quel que soit le nœud sélectionné à l'étape 3, continuez d'essayer de le séquencer après le dernier nœud séquencé (d'autres threads peuvent interférer.) Quel que soit le succès, définissez la référence de tête de threads actuelle sur la séquence renvoyée par decideNext()

La clé de la boucle imbriquée décrite ci-dessus est la decideNext()méthode. Pour comprendre cela, nous devons regarder la classe Node.

Classe de noeud

Cette classe spécifie les nœuds dans une liste à double liaison. Il n'y a pas beaucoup d'action dans cette classe. La plupart des méthodes sont de simples méthodes de récupération qui devraient être assez explicites.

méthode de la queue

cela renvoie une instance de nœud spéciale avec une séquence de 0. Elle agit simplement comme un espace réservé jusqu'à ce qu'une invocation la remplace.

Propriétés et initialisation

  • seq: le numéro de séquence, initialisé à -1 (signifiant non séquencé)
  • invocation: la valeur de l'invocation de apply(). Mis en construction.
  • next: AtomicReferencepour la liaison directe. une fois attribué, cela ne sera jamais changé
  • previous: AtomicReferencepour le lien arrière attribué lors du séquençage et effacé partruncate()

Décider ensuite

Cette méthode n'est qu'une dans Node avec une logique non triviale. En résumé, un nœud est proposé comme candidat pour être le nœud suivant dans la liste chaînée. La compareAndSet()méthode vérifiera si sa référence est nulle et si oui, définissez la référence au candidat. Si la référence est déjà définie, elle ne fait rien. Cette opération est atomique donc si deux candidats sont proposés en même temps, un seul sera sélectionné. Cela garantit qu'un seul nœud sera jamais sélectionné comme prochain. Si le nœud candidat est sélectionné, sa séquence est définie sur la valeur suivante et son lien précédent est défini sur ce nœud.

Retour à la méthode d'application de la classe universelle ...

Après avoir appelé decideNext()le dernier nœud séquencé (lorsqu'il est vérifié) avec notre nœud ou un nœud de la announcebaie, il y a deux occurrences possibles: 1. Le nœud a été séquencé avec succès 2. Un autre thread a anticipé ce thread.

L'étape suivante consiste à vérifier si le nœud créé pour cette invocation. Cela peut se produire parce que ce thread l'a séquencé avec succès ou qu'un autre thread l'a récupéré du announcetableau et l'a séquencé pour nous. S'il n'a pas été séquencé, le processus est répété. Sinon, l'appel se termine en effaçant le tableau d'annonces de l'index de ce thread et en renvoyant la valeur de résultat de l'invocation. Le tableau d'annonce est effacé pour garantir qu'il n'y a aucune référence au nœud laissé autour qui empêcherait le nœud d'être récupéré et donc garder tous les nœuds de la liste liée à partir de ce point en vie sur le tas.

Évaluer la méthode

Maintenant que le nœud de l'invocation a été correctement séquencé, l'invocation doit être évaluée. Pour ce faire, la première étape consiste à s'assurer que les invocations précédant celle-ci ont été évaluées. S'ils n'ont pas ce fil, ils n'attendront pas, mais ils le feront immédiatement.

Méthode EnsurePrior

La ensurePrior()méthode effectue ce travail en vérifiant le nœud précédent dans la liste chaînée. Si son état n'est pas défini, le nœud précédent sera évalué. Nœud que c'est récursif. Si le nœud avant le nœud précédent n'a pas été évalué, il appellera évaluer pour ce nœud et ainsi de suite.

Maintenant que le nœud précédent est connu pour avoir un état, nous pouvons évaluer ce nœud. Le dernier nœud est récupéré et affecté à une variable locale. Si cette référence est nulle, cela signifie qu'un autre thread a anticipé celui-ci et a déjà évalué ce nœud; mettre son état. Sinon, l'état du nœud précédent est transmis à la Sequentialméthode apply de l' objet avec l'invocation de ce nœud. L'état renvoyé est défini sur le nœud et la truncate()méthode est appelée, supprimant le lien arrière du nœud car il n'est plus nécessaire.

Méthode MoveForward

La méthode Move Forward tentera de déplacer toutes les références de tête vers ce nœud si elles ne pointent pas déjà vers quelque chose plus loin. Cela permet de garantir que si un thread cesse d'appeler, sa tête ne conservera pas de référence à un nœud qui n'est plus nécessaire. La compareAndSet()méthode s'assurera que nous ne mettons à jour le nœud que si un autre thread ne l'a pas modifié depuis qu'il a été récupéré.

Annoncer la baie et aider

La clé pour rendre cette approche sans attente par opposition à simplement sans verrouillage est que nous ne pouvons pas supposer que le planificateur de threads donnera la priorité à chaque thread quand il en aura besoin. Si chaque thread tente simplement de séquencer ses propres nœuds, il est possible qu'un thread soit continuellement préempté sous charge. Pour tenir compte de cette possibilité, chaque thread essaiera d'abord d'aider les autres threads qui ne pourront peut-être pas être séquencés.

L'idée de base est que chaque thread créant avec succès des nœuds, les séquences affectées augmentent de façon monotone. Si un thread ou des threads préemptent continuellement un autre thread, l'index utilisé pour rechercher les nœuds non séquencés dans le announcetableau avancera. Même si chaque thread qui essaie actuellement de séquencer un nœud donné est continuellement préempté par un autre thread, tous les threads tenteront éventuellement de séquencer ce nœud. Pour illustrer, nous allons construire un exemple avec trois threads.

Au point de départ, la tête et les éléments d'annonce des trois threads sont pointés vers le tailnœud. Le lastSequencepour chaque thread est 0.

À ce stade, le thread 1 est exécuté avec une invocation. Il vérifie le tableau d'annonce pour sa dernière séquence (zéro) qui est le nœud qu'il est actuellement programmé pour indexer. Il séquence le nœud et il lastSequenceest défini sur 1.

Le thread 2 est maintenant exécuté avec une invocation, il vérifie le tableau d'annonces à sa dernière séquence (zéro) et voit qu'il n'a pas besoin d'aide et tente donc de séquencer son invocation. Il réussit et maintenant il lastSequenceest réglé sur 2.

Le thread 3 est maintenant exécuté et il voit également que le nœud à announce[0]est déjà séquencé et séquence sa propre invocation. Il lastSequenceest désormais réglé sur 3.

Le thread 1 est à nouveau invoqué. Il vérifie le tableau d'annonces à l'index 1 et constate qu'il est déjà séquencé. Simultanément, le thread 2 est appelé. Il vérifie le tableau d'annonces à l'index 2 et constate qu'il est déjà séquencé. Les deux Discussion 1 et Discussion 2 tentent maintenant de séquencer leurs propres noeuds. Le thread 2 gagne et il séquence son invocation. Il lastSequenceest défini sur 4. Pendant ce temps, le thread trois a été appelé. Il vérifie l'index lastSequence(mod 3) et constate que le nœud à announce[0]n'a pas été séquencé. Le thread 2 est à nouveau invoqué en même temps que le thread 1 en est à sa deuxième tentative. Fil 1trouve une invocation non séquencée à announce[1]laquelle se trouve le nœud qui vient d'être créé par le thread 2 . Il tente de séquencer l' appel de Thread 2 et réussit. Le thread 2 trouve son propre nœud announce[1]et il a été séquencé. Il est réglé lastSequencesur 5. Le thread 3 est ensuite appelé et trouve que le nœud sur lequel le thread 1 est placé announce[0]n'est toujours pas séquencé et tente de le faire. Pendant ce temps, Thread 2 a également été invoqué et préempte Thread 3. Il séquence son nœud et le définit lastSequencesur 6.

Mauvais fil 1 . Même si Thread 3 essaie de le séquencer, les deux threads ont été continuellement contrecarrés par le planificateur. Mais à ce stade. Le thread 2 pointe également vers announce[0](6 mod 3). Les trois threads sont définis pour tenter de séquencer le même appel. Quel que soit le thread qui réussit, le prochain nœud à séquencer sera l'invocation en attente du thread 1, c'est-à-dire le nœud référencé par announce[0].

C'est inévitable. Pour que les threads soient anticipés, les autres threads doivent être des nœuds de séquençage et, ce faisant, ils avanceront continuellement lastSequence. Si le nœud d'un thread donné n'est pas continuellement séquencé, tous les threads pointeront finalement vers son index dans le tableau d'annonce. Aucun thread ne fera quoi que ce soit tant que le noeud qu'il essaie d'aider n'a pas été séquencé, le pire des cas est que tous les threads pointent vers le même noeud non séquencé. Par conséquent, le temps requis pour séquencer toute invocation est fonction du nombre de threads et non de la taille de l'entrée.

JimmyJames
la source
Pourriez-vous mettre certains extraits de code sur pastebin? Beaucoup de choses (comme la liste chaînée sans verrouillage) peuvent être simplement déclarées comme telles? Il est un peu difficile de comprendre votre réponse dans son ensemble quand il y a tant de détails. En tout cas, cela semble prometteur, je voudrais certainement creuser dans les garanties qu'il offre.
VF1
Cela semble certainement être une implémentation sans verrou valide, mais il manque le problème fondamental qui m'inquiète. L'exigence de linéarisation nécessite la présence d'un "historique valide" qui, dans le cas d'une implémentation de liste chaînée, a besoin d'un pointeur previouset nextpour être valide. Il est difficile de maintenir et de créer un historique valide sans attente.
VF1
@ VF1 Je ne sais pas quel problème n'est pas résolu. Tout ce que vous mentionnez dans le reste du commentaire est traité dans l'exemple que j'ai donné, d'après ce que je peux dire.
JimmyJames
Vous avez abandonné la propriété sans attente .
VF1
@ VF1 Comment figurez-vous?
JimmyJames
0

Ma réponse précédente ne répond pas vraiment à la question correctement, mais comme le PO la juge utile, je la laisse telle quelle. Basé sur le code dans le lien dans la question, voici ma tentative. Je n'ai fait que des tests vraiment basiques à ce sujet, mais il semble calculer correctement les moyennes. Les commentaires sont les bienvenus pour savoir si cela est correctement sans attente.

REMARQUE : j'ai supprimé l'interface universelle et en ai fait une classe. Faire en sorte qu'Universal soit composé de séquences et en être un semble une complication inutile, mais il me manque peut-être quelque chose. Dans la classe moyenne, j'ai marqué la variable d'état comme étant volatile. Ce n'est pas nécessaire pour faire fonctionner le code. Pour être prudent (une bonne idée avec le filetage) et empêcher chaque fil de faire tous les calculs (une fois).

Séquentiel et usine

public interface Sequential<E, S, R>
{ 
  R apply(S priorState);

  S state();

  default boolean isApplied()
  {
    return state() != null;
  }
}

public interface Factory<E, S, R>
{
   S initial();

   Sequential<E, S, R> generate(E input);
}

Universel

import java.util.concurrent.ConcurrentLinkedQueue;

public class Universal<I, S, R> 
{
  private final Factory<I, S, R> generator;
  private final ConcurrentLinkedQueue<Sequential<I, S, R>> wfq = new ConcurrentLinkedQueue<>();
  private final ThreadLocal<Sequential<I, S, R>> last = new ThreadLocal<>();

  public Universal(Factory<I, S, R> g)
  { 
    generator = g;
  }

  public R apply(I invocation)
  {
    Sequential<I, S, R> newSequential = generator.generate(invocation);
    wfq.add(newSequential);

    Sequential<I, S, R> last = null;
    S prior = generator.initial(); 

    for (Sequential<I, S, R> i : wfq) {
      if (!i.isApplied() || newSequential == i) {
        R r = i.apply(prior);

        if (i == newSequential) {
          wfq.remove(last.get());
          last.set(newSequential);

          return r;
        }
      }

      prior = i.state();
    }

    throw new IllegalStateException("Houston, we have a problem");
  }
}

Moyenne

public class Average implements Sequential<Integer, Average.State, Double>
{
  private final Integer invocation;
  private volatile State state;

  private Average(Integer invocation)
  {
    this.invocation = invocation;
  }

  @Override
  public Double apply(State prior)
  {
    System.out.println(Thread.currentThread() + " " + invocation + " prior " + prior);

    state = prior.add(invocation);

    return ((double) state.sum)/ state.count;
  }

  @Override
  public State state()
  {
    return state;
  }

  public static class AverageFactory implements Factory<Integer, State, Double> 
  {
    @Override
    public State initial()
    {
      return new State(0, 0);
    }

    @Override
    public Average generate(Integer i)
    {
      return new Average(i);
    }
  }

  public static class State
  {
    private final int sum;
    private final int count;

    private State(int sum, int count)
    {
      this.sum = sum;
      this.count = count;
    }

    State add(int value)
    {
      return new State(sum + value, count + 1);
    }

    @Override
    public String toString()
    {
      return sum + " / " + count;
    }
  }
}

Code démo

private static final int THREADS = 10;
private static final int SIZE = 50;

public static void main(String... args)
{
  Average.AverageFactory factory = new Average.AverageFactory();

  Universal<Integer, Average.State, Double> universal = new Universal<>(factory);

  for (int i = 0; i < THREADS; i++)
  {
    new Thread(new Test(i * SIZE, universal)).start();
  }
}

static class Test implements Runnable
{
  final int start;
  final Universal<Integer, Average.State, Double> universal;

  Test(int start, Universal<Integer, Average.State, Double> universal)
  {
    this.start = start;
    this.universal = universal;
  }

  @Override
  public void run()
  {
    for (int i = start; i < start + SIZE; i++)
    {
      System.out.println(Thread.currentThread() + " " + i);

      System.out.println(System.nanoTime() + " " + Thread.currentThread() + " " + i + " result " + universal.apply(i));
    }
  }
}

J'ai apporté quelques modifications au code pendant que je le publiais ici. Ça devrait aller, mais faites-moi savoir si vous avez des problèmes avec ça.

JimmyJames
la source
Vous n'avez pas besoin de garder votre autre réponse pour moi (j'ai déjà mis à jour ma question pour en tirer des conclusions pertinentes). Malheureusement, cette réponse ne répond pas non plus à la question, car elle ne libère en fait aucune de la mémoire dans le wfq, vous devez donc parcourir toute l'historique - le temps d'exécution ne s'est amélioré que d'un facteur constant.
VF1
@ Vf1 Le temps qu'il faudra pour parcourir toute la liste pour vérifier s'il a été calculé sera minuscule par rapport à chaque calcul. Étant donné que les états précédents ne sont pas requis, il devrait être possible de supprimer les états initiaux. Les tests sont difficiles et peuvent nécessiter l'utilisation d'une collection personnalisée, mais j'ai ajouté un petit changement.
JimmyJames
@ VF1 Mise à jour vers une implémentation qui semble fonctionner avec des tests superficiels de base. Je ne suis pas sûr que ce soit sûr mais du haut de ma tête, si l'universel était au courant des threads qui travaillent avec lui, il pourrait garder une trace de chaque thread et supprimer des éléments une fois que tous les threads les ont dépassés.
JimmyJames
@ VF1 En regardant le code de ConcurrentLinkedQueue, la méthode d'offre a une boucle semblable à celle qui, selon vous, a rendu l'autre réponse sans attente. Recherchez le commentaire "Course CAS perdue vers un autre fil;
relisez
"Il devrait être possible de supprimer les états initiaux" - exactement. Cela devrait être le cas , mais il est facile d'introduire subtilement du code qui perd la liberté d'attente. Un schéma de suivi des threads peut fonctionner. Enfin, je n'ai pas accès à la source CLQ, cela vous dérangerait de créer un lien?
VF1