By Andrew Chung\
Id: WealthQuant
Links:
www.zhihu.com/question/23…
It has been reprinted with authorisation
Quantifying the correlation between two time series can be done in many ways. Here is my summary for reference only (Python). Decide which correlation measure to use based on the type of signal you have, the assumptions you make about the signal, and what kind of synchronization data you want to look for in the data.
- The Person related
- Time lag cross-correlation (TLCC) and windowed TLCC
- Dynamic Time Warping (DTW)
- Instantaneous phase synchronization
* * * *
1. Pearson correlation — the simplest and best method
The Person correlation measures how two consecutive signals change together over time and can represent a linear relationship between them as numbers -1 (negative correlation), 0 (no correlation), and 1 (complete correlation). It’s intuitive, it’s easy to understand, it’s easy to explain. However, when using Pearson correlation, there are two things to pay attention to, which are: first, abnormal data may interfere with the results of correlation evaluation; Second, it assumes that the data are all homoscedasticity, in which case the variance of the data is homogeneous over the entire data range. Typically, correlation is a snapshot measure of global synchronicity. Therefore, it does not provide information about the directionality between two signals, for example, which signal is a lead signal and which is a follow signal.
The following code loads the sample data (in the same folder as the code), calculates the Pearson correlation using Pandas and Scipy, and plots the median filtered data.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
df = pd.read_csv('synchrony_sample.csv')
overall_pearson_r = df.corr().iloc[0.1]
print(f"Pandas computed Pearson r: {overall_pearson_r}")
R: 0.2058774513561943
r, p = stats.pearsonr(df.dropna()['S1_Joy'], df.dropna()['S2_Joy'])
print(f"Scipy computed Pearson r: {r} and p-value: {p}")
R value: 0.20587745135619354 and p-value: 3.7902989479463397E-51
# Calculate sliding window synchronization
f,ax=plt.subplots(figsize=(7.3))
df.rolling(window=30,center=True).median().plot(ax=ax)
ax.set(xlabel='Time',ylabel='Pearson r')
ax.set(title=f"Overall Pearson r = {np.round(overall_pearson_r,2)}")
plt.show()
Copy the code
Again, all Pearson r values are used to measure global synchronization, which reduces the relationship between two signals to a single value. However, there is also a way to observe the state from moment to moment using Pearson correlation, known as local synchronization. One method of calculation is to measure the local Pearson correlation of the signal, and then repeat the process in all sliding Windows until all the signals are covered by the window. Since you can define the width of the window arbitrarily according to how many times you want to repeat it, this result will vary from person to person. In the code below, we use 120 frames for the window width (about 4 seconds) and show the synchronization results for each moment we draw in the image below.
# Set window width to calculate sliding window synchronization
r_window_size = 120
Insert missing values
df_interpolated = df.interpolate()
# Calculate sliding window synchronization
rolling_r = df_interpolated['S1_Joy'].rolling(window=r_window_size, center=True).corr(df_interpolated['S2_Joy'])
f,ax=plt.subplots(2.1,figsize=(14.6),sharex=True)
df.rolling(window=30,center=True).median().plot(ax=ax[0])
ax[0].set(xlabel='Frame',ylabel='Smiling Evidence')
rolling_r.plot(ax=ax[1])
ax[1].set(xlabel='Frame',ylabel='Pearson r')
plt.suptitle("Smiling data and rolling window correlation")
Copy the code
Overall, Pearson correlation is a good starting point tutorial that provides a very simple way to calculate global and local synchronicity. However, it does not provide information about signal dynamics, such as which signal comes first, which can be measured by cross-correlation.
* * * *
2. Time lag cross-correlation — evaluating signal dynamics
Time lag cross-correlation (TLCC) can define the directionality between two signals, such as a lead-and-follow relationship, in which the lead initializes a response and the follow repeats it. There are other ways to probe such relationships, including Granger causality, which is commonly used in economics, but note that these still do not necessarily reflect true causality. But by looking at the cross-correlation, we can still extract information about which signal came first.
As shown in the figure above, TLCC is measured by gradually moving a time series vector (red line) and repeatedly calculating the correlation between the two signals. If the peak correlation is in the center (offset=0), it means that the correlation of the two time series is the highest at this time. However, if one signal is leading another, the peak of the correlation may be at a different coordinate value. The following code applies a cross-correlation function that uses the functionality provided by PANDAS. It can also package the data so that correlation boundaries can be calculated by adding data on the other side of the signal.
def crosscorr(datax, datay, lag=0, wrap=False) :
""" Lag-N cross correlation. Shifted data filled with NaNs Parameters ---------- lag : int, default 0 datax, datay : pandas.Series objects of equal length Returns ---------- crosscorr : float """
if wrap:
shiftedy = datay.shift(lag)
shiftedy.iloc[:lag] = datay.iloc[-lag:].values
return datax.corr(shiftedy)
else:
return datax.corr(datay.shift(lag))
d1 = df['S1_Joy']
d2 = df['S2_Joy']
seconds = 5
fps = 30
rs = [crosscorr(d1,d2, lag) for lag in range(-int(seconds*fps-1),int(seconds*fps))]
offset = np.ceil(len(rs)/2)-np.argmax(rs)
f,ax=plt.subplots(figsize=(14.3))
ax.plot(rs)
ax.axvline(np.ceil(len(rs)/2),color='k',linestyle=The '-',label='Center')
ax.axvline(np.argmax(rs),color='r',linestyle=The '-',label='Peak synchrony')
ax.set(title=f'Offset = {offset} frames\nS1 leads <> S2 leads',ylim=[1..31.],xlim=[0.300], xlabel='Offset',ylabel='Pearson r')
ax.set_xticklabels([int(item-150) for item in ax.get_xticks()])
plt.legend()
plt.show()
Copy the code
In the figure above, we can infer from the negative coordinates that the Subject 1 (S1) signal interacts with the guiding signals (the correlation is highest when S2 is advanced 47 frames). However, this evaluation signal can change dynamically at the global level, such as the lead signal during the three minutes. On the other hand, we think that the interaction between signals might be more pronounced, whether they lead or follow, switching over time.
To evaluate more granular dynamic changes, we can calculate windowed time-lag cross-correlation (WTLCC). This process computes the time lag cross-correlation repeatedly in multiple time Windows of the signal. We can then analyze each window or take the sum over the window to provide a score comparing the difference in leader-follower interaction between the two.
The time lag of Windows is related to each other
seconds = 5
fps = 30
no_splits = 20
samples_per_split = df.shape[0]/no_splits
rss=[]
for t in range(0, no_splits):
d1 = df['S1_Joy'].loc[(t)*samples_per_split:(t+1)*samples_per_split]
d2 = df['S2_Joy'].loc[(t)*samples_per_split:(t+1)*samples_per_split]
rs = [crosscorr(d1,d2, lag) for lag in range(-int(seconds*fps- 1),int(seconds*fps))]
rss.append(rs)
rss = pd.DataFrame(rss)
f,ax = plt.subplots(figsize=(10.5))
sns.heatmap(rss,cmap='RdBu_r',ax=ax)
ax.set(title=f'Windowed Time Lagged Cross Correlation',xlim=[0.300], xlabel='Offset',ylabel='Window epochs')
ax.set_xticklabels([int(item- 150.) for item in ax.get_xticks()]);
Sliding window time lag is correlated
seconds = 5
fps = 30
window_size = 300 # sample
t_start = 0
t_end = t_start + window_size
step_size = 30
rss=[]
while t_end < 5400:
d1 = df['S1_Joy'].iloc[t_start:t_end]
d2 = df['S2_Joy'].iloc[t_start:t_end]
rs = [crosscorr(d1,d2, lag, wrap=False) for lag in range(-int(seconds*fps- 1),int(seconds*fps))]
rss.append(rs)
t_start = t_start + step_size
t_end = t_end + step_size
rss = pd.DataFrame(rss)
f,ax = plt.subplots(figsize=(10.10))
sns.heatmap(rss,cmap='RdBu_r',ax=ax)
ax.set(title=f'Rolling Windowed Time Lagged Cross Correlation',xlim=[0.300], xlabel='Offset',ylabel='Epochs')
ax.set_xticklabels([int(item- 150.) for item in ax.get_xticks()])
plt.show()
Copy the code
As shown in the figure above, the time series is divided into 20 time periods of equal length, and then the cross-correlation of each time window is calculated. This gives us a more fine-grained view of signal interactions. For example, in the first window (the first row), the red peak on the right tells us that S2 starts out directing the interaction. However, in the third or fourth window (row), we can see that S1 starts to conduct more interactions. We could go on and on and get a smooth picture like the one below.
Time-lag cross-correlation and windowed time-lag cross-correlation are good ways to look at more fine-grained dynamic interactions between two signals, such as leader-follow relationships and how they change over time. However, such calculations of signals assume that events are simultaneous and of similar length, which will be covered in the next section.
* * * *
3. Dynamic time warping — synchronizing signals of different lengths
* * * *
Dynamic time warping (DTW) is a method of calculating the path between two signals that minimizes the distance between them. The main advantage of this method is that it can handle signals of different lengths. Originally developed for linguistic analysis, DTW calculates the minimum distance that can match two signals by calculating the Euclidean distance between each frame and all the others. One drawback is that it can’t handle missing values, so if your data point has a missing value, you need to insert the data in advance.
To calculate DTW, we will use Python’s DTW package, which will speed up the computation.
from dtw import dtw,accelerated_dtw
d1 = df['S1_Joy'].interpolate().values
d2 = df['S2_Joy'].interpolate().values
d, cost_matrix, acc_cost_matrix, path = accelerated_dtw(d1,d2, dist='euclidean')
plt.imshow(acc_cost_matrix.T, origin='lower', cmap='gray', interpolation='nearest')
plt.plot(path[0], path[1].'w')
plt.xlabel('Subject1')
plt.ylabel('Subject2')
plt.title(f'DTW Minimum Path with minimum distance: {np.round(d,2)}')
plt.show()
Copy the code
As shown in the figure, we can see the shortest distance drawn by the white convex line. In other words, the synchronization of the older Subject2 data and the later Subject1 data matches. The shortest path cost is D =.33, which can be compared with this value of other signals.
* * * *
4. Instantaneous phase synchronization
* * * *
Finally, if you have a period of time series data that you think might have oscillatory properties (e.g., EEG and fMRI), you can also measure transient phase synchronization. It can also calculate the synchronicity of two signals at each moment. The results may vary from person to person because you need to filter the data to get the wavelength signal that you’re interested in, but you may only have some unpracticed reason for identifying those bands. To calculate phase synchronization, we need to extract the phase of the signal, which can be done by using the Hilbert transform, which separates the phase and energy of the signal (you can learn more about the Hilbert transform here). This allows us to assess whether the two signals are in phase (both signals increase or decrease together).
from scipy.signal import hilbert, butter, filtfilt
from scipy.fftpack import fft,fftfreq,rfft,irfft,ifft
import numpy as np
import seaborn as sns
import pandas as pd
import scipy.stats as stats
def butter_bandpass(lowcut, highcut, fs, order=5) :
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='band')
return b, a
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5) :
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
y = filtfilt(b, a, data)
return y
lowcut = .01
highcut = . 5
fs = 30.
order = 1
d1 = df['S1_Joy'].interpolate().values
d2 = df['S2_Joy'].interpolate().values
y1 = butter_bandpass_filter(d1,lowcut=lowcut,highcut=highcut,fs=fs,order=order)
y2 = butter_bandpass_filter(d2,lowcut=lowcut,highcut=highcut,fs=fs,order=order)
al1 = np.angle(hilbert(y1),deg=False)
al2 = np.angle(hilbert(y2),deg=False)
phase_synchrony = 1-np.sin(np.abs(al1-al2)/2)
N = len(al1)
# draw result
f,ax = plt.subplots(3.1,figsize=(14.7),sharex=True)
ax[0].plot(y1,color='r',label='y1')
ax[0].plot(y2,color='b',label='y2')
ax[0].legend(bbox_to_anchor=(0..1.02.1..102.),ncol=2)
ax[0].set(xlim=[0,N], title='Filtered Timeseries Data')
ax[1].plot(al1,color='r')
ax[1].plot(al2,color='b')
ax[1].set(ylabel='Angle',title='Angle at each Timepoint',xlim=[0,N])
phase_synchrony = 1-np.sin(np.abs(al1-al2)/2)
ax[2].plot(phase_synchrony)
ax[2].set(ylim=[0.1.1],xlim=[0,N],title='Instantaneous Phase Synchrony',xlabel='Time',ylabel='Phase Synchrony')
plt.tight_layout()
plt.show()
Copy the code
Instantaneous phase synchronization calculation is a good method to calculate the synchronization of two signals at each moment, and it does not require arbitrary window width as we calculate the correlation of sliding Windows.