summaryrefslogtreecommitdiff
path: root/abml/rules_prolog.py
blob: f67ba9349c8793ee968ac09344af13ded4e00688 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import itertools
import os.path
import pickle

import numpy as np
from Orange.classification.rules import _RuleClassifier, GuardianValidator
import orangecontrib.abml.abrules as rules
from Orange.classification.rules import Rule

class TrueCondValidator:
    """
    General validator that admits only rules built from "true" conditions.

    A rule is rejected as soon as one of its selectors states
    ``attribute != "T"`` or ``attribute == "F"``. Every other rule is passed
    on to a ``GuardianValidator`` created with the same rule-length and
    coverage limits, and that validator's verdict is returned.
    """
    def __init__(self, max_rule_length, min_covered_examples):
        self.max_rule_length = max_rule_length
        self.min_covered_examples = min_covered_examples
        self.guardian = GuardianValidator(self.max_rule_length, self.min_covered_examples)

    def validate_rule(self, rule):
        # Operator -> value name that turns the selector into a "negative"
        # condition. Selectors with any other operator are never looked up
        # in the domain.
        forbidden = {"!=": "T", "==": "F"}
        for att, op, val in rule.selectors:
            if op not in forbidden:
                continue
            value_name = rule.domain[att].values[int(val)]
            if value_name == forbidden[op]:
                return False
        return self.guardian.validate_rule(rule)

class PureAccuracyValidator:
    """
    Rule validator that is strict only for rules of the ``negative`` class.

    A rule whose target class equals ``negative`` is accepted when it covers
    examples of its target class only, or when its quality is not below
    ``threshold``. Rules for any other class are always accepted.
    """
    def __init__(self, negative, threshold):
        self.negative, self.threshold = negative, threshold

    def validate_rule(self, rule):
        # Rules for other classes are not restricted by this validator.
        if rule.target_class != self.negative:
            return True
        # A pure distribution (all covered examples in the target class)
        # is accepted regardless of quality.
        dist = rule.curr_class_dist
        if dist[rule.target_class] == dist.sum():
            return True
        return not (rule.quality < self.threshold)

class RelativePureValidator:
    """
    Rule validator based on relative frequency over not-yet-covered examples.

    For a rule whose target class equals ``target``, only the examples that
    the rule covers and that are NOT flagged in ``covered`` are considered.
    The rule is rejected when the share of those examples belonging to the
    rule's target class is below ``threshold``. Rules for other classes are
    always accepted.

    Parameters
    ----------
    target : class index this validator applies to
    threshold : minimal accepted relative frequency
    covered : boolean mask of examples to ignore (already covered)
    Y : array of class indices, aligned with ``covered``
    """
    def __init__(self, target, threshold, covered, Y):
        self.target = target
        self.threshold = threshold
        self.covered = covered
        self.Y = Y

    def validate_rule(self, rule):
        if rule.target_class != self.target:
            return True
        rel_covered = rule.covered_examples & ~self.covered
        n_rel = rel_covered.sum()
        if n_rel == 0:
            # Nothing new is covered, so the relative frequency is undefined.
            # Such rules are accepted (previously this happened implicitly
            # through a 0/0 -> nan division that emitted a RuntimeWarning).
            return True
        # Count the matching examples. Summing the matching label *values*
        # is only correct when the target class index happens to be 1.
        n_target = (self.Y[rel_covered] == rule.target_class).sum()
        return not (n_target / n_rel < self.threshold)

class NegativeFirstClassifier(_RuleClassifier):
    """
    Rule classifier that gives rules for the negative class ("F") priority.

    An example covered by any negative rule is classified as negative with
    probability 1. Otherwise the first rule of another class (in
    ``rule_list`` order) that covers the example determines the prediction:
    the rule's target class gets the rule's quality and the remaining
    probability mass is split evenly among the other classes. Examples
    covered by no rule get a uniform distribution.
    """
    def __init__(self, domain, rule_list):
        self.domain = domain
        self.rule_list = rule_list
        self.num_classes = len(self.domain.class_var.values)
        self.negative = self.domain.class_var.values.index("F")

    def coverage(self, data):
        """
        Return a boolean matrix (examples x rules) telling which rule covers
        which example of ``data``. Also stores ``data.X`` in ``self.X``.
        """
        # Only the stored X is needed here; running a full predict() just to
        # set it would evaluate every rule twice.
        self.X = data.X
        coverages = np.zeros((self.X.shape[0], len(self.rule_list)), dtype=bool)
        for ri, r in enumerate(self.rule_list):
            coverages[:, ri] = r.evaluate_data(self.X)
        return coverages

    def predict(self, X):
        """
        Return class probabilities (examples x classes) for the rows of ``X``.
        ``X`` is also stored in ``self.X``.
        """
        self.X = X
        n_examples = X.shape[0]
        probabilities = np.zeros((n_examples, self.num_classes), dtype=float)
        solved = np.zeros(n_examples, dtype=bool)

        # negative rules first
        for rule in self.rule_list:
            if rule.target_class == self.negative:
                solved |= rule.evaluate_data(X)
        probabilities[solved, self.negative] = 1.0

        # now the remaining classes
        for rule in self.rule_list:
            if rule.target_class == self.negative:
                continue
            covered = rule.evaluate_data(X)
            to_change = covered & ~solved
            # Fill the whole rows with the "other class" share first and then
            # overwrite the target column. Indexing with two boolean masks at
            # once pairs their indices element-wise (it is not an outer
            # product), which only works when there is exactly one other class.
            probabilities[to_change] = (1 - rule.quality) / (self.num_classes - 1)
            probabilities[to_change, rule.target_class] = rule.quality
            solved |= covered

        # examples covered by no rule: uniform distribution
        probabilities[~solved] = 1.0 / self.num_classes
        return probabilities

class Rules4Prolog:
    """
    Two-stage rule learner: rules for class "F" first, then rules for "T".

    The wrapped ``ABRuleLearner`` is configured with ``TrueCondValidator`` as
    general validator, ``PureAccuracyValidator`` as rule validator and
    ``NegativeFirstClassifier`` as classifier. Calling the instance on a data
    set returns a ``NegativeFirstClassifier`` built from both rule sets.

    Parameters
    ----------
    path : directory containing ``evds.pickle``
    threshold : quality / relative-frequency threshold used by the validators
    """
    def __init__(self, path, threshold):
        self.threshold = threshold
        self.learner = rules.ABRuleLearner(width=50, parent_alpha=0.05)
        finder = self.learner.rule_finder
        default_validator = finder.general_validator
        finder.general_validator = TrueCondValidator(default_validator.max_rule_length,
                                                     default_validator.min_covered_examples)
        # NOTE(review): 0 is presumably the index of "F" in the class
        # variable -- confirm against the domains this learner is used with.
        self.learner.rule_validator = PureAccuracyValidator(0, self.threshold)
        self.learner.classifier = NegativeFirstClassifier
        # NOTE: unpickling can execute arbitrary code; ``path`` must point to
        # a trusted location.
        with open(os.path.join(path, 'evds.pickle'), 'rb') as evds_file:
            self.learner.evds = pickle.load(evds_file)

    def __call__(self, data):
        # first learn rules for negative class (quality should be higher than
        # threshold or distribution should be pure)
        self.learner.target_class = "F"
        neg_rules = self.learner(data).rule_list

        # mark all examples that are covered by at least one negative rule
        coverage = np.zeros(len(data), dtype=bool)
        for r in neg_rules:
            coverage |= r.covered_examples

        # learn positive rules, however accept them only if relative frequency
        # of rules on the examples not covered by negative rules is higher
        # than threshold OR there are no negative examples
        Y = data.Y.astype(dtype=int)
        self.learner.target_class = "T"
        old_validator = self.learner.rule_validator
        # NOTE(review): 1 is presumably the index of "T" in the class
        # variable -- confirm.
        self.learner.rule_validator = RelativePureValidator(1, self.threshold,
                                                            coverage, Y)
        try:
            cls = self.learner(data)
        finally:
            # restore old validator even if learning fails, so the learner
            # stays usable for subsequent calls
            self.learner.rule_validator = old_validator
        pos_rules = cls.rule_list

        # NOTE: a disabled experiment that additionally generated validated
        # sub-rules (all non-empty proper subsets of each positive rule's
        # selectors) used to live here; only the learned rules are returned.
        return self.learner.classifier(domain=cls.domain, rule_list=neg_rules+pos_rules)




def create_learner(path, evds=True, threshold=0.8):
    """
    Build an ``ABRuleLearner`` set up for negative-first classification.

    The learner uses ``TrueCondValidator`` as general validator (reusing the
    learner's default rule-length and coverage limits),
    ``PureAccuracyValidator(0, threshold)`` as rule validator and
    ``NegativeFirstClassifier`` as classifier.

    Parameters
    ----------
    path : directory containing ``evds.pickle`` (only read when ``evds`` is true)
    evds : whether to load ``evds.pickle`` into ``rule_learner.evds``
    threshold : quality threshold for the rule validator (default 0.8)

    Returns
    -------
    The configured rule learner.
    """
    rule_learner = rules.ABRuleLearner(width=50, parent_alpha=0.05)
    finder = rule_learner.rule_finder
    default_validator = finder.general_validator
    finder.general_validator = TrueCondValidator(default_validator.max_rule_length,
                                                 default_validator.min_covered_examples)
    rule_learner.rule_validator = PureAccuracyValidator(0, threshold)
    rule_learner.classifier = NegativeFirstClassifier
    if evds:
        # NOTE: unpickling can execute arbitrary code; ``path`` must point to
        # a trusted location.
        with open(os.path.join(path, 'evds.pickle'), 'rb') as evds_file:
            rule_learner.evds = pickle.load(evds_file)
    return rule_learner