summaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rw-r--r--scripts/python-learning.py157
1 files changed, 157 insertions, 0 deletions
diff --git a/scripts/python-learning.py b/scripts/python-learning.py
new file mode 100644
index 0000000..d144678
--- /dev/null
+++ b/scripts/python-learning.py
@@ -0,0 +1,157 @@
+import numpy as np
+import pandas
+from math import ceil, floor
+from sklearn.model_selection import train_test_split
+from sklearn import svm
+from sklearn.model_selection import KFold, StratifiedKFold, permutation_test_score
+from sklearn import linear_model
+from sklearn.svm import SVC
+import pickle
+import matplotlib.pyplot as plt
+import numpy.fft as fft
+from sklearn import datasets
+
+class MyOVBox(OVBox):
+ def __init__(self):
+ OVBox.__init__(self)
+ # Names of CSV files
+ self.signalFileName = ""
+ self.stimFileName = ""
+ # Name of Save File
+ self.saveFileName = ""
+ # other variables
+ self.windowSize = 0 # window size in ms
+ self.numOfPreviousWindowsAsOne = 0 # number of windows before actual stimulation to be marked as 1
+ self.numOfWindowsBefore = 0 # number of windows before those marked as 1, to be marked as 0
+ self.numOfWindowsAfter = 0 # number of windows after those marked as 1, to be marked as 0
+
+
+ def filter_signal(self, sampleRate, numberOfSamplesWindow, stimulationTimes, splittedSignal):
+ """ returns tuple of filtered (signal chunks, classes) """
+ splittedSignal_filtrd = []
+ classes_filtrd = []
+
+ temp_classes = np.zeros(len(splittedSignal))
+
+ for stim in stimulationTimes:
+ index = int(floor(stim*sampleRate/numberOfSamplesWindow))
+ temp_classes[index] = 1
+ for i in range(1, self.numOfPreviousWindowsAsOne):
+ temp_classes[index-i] = 1
+
+ tmp_cls_winds = temp_classes[(index - self.numOfPreviousWindowsAsOne - self.numOfWindowsBefore):index+self.numOfWindowsAfter]
+ tmp_sig_winds = np.concatenate(splittedSignal[(index - self.numOfPreviousWindowsAsOne - self.numOfWindowsBefore):index+self.numOfWindowsAfter])
+
+ if len(tmp_sig_winds)/len(tmp_cls_winds)!=numberOfSamplesWindow: # if np.array_split does not split in equal windows
+ tmp_sig_winds = np.lib.pad(tmp_sig_winds, ((0, int(len(tmp_cls_winds)*numberOfSamplesWindow-len(tmp_sig_winds))),(0, 0)), 'edge') # pad with same values on end of array
+
+ classes_filtrd.extend(tmp_cls_winds)
+ splittedSignal_filtrd.extend(np.array_split(tmp_sig_winds, len(tmp_cls_winds)))
+
+ return (splittedSignal_filtrd, classes_filtrd)
+
+
+ def avg_k_fold(self, data, classes, k=4):
+ """ return average CA of k-fold cross validation """
+ avg_val = 0
+
+ kf = KFold(n_splits=k)
+ for train, test in kf.split(data):
+ clf = svm.SVC(kernel='linear', C=1).fit(data[train], classes[train])
+ cur_score = clf.score(data[test], classes[test])
+ avg_val += cur_score
+ # print cur_score
+ return avg_val/k
+
+
+ def permutation_significance_classification_score(self, X, y, k_folds=4):
+ n_classes = np.unique(y).size
+
+ svm = SVC(kernel='linear')
+ cv = StratifiedKFold(k_folds)
+
+ score, permutation_scores, pvalue = permutation_test_score(svm, X, y, scoring="accuracy", cv=cv, n_permutations=200, n_jobs=1)
+ print("Classification score %s (pvalue : %s)" % (score, pvalue))
+
+ plt.hist(permutation_scores, 20, label='Permutation scores')
+ ylim = plt.ylim()
+ plt.plot(2 * [score], ylim, '--g', linewidth=3,
+ label='Classification Score'
+ ' (pvalue %s)' % pvalue)
+ plt.plot(2 * [1. / n_classes], ylim, '--k', linewidth=3, label='Luck')
+
+ plt.ylim(ylim)
+ plt.legend()
+ plt.xlabel('Score')
+ plt.show()
+
+
+ def initialize(self):
+ # Names of CSV files
+ self.signalFileName = self.setting['InputCSVSignal']
+ self.stimFileName = self.setting['InputCSVStimulations']
+ # Name of Save File
+ self.saveFileName = self.setting['SaveFile']
+ # other variables
+ self.windowSize = int(self.setting['WindowSize (ms)']) # in ms
+ self.numOfPreviousWindowsAsOne = int(self.setting['NumOfPrevWindows'])
+ self.numOfWindowsBefore = int(self.setting['NumOfWindowsBefore'])-1
+ self.numOfWindowsAfter = int(self.setting['NumOfWindowsAfter'])+1
+ self.k_folds = int(self.setting['K-folds'])
+
+ print "Reading files..."
+ # read CSV files
+ signalArray = pandas.read_csv(self.signalFileName, delimiter=";", encoding="utf-8-sig")
+ stimsArray = pandas.read_csv(self.stimFileName, delimiter=";", encoding="utf-8-sig")
+ print "Files read!"
+
+ # sort information from tables (pandas dataframe)
+ time = signalArray['Time (s)']
+ electrodes = signalArray.iloc[:, 1:signalArray.shape[1]-1]
+ sampleRate = signalArray['Sampling Rate'][0]
+ stimulationTimes = stimsArray['Time (s)']
+
+ numberOfSamplesWindow = floor(self.windowSize / ((1/sampleRate)*1000)) # number of samples for approximately self.windowSize ms
+
+ splittedSignal = np.array_split(electrodes, ceil(len(time)/numberOfSamplesWindow)) # split signal into chunks of specified length
+
+ s_c = self.filter_signal(sampleRate, numberOfSamplesWindow, stimulationTimes, splittedSignal) # filter signal and return sparsed version of chunks and assign classes
+ splittedSignal_filtrd = s_c[0]
+ classes_filtrd = s_c[1]
+
+ splittedSignal_filtrd_means = np.array(np.mean(splittedSignal_filtrd, axis=1)) # for each window calculate mean value
+ # additional attributes could be added besides splittedSignal_filtrd_means
+ classes_filtrd = np.array(classes_filtrd)
+
+ # print average k-fold CA
+ print "Average " + str(self.k_folds) + "-folds value: " + str(self.avg_k_fold(splittedSignal_filtrd_means, classes_filtrd, k=self.k_folds))
+ #self.permutation_significance_classification_score(splittedSignal_filtrd_means, classes_filtrd, k_folds=2) # k_folds=2, last long if k is bigger
+
+ if self.saveFileName:
+ clf = svm.SVC(kernel='linear', C=1).fit(splittedSignal_filtrd_means, classes_filtrd)
+ clf.fit(splittedSignal_filtrd_means, classes_filtrd)
+
+ print "Saving to pickle file: " + self.saveFileName
+ pickle.dump(clf, open(self.saveFileName, 'wb'))
+ print "Learned score: " + str(clf.score(splittedSignal_filtrd_means, classes_filtrd))
+
+ # send finish stimulation output (for OpenViBE)
+ self.finishBySendingStimulation(32774) # OVTK_StimulationId_TrialStop code
+
+
+ def finishBySendingStimulation(self, stimulationCode):
+ stimSetFinish = OVStimulationSet(self.getCurrentTime(), self.getCurrentTime()+1./self.getClock())
+ stimSetFinish.append(OVStimulation(stimulationCode, self.getCurrentTime(), 0.))
+ self.output[0].append(stimSetFinish)
+
+
+ def process(self):
+
+ return
+
+ def uninitialize(self):
+ # nop
+
+ return
+
+box = MyOVBox()