Semi_EM_NB.py
import numpy as np
from copy import deepcopy
from scipy.sparse import csr_matrix, vstack
from sklearn.naive_bayes import MultinomialNB
from sklearn.naive_bayes import GaussianNB
from scipy.linalg import get_blas_funcs
from sklearn.semi_supervised import LabelPropagation, LabelSpreading


class Semi_EM_MultinomialNB():
    """
    Naive Bayes classifier for multinomial models for semi-supervised learning.

    Uses both labeled and unlabeled data to train the NB classifier: the
    labeled data initialize the parameters, the unlabeled data refine them,
    and all data can be used to evaluate the classifier. Optimization is done
    with the Expectation-Maximization (EM) algorithm.
    """

    def __init__(self, alpha=1.0, fit_prior=True, class_prior=None,
                 max_iter=30, tol=1e-6, print_log_lkh=True):
        self.alpha = alpha
        self.fit_prior = fit_prior
        self.class_prior = class_prior
        self.clf = MultinomialNB(alpha=self.alpha, fit_prior=self.fit_prior,
                                 class_prior=self.class_prior)
        self.log_lkh = -np.inf  # expected log likelihood
        self.max_iter = max_iter  # max number of EM iterations
        self.tol = tol  # tolerance of log likelihood increment
        self.feature_log_prob_ = np.array([])  # empirical log probability of features given a class, P(x_i|y)
        self.coef_ = np.array([])  # mirrors feature_log_prob_ for interpreting MultinomialNB as a linear model
        self.print_log_lkh = print_log_lkh  # if True, print log likelihood during EM iterations

    def fit(self, X_l, y_l, X_u):
        """
        Initialize the parameters using labeled data only, treat the classes
        of the unlabeled data as missing values, and apply EM on the unlabeled
        data to refine the classifier.
        """
        n_ul_docs = X_u.shape[0]  # number of unlabeled samples
        n_l_docs = X_l.shape[0]  # number of labeled samples
        # Initialization (n_docs = n_ul_docs)
        clf = deepcopy(self.clf)  # build a new copy of the classifier
        clf.fit(X_l, y_l)  # use labeled data only to initialize classifier parameters
        prev_log_lkh = self.log_lkh  # record log likelihood of previous EM iteration
        lp_w_c = clf.feature_log_prob_  # log CP of word given class [n_classes, n_words]
        b_w_d = (X_u > 0)  # words present in each document [n_docs, n_words]
        lp_d_c = get_blas_funcs("gemm", [lp_w_c, b_w_d.T.toarray()])  # log CP of doc given class [n_classes, n_docs]
        lp_d_c = lp_d_c(alpha=1.0, a=lp_w_c, b=b_w_d.T.toarray())
        lp_c = np.matrix(clf.class_log_prior_).T  # log prob of classes [n_classes, 1]
        lp_c = np.repeat(lp_c, n_ul_docs, axis=1)  # repeat for each doc [n_classes, n_docs]
        lp_dc = lp_d_c + lp_c  # joint prob of doc and class [n_classes, n_docs]
        p_c_d = clf.predict_proba(X_u)  # weight of each class in each doc [n_docs, n_classes]
        expectation = get_blas_funcs("gemm", [p_c_d, lp_dc])  # expected log likelihood over all unlabeled docs
        expectation = expectation(alpha=1.0, a=p_c_d, b=lp_dc).trace()
        self.clf = deepcopy(clf)
        self.log_lkh = expectation
        if self.print_log_lkh:
            print("Initial expected log likelihood = %0.3f\n" % expectation)
        # Loop until the log likelihood no longer improves
        iter_count = 0  # count EM iterations
        while (self.log_lkh - prev_log_lkh >= self.tol and iter_count < self.max_iter):
            iter_count += 1
            if self.print_log_lkh:
                print("EM iteration #%d" % iter_count)
            # E-step: estimate class membership of unlabeled documents
            y_u = clf.predict(X_u)
            # M-step: re-estimate classifier parameters on all documents
            X = vstack([X_l, X_u])
            y = np.concatenate((y_l, y_u), axis=0)
            clf.fit(X, y)
            # Check convergence: update the expected log likelihood
            p_c_d = clf.predict_proba(X_u)
            lp_w_c = clf.feature_log_prob_  # log CP of word given class [n_classes, n_words]
            b_w_d = (X_u > 0)  # words present in each document
            lp_d_c = get_blas_funcs("gemm", [lp_w_c, b_w_d.transpose().toarray()])  # log CP of doc given class [n_classes, n_docs]
            lp_d_c = lp_d_c(alpha=1.0, a=lp_w_c, b=b_w_d.transpose().toarray())
            lp_c = np.matrix(clf.class_log_prior_).T  # log prob of classes [n_classes, 1]
            lp_c = np.repeat(lp_c, n_ul_docs, axis=1)  # repeat for each doc [n_classes, n_docs]
            lp_dc = lp_d_c + lp_c  # joint prob of doc and class [n_classes, n_docs]
            expectation = get_blas_funcs("gemm", [p_c_d, lp_dc])  # expected log likelihood over all unlabeled docs
            expectation = expectation(alpha=1.0, a=p_c_d, b=lp_dc).trace()
            if self.print_log_lkh:
                print("\tExpected log likelihood = %0.3f" % expectation)
            if (expectation - self.log_lkh >= self.tol):
                prev_log_lkh = self.log_lkh
                self.log_lkh = expectation
                self.clf = deepcopy(clf)
            else:
                break
        self.feature_log_prob_ = self.clf.feature_log_prob_
        self.coef_ = self.clf.coef_
        return self

    def fit_with_clustering(self, X_l, y_l, X_u, y_u=None):
        """
        Initialize the parameters using both labeled and unlabeled data.
        If y_u is not given, classes of the unlabeled data are assigned by
        label spreading, based on similarity with the labeled data. The
        unlabeled classes are then treated as missing values and EM is applied
        on the unlabeled data to refine the classifier.
        Note: label propagation only works on dense matrices, so this step is
        quite time consuming.
        """
        n_ul_docs = X_u.shape[0]  # number of unlabeled samples
        n_l_docs = X_l.shape[0]  # number of labeled samples
        # Initialization (n_docs = n_ul_docs):
        # assign classes to the unlabeled data by label spreading if y_u is not given
        X = vstack([X_l, X_u])
        if y_u is None:
            label_prop_model = LabelSpreading(kernel='rbf', max_iter=5, n_jobs=-1)
            y_u = np.array([-1.0] * n_ul_docs)  # -1 marks unlabeled samples
            y = np.concatenate((y_l, y_u), axis=0)
            label_prop_model.fit(X.toarray(), y)
            y_u = label_prop_model.predict(X_u.toarray())
        y = np.concatenate((y_l, y_u), axis=0)
        clf = deepcopy(self.clf)  # build a new copy of the classifier
        clf.fit(X, y)  # use labeled data plus the assigned labels to initialize classifier parameters
        prev_log_lkh = self.log_lkh  # record log likelihood of previous EM iteration
        lp_w_c = clf.feature_log_prob_  # log CP of word given class [n_classes, n_words]
        b_w_d = (X_u > 0)  # words present in each document [n_docs, n_words]
        lp_d_c = get_blas_funcs("gemm", [lp_w_c, b_w_d.T.toarray()])  # log CP of doc given class [n_classes, n_docs]
        lp_d_c = lp_d_c(alpha=1.0, a=lp_w_c, b=b_w_d.T.toarray())
        lp_c = np.matrix(clf.class_log_prior_).T  # log prob of classes [n_classes, 1]
        lp_c = np.repeat(lp_c, n_ul_docs, axis=1)  # repeat for each doc [n_classes, n_docs]
        lp_dc = lp_d_c + lp_c  # joint prob of doc and class [n_classes, n_docs]
        p_c_d = clf.predict_proba(X_u)  # weight of each class in each doc [n_docs, n_classes]
        expectation = get_blas_funcs("gemm", [p_c_d, lp_dc])  # expected log likelihood over all unlabeled docs
        expectation = expectation(alpha=1.0, a=p_c_d, b=lp_dc).trace()
        self.clf = deepcopy(clf)
        self.log_lkh = expectation
        if self.print_log_lkh:
            print("Initial expected log likelihood = %0.3f\n" % expectation)
        # Loop until the log likelihood no longer improves
        iter_count = 0  # count EM iterations
        while (self.log_lkh - prev_log_lkh >= self.tol and iter_count < self.max_iter):
            iter_count += 1
            if self.print_log_lkh:
                print("EM iteration #%d" % iter_count)
            # E-step: estimate class membership of unlabeled documents
            y_u = clf.predict(X_u)
            # M-step: re-estimate classifier parameters on all documents
            X = vstack([X_l, X_u])
            y = np.concatenate((y_l, y_u), axis=0)
            clf.fit(X, y)
            # Check convergence: update the expected log likelihood
            p_c_d = clf.predict_proba(X_u)
            lp_w_c = clf.feature_log_prob_  # log CP of word given class [n_classes, n_words]
            b_w_d = (X_u > 0)  # words present in each document
            lp_d_c = get_blas_funcs("gemm", [lp_w_c, b_w_d.transpose().toarray()])  # log CP of doc given class [n_classes, n_docs]
            lp_d_c = lp_d_c(alpha=1.0, a=lp_w_c, b=b_w_d.transpose().toarray())
            lp_c = np.matrix(clf.class_log_prior_).T  # log prob of classes [n_classes, 1]
            lp_c = np.repeat(lp_c, n_ul_docs, axis=1)  # repeat for each doc [n_classes, n_docs]
            lp_dc = lp_d_c + lp_c  # joint prob of doc and class [n_classes, n_docs]
            expectation = get_blas_funcs("gemm", [p_c_d, lp_dc])  # expected log likelihood over all unlabeled docs
            expectation = expectation(alpha=1.0, a=p_c_d, b=lp_dc).trace()
            if self.print_log_lkh:
                print("\tExpected log likelihood = %0.3f" % expectation)
            if (expectation - self.log_lkh >= self.tol):
                prev_log_lkh = self.log_lkh
                self.log_lkh = expectation
                self.clf = deepcopy(clf)
            else:
                break
        self.feature_log_prob_ = self.clf.feature_log_prob_
        self.coef_ = self.clf.coef_
        return self

    def partial_fit(self, X_l, y_l, X_u=np.array([])):
        """
        Update the parameters using labeled data, treat the classes of the
        unlabeled data as missing values, and apply EM on the unlabeled data
        to refine the classifier. This method can only be used after fit().
        """
        n_ul_docs = X_u.shape[0]  # number of unlabeled samples
        n_l_docs = X_l.shape[0]  # number of labeled samples
        # Initialization (n_docs = n_ul_docs)
        clf = deepcopy(self.clf)  # build a new copy of the classifier
        clf.partial_fit(X_l, y_l)  # use labeled data to update classifier parameters
        prev_log_lkh = self.log_lkh  # record log likelihood of previous EM iteration
        lp_w_c = clf.feature_log_prob_  # log CP of word given class [n_classes, n_words]
        b_w_d = (X_u > 0)  # words present in each document [n_docs, n_words]
        lp_d_c = get_blas_funcs("gemm", [lp_w_c, b_w_d.T.toarray()])  # log CP of doc given class [n_classes, n_docs]
        lp_d_c = lp_d_c(alpha=1.0, a=lp_w_c, b=b_w_d.T.toarray())
        lp_c = np.matrix(clf.class_log_prior_).T  # log prob of classes [n_classes, 1]
        lp_c = np.repeat(lp_c, n_ul_docs, axis=1)  # repeat for each doc [n_classes, n_docs]
        lp_dc = lp_d_c + lp_c  # joint prob of doc and class [n_classes, n_docs]
        p_c_d = clf.predict_proba(X_u)  # weight of each class in each doc [n_docs, n_classes]
        expectation = get_blas_funcs("gemm", [p_c_d, lp_dc])  # expected log likelihood over all unlabeled docs
        expectation = expectation(alpha=1.0, a=p_c_d, b=lp_dc).trace()
        self.clf = deepcopy(clf)
        self.log_lkh = expectation
        print("Initial expected log likelihood = %0.3f\n" % expectation)
        # Loop until the log likelihood no longer improves
        iter_count = 0  # count EM iterations
        while (self.log_lkh - prev_log_lkh >= self.tol and iter_count < self.max_iter):
            iter_count += 1
            print("EM iteration #%d" % iter_count)
            # E-step: estimate class membership of unlabeled documents
            y_u = clf.predict(X_u)
            # M-step: re-estimate classifier parameters on all documents
            X = vstack([X_l, X_u])
            y = np.concatenate((y_l, y_u), axis=0)
            clf.partial_fit(X, y)
            # Check convergence: update the expected log likelihood
            p_c_d = clf.predict_proba(X_u)
            lp_w_c = clf.feature_log_prob_  # log CP of word given class [n_classes, n_words]
            b_w_d = (X_u > 0)  # words present in each document
            lp_d_c = get_blas_funcs("gemm", [lp_w_c, b_w_d.transpose().toarray()])  # log CP of doc given class [n_classes, n_docs]
            lp_d_c = lp_d_c(alpha=1.0, a=lp_w_c, b=b_w_d.transpose().toarray())
            lp_c = np.matrix(clf.class_log_prior_).T  # log prob of classes [n_classes, 1]
            lp_c = np.repeat(lp_c, n_ul_docs, axis=1)  # repeat for each doc [n_classes, n_docs]
            lp_dc = lp_d_c + lp_c  # joint prob of doc and class [n_classes, n_docs]
            expectation = get_blas_funcs("gemm", [p_c_d, lp_dc])  # expected log likelihood over all unlabeled docs
            expectation = expectation(alpha=1.0, a=p_c_d, b=lp_dc).trace()
            print("\tExpected log likelihood = %0.3f" % expectation)
            if (expectation - self.log_lkh >= self.tol):
                prev_log_lkh = self.log_lkh
                self.log_lkh = expectation
                self.clf = deepcopy(clf)
            else:
                break
        self.feature_log_prob_ = self.clf.feature_log_prob_
        self.coef_ = self.clf.coef_
        return self

    def predict(self, X):
        return self.clf.predict(X)

    def score(self, X, y):
        return self.clf.score(X, y)

    def get_params(self, deep=True):
        return self.clf.get_params(deep)

    def __str__(self):
        return self.clf.__str__()
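# --- Usage sketch (added for illustration; not part of the original module) ---
# A minimal, hedged example of how the class above might be driven: vectorize
# labeled and unlabeled documents with one shared CountVectorizer so they live
# in the same feature space, run the semi-supervised EM fit, then score on
# held-out labeled data. All document strings and labels below are made-up
# placeholders, not data that ships with this repository.
if __name__ == "__main__":
    from sklearn.feature_extraction.text import CountVectorizer

    labeled_texts = ["good movie", "great acting", "bad film", "terrible plot"]
    labels = np.array([1, 1, 0, 0])
    unlabeled_texts = ["what a wonderful movie", "awful acting", "great film"]
    test_texts = ["great movie", "bad plot"]
    test_labels = np.array([1, 0])

    vec = CountVectorizer()
    X_l = vec.fit_transform(labeled_texts)   # labeled term counts (sparse)
    X_u = vec.transform(unlabeled_texts)     # unlabeled term counts (sparse)
    X_test = vec.transform(test_texts)

    em_nb = Semi_EM_MultinomialNB(alpha=1.0, max_iter=10, tol=1e-4)
    em_nb.fit(X_l, labels, X_u)              # EM over labeled + unlabeled docs
    print("Predictions:", em_nb.predict(X_test))
    print("Accuracy:", em_nb.score(X_test, test_labels))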