Pandas: segmentation en zigzag des données en fonction des minima-maxima locaux

10

J'ai des données de série temporelle. Génération de données

date_rng = pd.date_range('2019-01-01', freq='s', periods=400)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']

Je veux créer une ligne en zig-zag reliant entre les maxima locaux et les minima locaux, qui satisfait la condition selon laquelle sur l'axe des y, |highest - lowest value|chaque ligne en zig-zag doit dépasser un pourcentage (disons 20%) de la distance de la précédente ligne en zig-zag ET une valeur prédéterminée k (disons 1.2)

Je peux trouver les extrema locaux en utilisant ce code:

# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]

# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)

# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

mais je ne sais pas comment lui appliquer la condition de seuil. Veuillez me conseiller sur la façon d'appliquer une telle condition.

Étant donné que les données peuvent contenir des millions d'horodatages, un calcul efficace est fortement recommandé

Pour une description plus claire: entrez la description de l'image ici

Exemple de sortie, à partir de mes données:

 # Instantiate axes.
(fig, ax) = plt.subplots()
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Zigzag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

entrez la description de l'image ici

Ma sortie souhaitée (quelque chose de similaire à cela, le zigzag ne connecte que les segments significatifs) entrez la description de l'image ici

Thanh Nguyen
la source

Réponses:

3

J'ai répondu à ma meilleure compréhension de la question. Pourtant, la manière dont la variable K influence le filtre n'est pas claire.

Vous souhaitez filtrer les extrema en fonction d'une condition en cours d'exécution. Je suppose que vous voulez marquer tous les extrema dont la distance relative au dernier extremum marqué est supérieure à p%. Je suppose en outre que vous considérez toujours le premier élément de la série temporelle comme un point valide / pertinent.

J'ai implémenté cela avec la fonction de filtre suivante:

def filter(values, percentage):
    previous = values[0] 
    mask = [True]
    for value in values[1:]: 
        relative_difference = np.abs(value - previous)/previous
        if relative_difference > percentage:
            previous = value
            mask.append(True)
        else:
            mask.append(False)
    return mask

Pour exécuter votre code, j'importe d'abord les dépendances:

from scipy import signal
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

Pour rendre le code reproductible, je fixe la graine aléatoire:

np.random.seed(0)

Le reste d'ici est copypasta. Notez que j'ai diminué la quantité d'échantillon pour rendre le résultat clair.

date_rng = pd.date_range('2019-01-01', freq='s', periods=30)
df = pd.DataFrame(np.random.lognormal(.005, .5,size=(len(date_rng), 3)),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)
s = df['data1']
# Find peaks(max).
peak_indexes = signal.argrelextrema(s.values, np.greater)
peak_indexes = peak_indexes[0]
# Find valleys(min).
valley_indexes = signal.argrelextrema(s.values, np.less)
valley_indexes = valley_indexes[0]
# Merge peaks and valleys data points using pandas.
df_peaks = pd.DataFrame({'date': s.index[peak_indexes], 'zigzag_y': s[peak_indexes]})
df_valleys = pd.DataFrame({'date': s.index[valley_indexes], 'zigzag_y': s[valley_indexes]})
df_peaks_valleys = pd.concat([df_peaks, df_valleys], axis=0, ignore_index=True, sort=True)
# Sort peak and valley datapoints by date.
df_peaks_valleys = df_peaks_valleys.sort_values(by=['date'])

Ensuite, nous utilisons la fonction de filtre:

p = 0.2 # 20% 
filter_mask = filter(df_peaks_valleys.zigzag_y, p)
filtered = df_peaks_valleys[filter_mask]

Et tracez comme vous l'avez fait à la fois votre tracé précédent ainsi que les extrema nouvellement filtrés:

 # Instantiate axes.
(fig, ax) = plt.subplots(figsize=(10,10))
# Plot zigzag trendline.
ax.plot(df_peaks_valleys['date'].values, df_peaks_valleys['zigzag_y'].values, 
                                                        color='red', label="Extrema")
# Plot zigzag trendline.
ax.plot(filtered['date'].values, filtered['zigzag_y'].values, 
                                                        color='blue', label="ZigZag")

# Plot original line.
ax.plot(s.index, s, linestyle='dashed', color='black', label="Org. line", linewidth=1)

# Format time.
ax.xaxis_date()
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

plt.gcf().autofmt_xdate()   # Beautify the x-labels
plt.autoscale(tight=True)

plt.legend(loc='best')
plt.grid(True, linestyle='dashed')

entrez la description de l'image ici

MODIFIER :

Si vous souhaitez à la fois considérer le premier et le dernier point comme valides, vous pouvez adapter la fonction de filtrage comme suit:

def filter(values, percentage):
    # the first value is always valid
    previous = values[0] 
    mask = [True]
    # evaluate all points from the second to (n-1)th
    for value in values[1:-1]: 
        relative_difference = np.abs(value - previous)/previous
        if relative_difference > percentage:
            previous = value
            mask.append(True)
        else:
            mask.append(False)
    # the last value is always valid
    mask.append(True)
    return mask
Nikolas Rieble
la source
salut, merci pour la bonne réponse. Oui, votre hypothèse est juste "marquez tous les extrema dont la distance relative au dernier extremum marqué est supérieure à p%", et le premier et le dernier point doivent toujours être pris en compte. J'ai vérifié votre réponse, parfois il manque le dernier point, pourriez-vous m'aider à ce sujet?
Thanh Nguyen
3

Vous pouvez utiliser la fonctionnalité de roulement Pandas pour créer les extrema locaux. Cela simplifie un peu le code par rapport à votre approche Scipy.

Fonctions pour trouver les extrema:

def islocalmax(x):
    """Both neighbors are lower,
    assumes a centered window of size 3"""
    return (x[0] < x[1]) & (x[2] < x[1])

def islocalmin(x):
    """Both neighbors are higher,
    assumes a centered window of size 3"""
    return (x[0] > x[1]) & (x[2] > x[1])

def isextrema(x):
    return islocalmax(x) or islocalmin(x)

La fonction pour créer le zigzag, elle peut être appliquée sur le Dataframe à la fois (sur chaque colonne), mais cela introduira des NaN puisque les horodatages retournés seront différents pour chaque colonne. Vous pouvez facilement les supprimer plus tard, comme indiqué dans l'exemple ci-dessous, ou simplement appliquer la fonction sur une seule colonne de votre Dataframe.

Notez que j'ai décommenté le test par rapport à un seuil k, je ne suis pas sûr de bien comprendre cette partie correctement. Vous pouvez l'inclure si la différence absolue entre l'extrême précédent et l'extrême actuel doit être supérieure à k:& (ext_val.diff().abs() > k)

Je ne sais pas non plus si le zigzag final devrait toujours passer d'un niveau haut original à un niveau bas ou vice versa. Je l'ai supposé, sinon vous pouvez supprimer la deuxième recherche d'extrême à la fin de la fonction.

def create_zigzag(col, p=0.2, k=1.2):

    # Find the local min/max
    # converting to bool converts NaN to True, which makes it include the endpoints    
    ext_loc = col.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)

    # extract values at local min/max
    ext_val = col[ext_loc]

    # filter locations based on threshold
    thres_ext_loc = (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p)) #& (ext_val.diff().abs() > k)

    # Keep the endpoints
    thres_ext_loc.iloc[0] = True
    thres_ext_loc.iloc[-1] = True

    thres_ext_loc = thres_ext_loc[thres_ext_loc]

    # extract values at filtered locations 
    thres_ext_val = col.loc[thres_ext_loc.index]

    # again search the extrema to force the zigzag to always go from high > low or vice versa,
    # never low > low, or high > high
    ext_loc = thres_ext_val.rolling(3, center=True).apply(isextrema, raw=False).astype(np.bool_)
    thres_ext_val  =thres_ext_val[ext_loc]

    return thres_ext_val

Générez quelques exemples de données:

date_rng = pd.date_range('2019-01-01', freq='s', periods=35)

df = pd.DataFrame(np.random.randn(len(date_rng), 3),
                  columns=['data1', 'data2', 'data3'],
                  index= date_rng)

df = df.cumsum()

Appliquez la fonction et extrayez le résultat pour la colonne 'data1':

dfzigzag = df.apply(create_zigzag)
data1_zigzag = dfzigzag['data1'].dropna()

Visualisez le résultat:

fig, axs = plt.subplots(figsize=(10, 3))

axs.plot(df.data1, 'ko-', ms=4, label='original')
axs.plot(data1_zigzag, 'ro-', ms=4, label='zigzag')
axs.legend()

entrez la description de l'image ici

Rutger Kassies
la source
Merci pour votre réponse. Je veux poser des questions sur cette ligne (ext_val.diff().abs() > (ext_val.shift(-1).abs() * p)), si je comprends bien, vous comparez la distance entre deux points avec p%le dernier point, ai-je raison? Parce que je veux comparer chaque segment en zigzag avec le segment précédent et répéter jusqu'à ce que la condition soit satisfaite.
Thanh Nguyen