summaryrefslogtreecommitdiff
path: root/scripts/python-learning.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/python-learning.py')
-rw-r--r--scripts/python-learning.py157
1 files changed, 0 insertions, 157 deletions
diff --git a/scripts/python-learning.py b/scripts/python-learning.py
deleted file mode 100644
index d144678..0000000
--- a/scripts/python-learning.py
+++ /dev/null
@@ -1,157 +0,0 @@
-import numpy as np
-import pandas
-from math import ceil, floor
-from sklearn.model_selection import train_test_split
-from sklearn import svm
-from sklearn.model_selection import KFold, StratifiedKFold, permutation_test_score
-from sklearn import linear_model
-from sklearn.svm import SVC
-import pickle
-import matplotlib.pyplot as plt
-import numpy.fft as fft
-from sklearn import datasets
-
-class MyOVBox(OVBox):
- def __init__(self):
- OVBox.__init__(self)
- # Names of CSV files
- self.signalFileName = ""
- self.stimFileName = ""
- # Name of Save File
- self.saveFileName = ""
- # other variables
- self.windowSize = 0 # window size in ms
- self.numOfPreviousWindowsAsOne = 0 # number of windows before actual stimulation to be marked as 1
- self.numOfWindowsBefore = 0 # number of windows before those marked as 1, to be marked as 0
- self.numOfWindowsAfter = 0 # number of windows after those marked as 1, to be marked as 0
-
-
- def filter_signal(self, sampleRate, numberOfSamplesWindow, stimulationTimes, splittedSignal):
- """ returns tuple of filtered (signal chunks, classes) """
- splittedSignal_filtrd = []
- classes_filtrd = []
-
- temp_classes = np.zeros(len(splittedSignal))
-
- for stim in stimulationTimes:
- index = int(floor(stim*sampleRate/numberOfSamplesWindow))
- temp_classes[index] = 1
- for i in range(1, self.numOfPreviousWindowsAsOne):
- temp_classes[index-i] = 1
-
- tmp_cls_winds = temp_classes[(index - self.numOfPreviousWindowsAsOne - self.numOfWindowsBefore):index+self.numOfWindowsAfter]
- tmp_sig_winds = np.concatenate(splittedSignal[(index - self.numOfPreviousWindowsAsOne - self.numOfWindowsBefore):index+self.numOfWindowsAfter])
-
- if len(tmp_sig_winds)/len(tmp_cls_winds)!=numberOfSamplesWindow: # if np.array_split does not split in equal windows
- tmp_sig_winds = np.lib.pad(tmp_sig_winds, ((0, int(len(tmp_cls_winds)*numberOfSamplesWindow-len(tmp_sig_winds))),(0, 0)), 'edge') # pad with same values on end of array
-
- classes_filtrd.extend(tmp_cls_winds)
- splittedSignal_filtrd.extend(np.array_split(tmp_sig_winds, len(tmp_cls_winds)))
-
- return (splittedSignal_filtrd, classes_filtrd)
-
-
- def avg_k_fold(self, data, classes, k=4):
- """ return average CA of k-fold cross validation """
- avg_val = 0
-
- kf = KFold(n_splits=k)
- for train, test in kf.split(data):
- clf = svm.SVC(kernel='linear', C=1).fit(data[train], classes[train])
- cur_score = clf.score(data[test], classes[test])
- avg_val += cur_score
- # print cur_score
- return avg_val/k
-
-
- def permutation_significance_classification_score(self, X, y, k_folds=4):
- n_classes = np.unique(y).size
-
- svm = SVC(kernel='linear')
- cv = StratifiedKFold(k_folds)
-
- score, permutation_scores, pvalue = permutation_test_score(svm, X, y, scoring="accuracy", cv=cv, n_permutations=200, n_jobs=1)
- print("Classification score %s (pvalue : %s)" % (score, pvalue))
-
- plt.hist(permutation_scores, 20, label='Permutation scores')
- ylim = plt.ylim()
- plt.plot(2 * [score], ylim, '--g', linewidth=3,
- label='Classification Score'
- ' (pvalue %s)' % pvalue)
- plt.plot(2 * [1. / n_classes], ylim, '--k', linewidth=3, label='Luck')
-
- plt.ylim(ylim)
- plt.legend()
- plt.xlabel('Score')
- plt.show()
-
-
- def initialize(self):
- # Names of CSV files
- self.signalFileName = self.setting['InputCSVSignal']
- self.stimFileName = self.setting['InputCSVStimulations']
- # Name of Save File
- self.saveFileName = self.setting['SaveFile']
- # other variables
- self.windowSize = int(self.setting['WindowSize (ms)']) # in ms
- self.numOfPreviousWindowsAsOne = int(self.setting['NumOfPrevWindows'])
- self.numOfWindowsBefore = int(self.setting['NumOfWindowsBefore'])-1
- self.numOfWindowsAfter = int(self.setting['NumOfWindowsAfter'])+1
- self.k_folds = int(self.setting['K-folds'])
-
- print "Reading files..."
- # read CSV files
- signalArray = pandas.read_csv(self.signalFileName, delimiter=";", encoding="utf-8-sig")
- stimsArray = pandas.read_csv(self.stimFileName, delimiter=";", encoding="utf-8-sig")
- print "Files read!"
-
- # sort information from tables (pandas dataframe)
- time = signalArray['Time (s)']
- electrodes = signalArray.iloc[:, 1:signalArray.shape[1]-1]
- sampleRate = signalArray['Sampling Rate'][0]
- stimulationTimes = stimsArray['Time (s)']
-
- numberOfSamplesWindow = floor(self.windowSize / ((1/sampleRate)*1000)) # number of samples for approximately self.windowSize ms
-
- splittedSignal = np.array_split(electrodes, ceil(len(time)/numberOfSamplesWindow)) # split signal into chunks of specified length
-
- s_c = self.filter_signal(sampleRate, numberOfSamplesWindow, stimulationTimes, splittedSignal) # filter signal and return sparsed version of chunks and assign classes
- splittedSignal_filtrd = s_c[0]
- classes_filtrd = s_c[1]
-
- splittedSignal_filtrd_means = np.array(np.mean(splittedSignal_filtrd, axis=1)) # for each window calculate mean value
- # additional attributes could be added besides splittedSignal_filtrd_means
- classes_filtrd = np.array(classes_filtrd)
-
- # print average k-fold CA
- print "Average " + str(self.k_folds) + "-folds value: " + str(self.avg_k_fold(splittedSignal_filtrd_means, classes_filtrd, k=self.k_folds))
- #self.permutation_significance_classification_score(splittedSignal_filtrd_means, classes_filtrd, k_folds=2) # k_folds=2, last long if k is bigger
-
- if self.saveFileName:
- clf = svm.SVC(kernel='linear', C=1).fit(splittedSignal_filtrd_means, classes_filtrd)
- clf.fit(splittedSignal_filtrd_means, classes_filtrd)
-
- print "Saving to pickle file: " + self.saveFileName
- pickle.dump(clf, open(self.saveFileName, 'wb'))
- print "Learned score: " + str(clf.score(splittedSignal_filtrd_means, classes_filtrd))
-
- # send finish stimulation output (for OpenViBE)
- self.finishBySendingStimulation(32774) # OVTK_StimulationId_TrialStop code
-
-
- def finishBySendingStimulation(self, stimulationCode):
- stimSetFinish = OVStimulationSet(self.getCurrentTime(), self.getCurrentTime()+1./self.getClock())
- stimSetFinish.append(OVStimulation(stimulationCode, self.getCurrentTime(), 0.))
- self.output[0].append(stimSetFinish)
-
-
- def process(self):
-
- return
-
- def uninitialize(self):
- # nop
-
- return
-
-box = MyOVBox()