New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

vecstack

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vecstack - pypi Package Compare versions

Comparing version
0.4.0
to
0.5.0
+23
LICENSE.txt
MIT License
Vecstack. Python package for stacking (machine learning technique)
Copyright (c) 2016-2025 Igor Ivanov
Email: vecxoz@gmail.com
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
[build-system]
requires = ["setuptools >= 42.0.0"]
build-backend = "setuptools.build_meta"
[![PyPI version](https://img.shields.io/pypi/v/vecstack.svg?colorB=4cc61e)](https://pypi.python.org/pypi/vecstack)
[![PyPI license](https://img.shields.io/pypi/l/vecstack.svg)](https://github.com/vecxoz/vecstack/blob/master/LICENSE.txt)
[![Build status](https://github.com/vecxoz/vecstack/actions/workflows/actions.yaml/badge.svg?branch=master)](https://github.com/vecxoz/vecstack/actions)
[![Coverage Status](https://coveralls.io/repos/github/vecxoz/vecstack/badge.svg?branch=master)](https://coveralls.io/github/vecxoz/vecstack?branch=master)
[![PyPI pyversions](https://img.shields.io/pypi/pyversions/vecstack.svg)](https://pypi.python.org/pypi/vecstack/)
# vecstack
Python package for stacking (stacked generalization) featuring lightweight ***functional API*** and fully compatible ***scikit-learn API***
Convenient way to automate OOF computation, prediction and bagging using any number of models
* [Functional API](https://github.com/vecxoz/vecstack#usage-functional-api):
* Minimalistic. Get your stacked features in a single line
* RAM-friendly. The lowest possible memory consumption
* Kaggle-ready. Stacked features and hyperparameters from each run can be [automatically saved](https://github.com/vecxoz/vecstack/blob/master/vecstack/core.py#L210) in files. No more mess at the end of the competition. [Log example](https://github.com/vecxoz/vecstack/blob/master/examples/03_log_example.txt)
* [Scikit-learn API](https://github.com/vecxoz/vecstack#usage-scikit-learn-api):
* Standardized. Fully scikit-learn compatible transformer class exposing `fit` and `transform` methods
* Pipeline-certified. Implement and deploy [multilevel stacking](https://github.com/vecxoz/vecstack/blob/master/examples/04_sklearn_api_regression_pipeline.ipynb) like it's no big deal using `sklearn.pipeline.Pipeline`
* And of course `FeatureUnion` is also invited to the party
* Overall specs:
* Use any sklearn-like estimators
* Perform [classification and regression](https://github.com/vecxoz/vecstack/blob/master/vecstack/coresk.py#L85) tasks
* Predict [class labels or probabilities](https://github.com/vecxoz/vecstack/blob/master/vecstack/coresk.py#L121) in classification task
* Apply any [user-defined metric](https://github.com/vecxoz/vecstack/blob/master/vecstack/coresk.py#L126)
* Apply any [user-defined transformations](https://github.com/vecxoz/vecstack/blob/master/vecstack/coresk.py#L89) for target and prediction
* Python 3.9+, [unofficial support for Python 2.7 and 3.4](https://github.com/vecxoz/vecstack/blob/master/PY2.md)
* Win, Linux, Mac
* [MIT license](https://github.com/vecxoz/vecstack/blob/master/LICENSE.txt)
* Depends on **numpy**, **scipy**, **scikit-learn>=0.18**
# Get started
* [FAQ](https://github.com/vecxoz/vecstack#stacking-faq)
* [Installation guide](https://github.com/vecxoz/vecstack#installation)
* Usage:
* [Functional API](https://github.com/vecxoz/vecstack#usage-functional-api)
* [Scikit-learn API](https://github.com/vecxoz/vecstack#usage-scikit-learn-api)
* Tutorials:
* [Stacking concept + Pictures + Stacking implementation from scratch](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb)
* Examples (all examples are valid for both API with little [difference in parameters](https://github.com/vecxoz/vecstack#21-how-do-parameters-of-stacking-function-and-stackingtransformer-correspond)):
* Functional API:
* [Regression](https://github.com/vecxoz/vecstack/blob/master/examples/01_regression.ipynb)
* [Classification with class labels](https://github.com/vecxoz/vecstack/blob/master/examples/02_classification_with_class_labels.ipynb)
* [Classification with probabilities + Detailed workflow](https://github.com/vecxoz/vecstack/blob/master/examples/03_classification_with_proba_detailed_workflow.ipynb)
* Scikit-learn API:
* [Regression + Multilevel stacking using Pipeline](https://github.com/vecxoz/vecstack/blob/master/examples/04_sklearn_api_regression_pipeline.ipynb)
* Documentation:
* [Functional API](https://github.com/vecxoz/vecstack/blob/master/vecstack/core.py#L133) or type ```>>> help(stacking)```
* [Scikit-learn API](https://github.com/vecxoz/vecstack/blob/master/vecstack/coresk.py#L66) or type ```>>> help(StackingTransformer)```
# Installation
***Note:*** Python 3.9+ is officially supported and tested. If you’re still using Python 2.7 or 3.4 see [installation details here](https://github.com/vecxoz/vecstack/blob/master/PY2.md)
* ***Classic 1st time installation (recommended):***
* `pip install vecstack`
* Install for current user only (if you have some troubles with write permission):
* `pip install --user vecstack`
* If your PATH doesn't work:
* `/usr/bin/python -m pip install vecstack`
* `C:/Python3/python -m pip install vecstack`
* Upgrade vecstack and all dependencies:
* `pip install --upgrade vecstack`
* Upgrade vecstack WITHOUT upgrading dependencies:
* `pip install --upgrade --no-deps vecstack`
* Upgrade directly from GitHub WITHOUT upgrading dependencies:
* `pip install --upgrade --no-deps https://github.com/vecxoz/vecstack/archive/master.zip`
* Uninstall
* `pip uninstall vecstack`
# Usage. Functional API
```python
from vecstack import stacking
# Get your data
# Initialize 1st level estimators
models = [LinearRegression(),
Ridge(random_state=0)]
# Get your stacked features in a single line
S_train, S_test = stacking(models, X_train, y_train, X_test, regression=True, verbose=2)
# Use 2nd level estimator with stacked features
```
# Usage. Scikit-learn API
```python
from vecstack import StackingTransformer
# Get your data
# Initialize 1st level estimators
estimators = [('lr', LinearRegression()),
('ridge', Ridge(random_state=0))]
# Initialize StackingTransformer
stack = StackingTransformer(estimators, regression=True, verbose=2)
# Fit
stack = stack.fit(X_train, y_train)
# Get your stacked features
S_train = stack.transform(X_train)
S_test = stack.transform(X_test)
# Use 2nd level estimator with stacked features
```
# Stacking FAQ
1. [How can I report an issue? How can I ask a question about stacking or vecstack package?](https://github.com/vecxoz/vecstack#1-how-can-i-report-an-issue-how-can-i-ask-a-question-about-stacking-or-vecstack-package)
2. [How can I say thanks?](https://github.com/vecxoz/vecstack#2-how-can-i-say-thanks)
3. [How to cite vecstack?](https://github.com/vecxoz/vecstack#3-how-to-cite-vecstack)
4. [What is stacking?](https://github.com/vecxoz/vecstack#4-what-is-stacking)
5. [What about stacking name?](https://github.com/vecxoz/vecstack#5-what-about-stacking-name)
6. [Do I need stacking at all?](https://github.com/vecxoz/vecstack#6-do-i-need-stacking-at-all)
7. [Can you explain stacking (stacked generalization) in 10 lines of code?](https://github.com/vecxoz/vecstack#7-can-you-explain-stacking-stacked-generalization-in-10-lines-of-code)
8. [Why do I need complicated inner procedure for stacking?](https://github.com/vecxoz/vecstack#8-why-do-i-need-complicated-inner-procedure-for-stacking)
9. [I want to implement stacking (stacked generalization) from scratch. Can you help me?](https://github.com/vecxoz/vecstack#9-i-want-to-implement-stacking-stacked-generalization-from-scratch-can-you-help-me)
10. [What is OOF?](https://github.com/vecxoz/vecstack#10-what-is-oof)
11. [What are *estimator*, *learner*, *model*?](https://github.com/vecxoz/vecstack#11-what-are-estimator-learner-model)
12. [What is *blending*? How is it related to stacking?](https://github.com/vecxoz/vecstack#12-what-is-blending-how-is-it-related-to-stacking)
13. [How to optimize weights for weighted average?](https://github.com/vecxoz/vecstack#13-how-to-optimize-weights-for-weighted-average)
14. [What is better: weighted average for current level or additional level?](https://github.com/vecxoz/vecstack#14-what-is-better-weighted-average-for-current-level-or-additional-level)
15. [What is *bagging*? How is it related to stacking?](https://github.com/vecxoz/vecstack#15-what-is-bagging-how-is-it-related-to-stacking)
16. [How many models should I use on a given stacking level?](https://github.com/vecxoz/vecstack#16-how-many-models-should-i-use-on-a-given-stacking-level)
17. [How many stacking levels should I use?](https://github.com/vecxoz/vecstack#17-how-many-stacking-levels-should-i-use)
18. [How do I choose models for stacking?](https://github.com/vecxoz/vecstack#18-how-do-i-choose-models-for-stacking)
19. [I am trying hard but still can't beat my best single model with stacking. What is wrong?](https://github.com/vecxoz/vecstack#19-i-am-trying-hard-but-still-cant-beat-my-best-single-model-with-stacking-what-is-wrong)
20. [What should I choose: functional API (`stacking` function) or Scikit-learn API (`StackingTransformer`)?](https://github.com/vecxoz/vecstack#20-what-should-i-choose-functional-api-stacking-function-or-scikit-learn-api-stackingtransformer)
21. [How do parameters of `stacking` function and `StackingTransformer` correspond?](https://github.com/vecxoz/vecstack#21-how-do-parameters-of-stacking-function-and-stackingtransformer-correspond)
22. [Why Scikit-learn API was implemented as transformer and not predictor?](https://github.com/vecxoz/vecstack#22-why-scikit-learn-api-was-implemented-as-transformer-and-not-predictor)
23. [How to estimate stacking training time and number of models which will be built?](https://github.com/vecxoz/vecstack#23-how-to-estimate-stacking-training-time-and-number-of-models-which-will-be-built)
24. [Which stacking variant should I use: 'A' ('oof_pred_bag') or 'B' ('oof_pred')?](https://github.com/vecxoz/vecstack#24-which-stacking-variant-should-i-use-a-oof_pred_bag-or-b-oof_pred)
25. [How to choose number of folds?](https://github.com/vecxoz/vecstack#25-how-to-choose-number-of-folds)
26. [When I transform train set I see 'Train set was detected'. What does it mean?](https://github.com/vecxoz/vecstack#26-when-i-transform-train-set-i-see-train-set-was-detected-what-does-it-mean)
27. [How is the very first stacking level called: L0 or L1? Where does counting start?](https://github.com/vecxoz/vecstack#27-how-is-the-very-first-stacking-level-called-l0-or-l1-where-does-counting-start)
28. [Can I use `(Randomized)GridSearchCV` to tune the whole stacking Pipeline?](https://github.com/vecxoz/vecstack#28-can-i-use-randomizedgridsearchcv-to-tune-the-whole-stacking-pipeline)
29. [How to define custom metric, especially AUC?](https://github.com/vecxoz/vecstack#29-how-to-define-custom-metric-especially-auc)
30. [Do folds (splits) have to be the same across estimators and stacking levels? How does `random_state` work?](https://github.com/vecxoz/vecstack#30-do-folds-splits-have-to-be-the-same-across-estimators-and-stacking-levels-how-does-random_state-work)
31. [How does `vecstack.StackingTransformer` differ from `sklearn.ensemble.StackingClassifier`?](https://github.com/vecxoz/vecstack#31-how-does-vecstackstackingtransformer-differ-from-sklearnensemblestackingclassifier)
### 1. How can I report an issue? How can I ask a question about stacking or vecstack package?
Just open an issue [here](https://github.com/vecxoz/vecstack/issues).
Ask me anything on the topic.
I'm a bit busy, so typically I answer on the next day.
### 2. How can I say thanks?
Just give me a star in the top right corner of the repository page.
### 3. How to cite vecstack?
```
@misc{vecstack2016,
author = {Igor Ivanov},
title = {Vecstack},
year = {2016},
publisher = {GitHub},
howpublished = {\url{https://github.com/vecxoz/vecstack}},
}
```
### 4. What is stacking?
Stacking (stacked generalization) is a machine learning ensembling technique.
Main idea is to use predictions as features.
More specifically we predict train set (in CV-like fashion) and test set using some 1st level model(s), and then use these predictions as features for 2nd level model. You can find more details (concept, pictures, code) in [stacking tutorial](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb).
Also make sure to check out:
* [Ensemble Learning](https://en.wikipedia.org/wiki/Ensemble_learning) ([Stacking](https://en.wikipedia.org/wiki/Ensemble_learning#Stacking)) in Wikipedia
* Classical [Kaggle Ensembling Guide](https://mlwave.com/kaggle-ensembling-guide/) or try [another link](https://web.archive.org/web/20210727094233/https://mlwave.com/kaggle-ensembling-guide/)
* [Stacked Generalization](https://www.researchgate.net/publication/222467943_Stacked_Generalization) paper by David H. Wolpert
### 5. What about stacking name?
Often it is also called *stacked generalization*. The term is derived from the verb *to stack* (to put together, to put on top of each other). It implies that we put some models on top of other models, i.e. train some models on predictions of other models. From another point of view we can say that we stack predictions in order to use them as features.
### 6. Do I need stacking at all?
It depends on specific business case. The main thing to know about stacking is that it requires ***significant computing resources***. [No Free Lunch Theorem](https://en.wikipedia.org/wiki/There_ain%27t_no_such_thing_as_a_free_lunch) applies as always. Stacking can give you an improvement but for certain price (deployment, computation, maintenance). Only experiment for given business case will give you an answer: is it worth an effort and money.
At current point large part of stacking users are participants of machine learning competitions. On Kaggle you can't go too far without ensembling. I can secretly tell you that at least top half of leaderboard in pretty much any competition uses ensembling (stacking) in some way. Stacking is less popular in production due to time and resource constraints, but I think it gains popularity.
### 7. Can you explain stacking (stacked generalization) in 10 lines of code?
[Of course](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb)
### 8. Why do I need complicated inner procedure for stacking?
I can just do the following. Why not?
```python
model_L1 = XGBRegressor()
model_L1 = model_L1.fit(X_train, y_train)
S_train = model_L1.predict(X_train).reshape(-1, 1) # <- DOES NOT work due to overfitting. Must be CV
S_test = model_L1.predict(X_test).reshape(-1, 1)
model_L2 = LinearRegression()
model_L2 = model_L2.fit(S_train, y_train)
final_prediction = model_L2.predict(S_test)
```
Code above will give meaningless result. If we fit on `X_train` we can’t just predict `X_train`, because our 1st level model has already seen `X_train`, and its prediction will be overfitted. To avoid overfitting we perform cross-validation procedure and in each fold we predict out-of-fold (OOF) part of `X_train`. You can find more details (concept, pictures, code) in [stacking tutorial](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb).
### 9. I want to implement stacking (stacked generalization) from scratch. Can you help me?
[Not a problem](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb)
### 10. What is OOF?
OOF is abbreviation for out-of-fold prediction. It's also known as *OOF features*, *stacked features*, *stacking features*, etc. Basically it means predictions for the part of train data that model haven't seen during training.
### 11. What are *estimator*, *learner*, *model*?
Basically it is the same thing meaning *machine learning algorithm*. Often these terms are used interchangeably.
Speaking about inner stacking mechanics, you should remember that when you have *single 1st level model* there will be at least `n_folds` separate models *trained in each CV fold* on different subsets of data. See [Q23](https://github.com/vecxoz/vecstack#23-how-to-estimate-stacking-training-time-and-number-of-models-which-will-be-built) for more details.
### 12. What is *blending*? How is it related to stacking?
Basically it is the same thing. Both approaches use predictions as features.
Often these terms are used interchangeably.
The difference is how we generate features (predictions) for the next level:
* *stacking*: perform cross-validation procedure and predict each part of train set (OOF)
* *blending*: predict fixed holdout set
*vecstack* package supports only *stacking* i.e. cross-validation approach. For given `random_state` value (e.g. 42) folds (splits) will be the same across all estimators. See also [Q30](https://github.com/vecxoz/vecstack#30-do-folds-splits-have-to-be-the-same-across-estimators-and-stacking-levels-how-does-random_state-work).
### 13. How to optimize weights for weighted average?
You can use for example:
* `scipy.optimize.minimize`
* `scipy.optimize.differential_evolution`
### 14. What is better: weighted average for current level or additional level?
By default you can start from weighted average. It is easier to apply and more chances that it will give good result. Then you can try additional level which potentially can outperform weighted average (but not always and not in an easy way). Experiment is your friend.
### 15. What is *bagging*? How is it related to stacking?
[Bagging](https://en.wikipedia.org/wiki/Bootstrap_aggregating) or Bootstrap aggregating works as follows: generate subsets of training set, train models on these subsets and then find average of predictions.
Also term *bagging* is often used to describe following approaches:
* train several different models on the same data and average predictions
* train same model with different random seeds on the same data and average predictions
So if we run stacking and just average predictions - it is *bagging*.
### 16. How many models should I use on a given stacking level?
***Note 1:*** The best architecture can be found only by experiment.
***Note 2:*** Always remember that higher number of levels or models does NOT guarantee better result. The key to success in stacking (and ensembling in general) is diversity - low correlation between models.
It depends on many factors like type of problem, type of data, quality of models, correlation of models, expected result, etc.
Some example configurations are listed below.
* Reasonable starting point:
* `L1: 2-10 models -> L2: weighted (rank) average or single model`
* Then try to add more 1st level models and additional level:
* `L1: 10-50 models -> L2: 2-10 models -> L3: weighted (rank) average`
* If you're crunching numbers at Kaggle and decided to go wild:
* `L1: 100-inf models -> L2: 10-50 models -> L3: 2-10 models -> L4: weighted (rank) average`
You can also find some winning stacking architectures on [Kaggle blog](http://blog.kaggle.com/), e.g.: [1st place in Homesite Quote Conversion](http://blog.kaggle.com/2016/04/08/homesite-quote-conversion-winners-write-up-1st-place-kazanova-faron-clobber/).
### 17. How many stacking levels should I use?
***Note 1:*** The best architecture can be found only by experiment.
***Note 2:*** Always remember that higher number of levels or models does NOT guarantee better result. The key to success in stacking (and ensembling in general) is diversity - low correlation between models.
For some example configurations see [Q16](https://github.com/vecxoz/vecstack#16-how-many-models-should-i-use-on-a-given-stacking-level).
### 18. How do I choose models for stacking?
Based on experiments and correlation (e.g. Pearson). Less correlated models give better result. It means that we should never judge our models by accuracy only. We should also consider correlation (how given model is different from others). Sometimes inaccurate but very different model can add substantial value to resulting ensemble.
### 19. I am trying hard but still can't beat my best single model with stacking. What is wrong?
Nothing is wrong. Stacking is advanced complicated technique. It's hard to make it work. ***Solution:*** make sure to try weighted (rank) average first instead of additional level with some advanced models. Average is much easier to apply and in most cases it will surely outperform your best model. If still no luck - then probably your models are highly correlated.
### 20. What should I choose: functional API (`stacking` function) or Scikit-learn API (`StackingTransformer`)?
Quick guide:
* By default start from `StackingTransformer` with familiar scikit-learn interface and logic
* If you need low RAM consumption try `stacking` function but remember that it does not store models and does not have scikit-learn capabilities
Stacking API comparison:
| **Property** | **stacking function** | **StackingTransformer** |
|----------------|:---------------------:|:-----------------------:|
| Execution time | Same | Same |
| RAM | Consumes the ***smallest possible amount of RAM***. Does not store models. At any point in time only one model is alive. Logic: train model -> predict -> delete -> etc. When execution ends all RAM is released.| Consumes ***much more RAM***. It stores all models built in each fold. This price is paid for standard scikit-learn capabilities like `Pipeline` and `FeatureUnion`. |
| Access to models after training | No | Yes |
| Compatibility with `Pipeline` and `FeatureUnion` | No | Yes |
| Estimator implementation restrictions | Must have only `fit` and `predict` (`predict_proba`) methods | Must be fully scikit-learn compatible |
| `NaN` and `inf` in input data | Allowed | Not allowed |
| Can automatically save OOF and log in files | Yes | No |
| Input dimensionality (`X_train`, `X_test`) | Arbitrary | 2-D |
### 21. How do parameters of `stacking` function and `StackingTransformer` correspond?
| **stacking function** | **StackingTransformer** |
|---------------------------------------|-----------------------------------|
| `models=[Ridge()]` | `estimators=[('ridge', Ridge())]` |
| `mode='oof_pred_bag'` (alias `'A'`) | `variant='A'` |
| `mode='oof_pred'` (alias `'B'`) | `variant='B'` |
### 22. Why Scikit-learn API was implemented as transformer and not predictor?
* By nature stacking procedure is predictor, but ***by application*** it is definitely transformer.
* Transformer architecture was chosen because first of all user needs direct access to OOF. I.e. the ability to compute correlations, weighted average, etc.
* If you need predictor based on `StackingTransformer` you can easily create it via `Pipeline` by adding on the top of `StackingTransformer` some regressor or classifier.
* Transformer makes it easy to create any number of stacking levels. Using Pipeline we can easily create multilevel stacking by just adding several `StackingTransformer`'s on top of each other and then some final regressor or classifier.
### 23. How to estimate stacking training time and number of models which will be built?
***Note:*** Stacking usually takes long time. It's expected (inevitable) behavior.
We can compute total number of models which will be built during stacking procedure using following formulas:
* Variant A: `n_models_total = n_estimators * n_folds`
* Variant B: `n_models_total = n_estimators * n_folds + n_estimators`
Let's look at example. Say we define our stacking procedure as follows:
```python
estimators_L1 = [('lr', LinearRegression()),
('ridge', Ridge())]
stack = StackingTransformer(estimators_L1, n_folds=4)
```
So we have two 1st level estimators and 4 folds. It means stacking procedure will build the following number of models:
* Variant A: 8 models total. Each model is trained on 3/4 of `X_train`.
* Variant B: 10 models total. 8 models are trained on 3/4 of `X_train` and 2 models on full `X_train`.
Compute time:
* If estimators have relatively *similar training time*, we can roughly compute total training time as: `time_total = n_models_total * time_of_one_model`
* If estimators have *different training time*, we should compute number of models and time for each estimator separately (set `n_estimators=1` in formulas above) and then sum up times.
### 24. Which stacking variant should I use: 'A' ('oof_pred_bag') or 'B' ('oof_pred')?
You can find out only by experiment. Default choice is variant ***A***, because it takes ***less time*** and there should be no significant difference in result. But of course you may also try variant B. For more details see [stacking tutorial](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb).
### 25. How to choose number of folds?
***Note:*** Remember that higher number of folds substantially increase training time (and RAM consumption for StackingTransformer). See [Q23](https://github.com/vecxoz/vecstack#23-how-to-estimate-stacking-training-time-and-number-of-models-which-will-be-built).
* Standard approach: 4 or 5 folds.
* If data is big: 3 folds.
* If data is small: you can try more folds like 10 or so.
### 26. When I transform train set I see 'Train set was detected'. What does it mean?
***Note 1:*** It is ***NOT allowed to change train set*** between calls to `fit` and `transform` methods. Due to stacking nature transformation is different for train set and any other set. If train set is changed after training, stacking procedure will not be able to correctly identify it and transformation will be wrong.
***Note 2:*** To be correctly detected train set does not necessarily have to be identical (exactly the same). It must have the same shape and all values must be *close* (`np.isclose` is used for checking). So if you somehow regenerate your train set you should not worry about numerical precision.
If you transform `X_train` and see 'Train set was detected' everything is OK. If you transform `X_train` but you don't see this message then something went wrong. Probably your train set was changed (it is not allowed). In this case you have to retrain `StackingTransformer`. For more details see [stacking tutorial](https://github.com/vecxoz/vecstack/blob/master/examples/00_stacking_concept_pictures_code.ipynb) or [Q8](https://github.com/vecxoz/vecstack#8-why-do-i-need-complicated-inner-procedure-for-stacking).
### 27. How is the very first stacking level called: L0 or L1? Where does counting start?
***Common convention:*** The very first bunch of models which are trained on initial raw data are called ***L1***. On top of L1 we have so called *stacker level* or *meta level* or L2 i.e. models which are trained on predictions of L1 models. Count continues in the same fashion up to arbitrary number of levels.
I use this convention in my code and docs. But of course your Kaggle teammates may use some other naming approach, so you should clarify this for your specific case.
### 28. Can I use `(Randomized)GridSearchCV` to tune the whole stacking Pipeline?
Yes, technically you can, but it ***is not recommended*** because this approach will lead to redundant computations. General practical advice is to ***tune each estimator separately*** and then use tuned estimators on the 1st level of stacking. Higher level estimators should be tuned in the same fashion using OOF from previous level. For manual tuning you can use `stacking` function or `StackingTransformer` with a single 1st level estimator.
### 29. How to define custom metric, especially AUC?
```python
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import OneHotEncoder
def auc(y_true, y_pred):
"""ROC AUC metric for both binary and multiclass classification.
Parameters
----------
y_true : 1d numpy array
True class labels
y_pred : 2d numpy array
Predicted probabilities for each class
"""
ohe = OneHotEncoder(sparse=False)
y_true = ohe.fit_transform(y_true.reshape(-1, 1))
auc_score = roc_auc_score(y_true, y_pred)
return auc_score
```
### 30. Do folds (splits) have to be the same across estimators and stacking levels? How does `random_state` work?
To ensure better result, folds (splits) have to be the same across all estimators and all stacking levels. It means that `random_state` has to be the same in every call to `stacking` function or `StackingTransformer`. This is default behavior of `stacking` function and `StackingTransformer` (by default `random_state=0`). If you want to try different folds (splits) try to set different `random_state` values.
### 31. How does `vecstack.StackingTransformer` differ from `sklearn.ensemble.StackingClassifier`?
It significantly differs. Please see a [detailed explanation](https://github.com/vecxoz/vecstack/issues/37).
# Stacking concept
1. We want to predict train set and test set with some 1st level model(s), and then use these predictions as features for 2nd level model(s).
2. Any model can be used as 1st level model or 2nd level model.
3. To avoid overfitting (for train set) we use cross-validation technique and in each fold we predict out-of-fold (OOF) part of train set.
4. The common practice is to use from 3 to 10 folds.
5. Predict test set:
* **Variant A:** In each fold we predict test set, so after completion of all folds we need to find mean (mode) of all temporary test set predictions made in each fold.
* **Variant B:** We do not predict test set during cross-validation cycle. After completion of all folds we perform additional step: fit model on full train set and predict test set once. This approach takes more time because we need to perform one additional fitting.
6. As an example we look at stacking implemented with single 1st level model and 3-fold cross-validation.
7. Pictures:
* **Variant A:** Three pictures describe three folds of cross-validation. After completion of all three folds we get single train feature and single test feature to use with 2nd level model.
* **Variant B:** First three pictures describe three folds of cross-validation (like in Variant A) to get single train feature and fourth picture describes additional step to get single test feature.
8. We can repeat this cycle using other 1st level models to get more features for 2nd level model.
9. You can also look at animation of [Variant A](https://github.com/vecxoz/vecstack#variant-a-animation) and [Variant B](https://github.com/vecxoz/vecstack#variant-b-animation).
# Variant A
![Fold 1 of 3](https://github.com/vecxoz/vecstack/raw/master/pic/dia1.png "Fold 1 of 3")
***
![Fold 2 of 3](https://github.com/vecxoz/vecstack/raw/master/pic/dia2.png "Fold 2 of 3")
***
![Fold 3 of 3](https://github.com/vecxoz/vecstack/raw/master/pic/dia3.png "Fold 3 of 3")
# Variant A. Animation
![Variant A. Animation](https://github.com/vecxoz/vecstack/raw/master/pic/animation1.gif "Variant A. Animation")
# Variant B
![Step 1 of 4](https://github.com/vecxoz/vecstack/raw/master/pic/dia4.png "Step 1 of 4")
***
![Step 2 of 4](https://github.com/vecxoz/vecstack/raw/master/pic/dia5.png "Step 2 of 4")
***
![Step 3 of 4](https://github.com/vecxoz/vecstack/raw/master/pic/dia6.png "Step 3 of 4")
***
![Step 4 of 4](https://github.com/vecxoz/vecstack/raw/master/pic/dia7.png "Step 4 of 4")
# Variant B. Animation
![Variant B. Animation](https://github.com/vecxoz/vecstack/raw/master/pic/animation2.gif "Variant B. Animation")
# References
* [Ensemble Learning](https://en.wikipedia.org/wiki/Ensemble_learning) ([Stacking](https://en.wikipedia.org/wiki/Ensemble_learning#Stacking)) in Wikipedia
* Classical [Kaggle Ensembling Guide](https://mlwave.com/kaggle-ensembling-guide/) or try [another link](https://web.archive.org/web/20210727094233/https://mlwave.com/kaggle-ensembling-guide/)
* [Stacked Generalization](https://www.researchgate.net/publication/222467943_Stacked_Generalization) paper by David H. Wolpert
#-------------------------------------------------------------------------------
# Exactly the same as multiclass but ``n_classes=2``
# and class name is ``TestFuncClassificationBinary``
#-------------------------------------------------------------------------------
# !!! cross_val_predict uses stratified split
#-------------------------------------------------------------------------------
# Main concept for testing returned arrays:
# 1). create ground truth e.g. with cross_val_predict
# 2). run vecstack
# 3). compare returned arrays with ground truth
# 4). compare arrays from file with ground truth
#-------------------------------------------------------------------------------
from __future__ import print_function
from __future__ import division
import unittest
from numpy.testing import assert_array_equal
# from numpy.testing import assert_allclose
from numpy.testing import assert_equal
import os
import glob
import numpy as np
import scipy.stats as st
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
# from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.metrics import log_loss
from sklearn.metrics import make_scorer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from vecstack import stacking
from sklearn.multiclass import OneVsRestClassifier
n_classes = 2
n_folds = 5
temp_dir = 'tmpdw35lg54ms80eb42'
X, y = make_classification(n_samples = 500, n_features = 5, n_informative = 3, n_redundant = 1,
n_classes = n_classes, flip_y = 0, random_state = 0)
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)
# Make train/test split by hand to avoid strange errors probably related to testing suit:
# https://github.com/scikit-learn/scikit-learn/issues/1684
# https://github.com/scikit-learn/scikit-learn/issues/1704
# Note: Python 2.7, 3.4 - OK, but 3.5, 3.6 - error
np.random.seed(0)
ind = np.arange(500)
np.random.shuffle(ind)
ind_train = ind[:400]
ind_test = ind[400:]
X_train = X[ind_train]
X_test = X[ind_test]
y_train = y[ind_train]
y_test = y[ind_test]
# Create 4-dim data
np.random.seed(42)
X_train_4d = np.random.normal(size=(400, 8, 8, 3))
X_test_4d = np.random.normal(size=(100, 8, 8, 3))
y_train_4d = np.random.randint(n_classes, size=400)
# Reshape 4-dim to 2-dim
X_train_4d_unrolled = X_train_4d.reshape(X_train_4d.shape[0], -1)
X_test_4d_unrolled = X_test_4d.reshape(X_test_4d.shape[0], -1)
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
class LogisticRegressionUnrolled(LogisticRegression):
"""
For tests related to N-dim input.
Estimator accepts N-dim array and reshape it to 2-dim array
"""
def fit(self, X, y):
return super(LogisticRegressionUnrolled, self).fit(X.reshape(X.shape[0], -1), y)
def predict(self, X):
return super(LogisticRegressionUnrolled, self).predict(X.reshape(X.shape[0], -1))
def predict_proba(self, X):
return super(LogisticRegressionUnrolled, self).predict_proba(X.reshape(X.shape[0], -1))
class OneVsRestClassifierUnrolled(OneVsRestClassifier):
"""
Just to avoid data shape checks
"""
def fit(self, X, y):
return super(OneVsRestClassifierUnrolled, self).fit(X.reshape(X.shape[0], -1), y)
def predict(self, X):
return super(OneVsRestClassifierUnrolled, self).predict(X.reshape(X.shape[0], -1))
def predict_proba(self, X):
return super(OneVsRestClassifierUnrolled, self).predict_proba(X.reshape(X.shape[0], -1))
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class TestFuncClassificationBinary(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
os.mkdir(temp_dir)
except:
print('Unable to create temp dir')
@classmethod
def tearDownClass(cls):
try:
os.rmdir(temp_dir)
except:
print('Unable to remove temp dir')
def tearDown(self):
# Remove files after each test
files = glob.glob(os.path.join(temp_dir, '*.npy'))
files.extend(glob.glob(os.path.join(temp_dir, '*.log.txt')))
try:
for file in files:
os.remove(file)
except:
print('Unable to remove temp file')
#---------------------------------------------------------------------------
# Test returned and saved arrays in each mode (parameter <mode>)
# Here we also test parameter <stratified>
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
# Predict labels
#---------------------------------------------------------------------------
def test_oof_pred_mode(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_mode(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_test_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_mode(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = None
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_bag_mode(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
S_train_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Predict proba
#---------------------------------------------------------------------------
def test_oof_pred_mode_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
_ = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'oof_pred', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_mode_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
S_test_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'oof', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_mode_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = None
_ = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'pred', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_proba(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True, needs_proba = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
#@@@@
# Look at proba
# print('\nOne model')
# print('etalon')
# print(S_test_1[:2])
# print('vecstack')
# print(S_test_2[:2])
#@@@@
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_bag_mode_proba(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
S_train_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred_bag', random_state = 0, verbose = 0, stratified = True, needs_proba = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test <shuffle> and <random_state> parameters
#---------------------------------------------------------------------------
def test_oof_pred_bag_mode_shuffle(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = True, random_state = 0)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
# !!! Important. Here we pass CV-generator not number of folds <cv = kf>
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = kf,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = True, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test <metric> parameter and its default values depending on <regression> parameter
# Labels
# Important. We use <greater_is_better = True> in <make_scorer> for any error function
# because we need raw scores (without minus sign)
#---------------------------------------------------------------------------
def test_oof_mode_metric(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(accuracy_score)
scores = cross_val_score(model, X_train, y = y_train, cv = n_folds,
scoring = scorer, n_jobs = 1, verbose = 0)
mean_str_1 = '%.8f' % np.mean(scores)
std_str_1 = '%.8f' % np.std(scores)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0, stratified = True)
# Load mean score and std from file
# Normally if cleaning is performed there is only one .log.txt file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.log.txt')))[-1] # take the latest file
with open(file_name) as f:
for line in f:
if 'MEAN' in line:
split = line.strip().split()
break
mean_str_2 = split[1][1:-1]
std_str_2 = split[3][1:-1]
assert_equal(mean_str_1, mean_str_2)
assert_equal(std_str_1, std_str_2)
#---------------------------------------------------------------------------
# Test <metric> parameter and its default values depending on <regression> parameter
# Proba
# Important. We use <greater_is_better = True> in <make_scorer> for any error function
# because we need raw scores (without minus sign)
#---------------------------------------------------------------------------
def test_oof_mode_metric_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(log_loss, response_method='predict_proba')
scores = cross_val_score(model, X_train, y = y_train, cv = n_folds,
scoring = scorer, n_jobs = 1, verbose = 0)
mean_str_1 = '%.8f' % np.mean(scores)
std_str_1 = '%.8f' % np.std(scores)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0, stratified = True,
needs_proba = True)
# Load mean score and std from file
# Normally if cleaning is performed there is only one .log.txt file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.log.txt')))[-1] # take the latest file
with open(file_name) as f:
for line in f:
if 'MEAN' in line:
split = line.strip().split()
break
mean_str_2 = split[1][1:-1]
std_str_2 = split[3][1:-1]
assert_equal(mean_str_1, mean_str_2)
assert_equal(std_str_1, std_str_2)
#-------------------------------------------------------------------------------
# Test several mdels in one run
#-------------------------------------------------------------------------------
def test_oof_pred_mode_2_models(self):
# Model a
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1_a = model.predict(X_test).reshape(-1, 1)
# Model b
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1_b = model.predict(X_test).reshape(-1, 1)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_2_models(self):
# Model a
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1_a = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
# Model b
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = GaussianNB()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1_b = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_mode_proba_2_models(self):
# Model a
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
_ = model.fit(X_train, y_train)
S_test_1_a = model.predict_proba(X_test)
# Model b
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
_ = model.fit(X_train, y_train)
S_test_1_b = model.predict_proba(X_test)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'oof_pred', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_proba_2_models(self):
# Model a
S_test_1_a = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_a[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
# Model b
S_test_1_b = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = GaussianNB()
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_b[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True, needs_proba = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
#@@@@
# Look at proba
# print('\nTwo models')
# print('etalon')
# print(S_test_1[:2])
# print('vecstack')
# print(S_test_2[:2])
#@@@@
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_N_dim_input(self):
"""
This is `test_oof_pred_bag_mode` function with `LogisticRegressionUnrolled` estimator
"""
S_test_temp = np.zeros((X_test_4d_unrolled.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train_4d_unrolled, y_train_4d)):
# Split data and target
X_tr = X_train_4d_unrolled[tr_index]
y_tr = y_train_4d[tr_index]
X_te = X_train_4d_unrolled[te_index]
y_te = y_train_4d[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test_4d_unrolled)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train_4d_unrolled, y = y_train_4d, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [OneVsRestClassifierUnrolled(LogisticRegressionUnrolled(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train_4d, y_train_4d, X_test_4d,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# !!! cross_val_predict uses stratified split
#-------------------------------------------------------------------------------
# Main concept for testing returned arrays:
# 1). create ground truth e.g. with cross_val_predict
# 2). run vecstack
# 3). compare returned arrays with ground truth
# 4). compare arrays from file with ground truth
#-------------------------------------------------------------------------------
from __future__ import print_function
from __future__ import division
import unittest
from numpy.testing import assert_array_equal
# from numpy.testing import assert_allclose
from numpy.testing import assert_equal
import os
import glob
import numpy as np
import scipy.stats as st
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
# from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.metrics import log_loss
from sklearn.metrics import make_scorer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from vecstack import stacking
from sklearn.multiclass import OneVsRestClassifier
n_classes = 3
n_folds = 5
temp_dir = 'tmpdw35lg54ms80eb42'
X, y = make_classification(n_samples = 500, n_features = 5, n_informative = 3, n_redundant = 1,
n_classes = n_classes, flip_y = 0, random_state = 0)
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)
# Make train/test split by hand to avoid strange errors probably related to testing suit:
# https://github.com/scikit-learn/scikit-learn/issues/1684
# https://github.com/scikit-learn/scikit-learn/issues/1704
# Note: Python 2.7, 3.4 - OK, but 3.5, 3.6 - error
np.random.seed(0)
ind = np.arange(500)
np.random.shuffle(ind)
ind_train = ind[:400]
ind_test = ind[400:]
X_train = X[ind_train]
X_test = X[ind_test]
y_train = y[ind_train]
y_test = y[ind_test]
# Create 4-dim data
np.random.seed(42)
X_train_4d = np.random.normal(size=(400, 8, 8, 3))
X_test_4d = np.random.normal(size=(100, 8, 8, 3))
y_train_4d = np.random.randint(n_classes, size=400)
# Reshape 4-dim to 2-dim
X_train_4d_unrolled = X_train_4d.reshape(X_train_4d.shape[0], -1)
X_test_4d_unrolled = X_test_4d.reshape(X_test_4d.shape[0], -1)
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
class LogisticRegressionUnrolled(LogisticRegression):
"""
For tests related to N-dim input.
Estimator accepts N-dim array and reshape it to 2-dim array
"""
def fit(self, X, y):
return super(LogisticRegressionUnrolled, self).fit(X.reshape(X.shape[0], -1), y)
def predict(self, X):
return super(LogisticRegressionUnrolled, self).predict(X.reshape(X.shape[0], -1))
def predict_proba(self, X):
return super(LogisticRegressionUnrolled, self).predict_proba(X.reshape(X.shape[0], -1))
class OneVsRestClassifierUnrolled(OneVsRestClassifier):
"""
Just to avoid data shape checks
"""
def fit(self, X, y):
return super(OneVsRestClassifierUnrolled, self).fit(X.reshape(X.shape[0], -1), y)
def predict(self, X):
return super(OneVsRestClassifierUnrolled, self).predict(X.reshape(X.shape[0], -1))
def predict_proba(self, X):
return super(OneVsRestClassifierUnrolled, self).predict_proba(X.reshape(X.shape[0], -1))
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class TestFuncClassificationMulticlass(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
os.mkdir(temp_dir)
except:
print('Unable to create temp dir')
@classmethod
def tearDownClass(cls):
try:
os.rmdir(temp_dir)
except:
print('Unable to remove temp dir')
def tearDown(self):
# Remove files after each test
files = glob.glob(os.path.join(temp_dir, '*.npy'))
files.extend(glob.glob(os.path.join(temp_dir, '*.log.txt')))
try:
for file in files:
os.remove(file)
except:
print('Unable to remove temp file')
#---------------------------------------------------------------------------
# Test returned and saved arrays in each mode (parameter <mode>)
# Here we also test parameter <stratified>
#---------------------------------------------------------------------------
#---------------------------------------------------------------------------
# Predict labels
#---------------------------------------------------------------------------
def test_oof_pred_mode(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_mode(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_test_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_mode(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = None
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_bag_mode(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
S_train_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Predict proba
#---------------------------------------------------------------------------
def test_oof_pred_mode_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
_ = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'oof_pred', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_mode_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
S_test_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'oof', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_mode_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = None
_ = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'pred', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_proba(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True, needs_proba = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
#@@@@
# Look at proba
# print('\nOne model')
# print('etalon')
# print(S_test_1[:2])
# print('vecstack')
# print(S_test_2[:2])
#@@@@
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_bag_mode_proba(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
S_train_1 = None
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred_bag', random_state = 0, verbose = 0, stratified = True, needs_proba = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test <shuffle> and <random_state> parameters
#---------------------------------------------------------------------------
def test_oof_pred_bag_mode_shuffle(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = True, random_state = 0)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
# !!! Important. Here we pass CV-generator not number of folds <cv = kf>
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = kf,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = True, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test <metric> parameter and its default values depending on <regression> parameter
# Labels
# Important. We use <greater_is_better = True> in <make_scorer> for any error function
# because we need raw scores (without minus sign)
#---------------------------------------------------------------------------
def test_oof_mode_metric(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(accuracy_score)
scores = cross_val_score(model, X_train, y = y_train, cv = n_folds,
scoring = scorer, n_jobs = 1, verbose = 0)
mean_str_1 = '%.8f' % np.mean(scores)
std_str_1 = '%.8f' % np.std(scores)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0, stratified = True)
# Load mean score and std from file
# Normally if cleaning is performed there is only one .log.txt file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.log.txt')))[-1] # take the latest file
with open(file_name) as f:
for line in f:
if 'MEAN' in line:
split = line.strip().split()
break
mean_str_2 = split[1][1:-1]
std_str_2 = split[3][1:-1]
assert_equal(mean_str_1, mean_str_2)
assert_equal(std_str_1, std_str_2)
#---------------------------------------------------------------------------
# Test <metric> parameter and its default values depending on <regression> parameter
# Proba
# Important. We use <greater_is_better = True> in <make_scorer> for any error function
# because we need raw scores (without minus sign)
#---------------------------------------------------------------------------
def test_oof_mode_metric_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(log_loss, response_method='predict_proba')
scores = cross_val_score(model, X_train, y = y_train, cv = n_folds,
scoring = scorer, n_jobs = 1, verbose = 0)
mean_str_1 = '%.8f' % np.mean(scores)
std_str_1 = '%.8f' % np.std(scores)
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))]
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0, stratified = True,
needs_proba = True)
# Load mean score and std from file
# Normally if cleaning is performed there is only one .log.txt file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.log.txt')))[-1] # take the latest file
with open(file_name) as f:
for line in f:
if 'MEAN' in line:
split = line.strip().split()
break
mean_str_2 = split[1][1:-1]
std_str_2 = split[3][1:-1]
assert_equal(mean_str_1, mean_str_2)
assert_equal(std_str_1, std_str_2)
#-------------------------------------------------------------------------------
# Test several mdels in one run
#-------------------------------------------------------------------------------
def test_oof_pred_mode_2_models(self):
# Model a
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1_a = model.predict(X_test).reshape(-1, 1)
# Model b
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1_b = model.predict(X_test).reshape(-1, 1)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_2_models(self):
# Model a
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1_a = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
# Model b
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = GaussianNB()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1_b = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_mode_proba_2_models(self):
# Model a
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
_ = model.fit(X_train, y_train)
S_test_1_a = model.predict_proba(X_test)
# Model b
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
_ = model.fit(X_train, y_train)
S_test_1_b = model.predict_proba(X_test)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, stratified = True,
mode = 'oof_pred', random_state = 0, verbose = 0, needs_proba = True, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_proba_2_models(self):
# Model a
S_test_1_a = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_a[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
# Model b
S_test_1_b = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = GaussianNB()
_ = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_b[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis = 1)
model = GaussianNB()
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict_proba')
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')),
GaussianNB()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True, needs_proba = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
#@@@@
# Look at proba
# print('\nTwo models')
# print('etalon')
# print(S_test_1[:2])
# print('vecstack')
# print(S_test_2[:2])
#@@@@
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_N_dim_input(self):
"""
This is `test_oof_pred_bag_mode` function with `LogisticRegressionUnrolled` estimator
"""
S_test_temp = np.zeros((X_test_4d_unrolled.shape[0], n_folds))
# Usind StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train_4d_unrolled, y_train_4d)):
# Split data and target
X_tr = X_train_4d_unrolled[tr_index]
y_tr = y_train_4d[tr_index]
X_te = X_train_4d_unrolled[te_index]
y_te = y_train_4d[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test_4d_unrolled)
S_test_1 = st.mode(S_test_temp, axis = 1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train_4d_unrolled, y = y_train_4d, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [OneVsRestClassifierUnrolled(LogisticRegressionUnrolled(random_state=0, solver='liblinear'))]
S_train_2, S_test_2 = stacking(models, X_train_4d, y_train_4d, X_test_4d,
regression = False, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0, stratified = True)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Main concept for testing returned arrays:
# 1). create ground truth e.g. with cross_val_predict
# 2). run vecstack
# 3). compare returned arrays with ground truth
# 4). compare arrays from file with ground truth
#-------------------------------------------------------------------------------
from __future__ import print_function
from __future__ import division
import unittest
from numpy.testing import assert_array_equal
# from numpy.testing import assert_allclose
from numpy.testing import assert_equal
from numpy.testing import assert_raises
from numpy.testing import assert_warns
import os
import glob
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse import csc_matrix
from scipy.sparse import coo_matrix
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
# from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
# from sklearn.datasets import load_boston
from sklearn.datasets import fetch_openml
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import make_scorer
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from vecstack import stacking
from vecstack.core import model_action
n_folds = 5
temp_dir = 'tmpdw35lg54ms80eb42'
# boston = load_boston()
boston = fetch_openml(name='boston', version=1, as_frame=False, parser='auto')
# X, y = boston.data, boston.target
X, y = boston.data.astype(float), boston.target.astype(float)
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 0)
# Make train/test split by hand to avoid strange errors probably related to testing suit:
# https://github.com/scikit-learn/scikit-learn/issues/1684
# https://github.com/scikit-learn/scikit-learn/issues/1704
# Note: Python 2.7, 3.4 - OK, but 3.5, 3.6 - error
np.random.seed(0)
ind = np.arange(500)
np.random.shuffle(ind)
ind_train = ind[:400]
ind_test = ind[400:]
X_train = X[ind_train]
X_test = X[ind_test]
y_train = y[ind_train]
y_test = y[ind_test]
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class MinimalEstimator:
"""Has no get_params attribute"""
def __init__(self, random_state=0):
self.random_state = random_state
def __repr__(self):
return 'Demo string from __repr__'
def fit(self, X, y):
return self
def predict(self, X):
return np.ones(X.shape[0])
def predict_proba(self, X):
return np.zeros(X.shape[0])
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class TestFuncRegression(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
os.mkdir(temp_dir)
except:
print('Unable to create temp dir')
@classmethod
def tearDownClass(cls):
try:
os.rmdir(temp_dir)
except:
print('Unable to remove temp dir')
def tearDown(self):
# Remove files after each test
files = glob.glob(os.path.join(temp_dir, '*.npy'))
files.extend(glob.glob(os.path.join(temp_dir, '*.log.txt')))
try:
for file in files:
os.remove(file)
except:
print('Unable to remove temp file')
#---------------------------------------------------------------------------
# Testing returned and saved arrays in each mode
#---------------------------------------------------------------------------
def test_oof_pred_mode(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_B_mode(self):
""" 'B' is alias for 'oof_pred' """
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'B', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_mode(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_test_1 = None
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_mode(self):
model = LinearRegression()
S_train_1 = None
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = LinearRegression()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_A_mode(self):
""" 'A' is alias for 'oof_pred_bag' """
S_test_temp = np.zeros((X_test.shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = LinearRegression()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'A', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_pred_bag_mode(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = LinearRegression()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
S_train_1 = None
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'pred_bag', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing <sample_weight> all ones
#---------------------------------------------------------------------------
def test_oof_pred_mode_sample_weight_one(self):
sw = np.ones(len(y_train))
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict',
params = {'sample_weight': sw}).reshape(-1, 1)
_ = model.fit(X_train, y_train, sample_weight = sw)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0,
sample_weight = sw)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test <sample_weight> all random
#---------------------------------------------------------------------------
def test_oof_pred_mode_sample_weight_random(self):
np.random.seed(0)
sw = np.random.rand(len(y_train))
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict',
params = {'sample_weight': sw}).reshape(-1, 1)
_ = model.fit(X_train, y_train, sample_weight = sw)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0,
sample_weight = sw)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing <transform_target> and <transform_pred> parameters
#---------------------------------------------------------------------------
def test_oof_pred_mode_transformations(self):
model = LinearRegression()
S_train_1 = np.expm1(cross_val_predict(model, X_train, y = np.log1p(y_train), cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict')).reshape(-1, 1)
_ = model.fit(X_train, np.log1p(y_train))
S_test_1 = np.expm1(model.predict(X_test)).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0,
transform_target = np.log1p, transform_pred = np.expm1)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing <verbose> parameter
#---------------------------------------------------------------------------
def test_oof_pred_mode_verbose_1(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
models = [LinearRegression()]
S_train_3, S_test_3 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 1)
models = [LinearRegression()]
S_train_4, S_test_4 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 2)
models = [LinearRegression()]
S_train_5, S_test_5 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False,
mode = 'oof_pred', random_state = 0, verbose = 0)
models = [LinearRegression()]
S_train_6, S_test_6 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False,
mode = 'oof_pred', random_state = 0, verbose = 1)
models = [LinearRegression()]
S_train_7, S_test_7 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False,
mode = 'oof_pred', random_state = 0, verbose = 2)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
assert_array_equal(S_train_1, S_train_4)
assert_array_equal(S_test_1, S_test_4)
assert_array_equal(S_train_1, S_train_5)
assert_array_equal(S_test_1, S_test_5)
assert_array_equal(S_train_1, S_train_6)
assert_array_equal(S_test_1, S_test_6)
assert_array_equal(S_train_1, S_train_7)
assert_array_equal(S_test_1, S_test_7)
#---------------------------------------------------------------------------
# Test <metric> parameter and its default values depending on <regression> parameter
# Important. We use <greater_is_better = True> in <make_scorer> for any error function
# because we need raw scores (without minus sign)
#---------------------------------------------------------------------------
def test_oof_mode_metric(self):
model = LinearRegression()
scorer = make_scorer(mean_absolute_error)
scores = cross_val_score(model, X_train, y = y_train, cv = n_folds,
scoring = scorer, n_jobs = 1, verbose = 0)
mean_str_1 = '%.8f' % np.mean(scores)
std_str_1 = '%.8f' % np.std(scores)
models = [LinearRegression()]
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0)
# Load mean score and std from file
# Normally if cleaning is performed there is only one .log.txt file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.log.txt')))[-1] # take the latest file
with open(file_name) as f:
for line in f:
if 'MEAN' in line:
split = line.strip().split()
break
mean_str_2 = split[1][1:-1]
std_str_2 = split[3][1:-1]
assert_equal(mean_str_1, mean_str_2)
assert_equal(std_str_1, std_str_2)
#-------------------------------------------------------------------------------
# Test several mdels in one run
#-------------------------------------------------------------------------------
def test_oof_pred_mode_2_models(self):
model = LinearRegression()
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1_a = model.predict(X_test).reshape(-1, 1)
model = Ridge(random_state = 0)
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(X_train, y_train)
S_test_1_b = model.predict(X_test).reshape(-1, 1)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [LinearRegression(),
Ridge(random_state = 0)]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_bag_mode_2_models(self):
# Model a
S_test_temp = np.zeros((X_test.shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = LinearRegression()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1_a = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
model = LinearRegression()
S_train_1_a = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
# Model b
S_test_temp = np.zeros((X_test.shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = Ridge(random_state = 0)
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1_b = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
model = Ridge(random_state = 0)
S_train_1_b = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_train_1 = np.c_[S_train_1_a, S_train_1_b]
S_test_1 = np.c_[S_test_1_a, S_test_1_b]
models = [LinearRegression(),
Ridge(random_state = 0)]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing sparse types CSR, CSC, COO
#---------------------------------------------------------------------------
def test_oof_pred_mode_sparse_csr(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, csr_matrix(X_train), y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(csr_matrix(X_train), y_train)
S_test_1 = model.predict(csr_matrix(X_test)).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, csr_matrix(X_train), y_train, csr_matrix(X_test),
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_mode_sparse_csc(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, csc_matrix(X_train), y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(csc_matrix(X_train), y_train)
S_test_1 = model.predict(csc_matrix(X_test)).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, csc_matrix(X_train), y_train, csc_matrix(X_test),
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
def test_oof_pred_mode_sparse_coo(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, coo_matrix(X_train), y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(coo_matrix(X_train), y_train)
S_test_1 = model.predict(coo_matrix(X_test)).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, coo_matrix(X_train), y_train, coo_matrix(X_test),
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing X_train -> SCR, X_test -> COO
#---------------------------------------------------------------------------
def test_oof_pred_mode_sparse_csr_coo(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, csr_matrix(X_train), y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(csr_matrix(X_train), y_train)
S_test_1 = model.predict(coo_matrix(X_test)).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, csr_matrix(X_train), y_train, coo_matrix(X_test),
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing X_train -> SCR, X_test -> Dense
#---------------------------------------------------------------------------
def test_oof_pred_mode_sparse_csr_dense(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, csr_matrix(X_train), y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
_ = model.fit(csr_matrix(X_train), y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, csr_matrix(X_train), y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing X_test=None
#---------------------------------------------------------------------------
def test_oof_mode_xtest_is_none(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_test_1 = None
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, None,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing parameter exceptions
#---------------------------------------------------------------------------
def test_exceptions(self):
# Empty model list
assert_raises(ValueError, stacking, [], X_train, y_train, X_test)
# Wrong mode
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train, X_test, mode='abc')
# Path does not exist
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train, X_test, save_dir='./As26bV85')
# n_folds is not int
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train, X_test, n_folds='A')
# n_folds is less than 2
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train, X_test, n_folds=1)
# Wrong verbose value
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train, X_test, verbose=25)
# Internal function model_action
assert_raises(ValueError, model_action, LinearRegression(),
X_train, y_train, X_test, sample_weight=None,
action='abc', transform=None)
# X_test is None when mode != 'oof'
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train, None, mode='oof_pred_bag')
#---------------------------------------------------------------------------
# Testing parameter warnings
#---------------------------------------------------------------------------
def test_warnings(self):
# Parameters specific for classification are ignored if regression=True
assert_warns(UserWarning, stacking, [LinearRegression()],
X_train, y_train, X_test, regression=True,
needs_proba=True)
assert_warns(UserWarning, stacking, [LinearRegression()],
X_train, y_train, X_test, regression=True,
stratified=True)
assert_warns(UserWarning, stacking, [LinearRegression()],
X_train, y_train, X_test, regression=True,
needs_proba=True, stratified=True)
#---------------------------------------------------------------------------
# Test if model has no 'get_params'
#---------------------------------------------------------------------------
def test_oof_pred_mode_no_get_params(self):
S_train_1 = np.ones(X_train.shape[0]).reshape(-1, 1)
S_test_1 = np.ones(X_test.shape[0]).reshape(-1, 1)
models = [MinimalEstimator()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#--------------------------------------------------------------------------
# Test inconsistent data shape or type
#--------------------------------------------------------------------------
def test_inconsistent_data(self):
# nan or inf in y
y_train_nan = y_train.copy()
y_train_nan[0] = np.nan
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train_nan, X_test)
# y has two or more columns
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, np.c_[y_train, y_train], X_test)
# X_train and y_train shape nismatch
assert_raises(ValueError, stacking, [LinearRegression()],
X_train, y_train[:10], X_test)
#---------------------------------------------------------------------------
# Test small input
#---------------------------------------------------------------------------
def test_small_input(self):
"""
This is `test_oof_pred_bag_mode` with small input data
Train: 20 examples
Test: 10 examples
"""
S_test_temp = np.zeros((X_test[:10].shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train[:20], y_train[:20])):
# Split data and target
X_tr = X_train[:20][tr_index]
y_tr = y_train[:20][tr_index]
X_te = X_train[:20][te_index]
y_te = y_train[:20][te_index]
model = LinearRegression()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test[:10])
S_test_1 = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train[:20], y = y_train[:20], cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train[:20], y_train[:20], X_test[:10],
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof_pred_bag', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Mode 'oof', X_test=None
#---------------------------------------------------------------------------
def test_oof_mode_with_none(self):
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
S_test_1 = None
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, None,
regression = True, n_folds = n_folds, shuffle = False, save_dir=temp_dir,
mode = 'oof', random_state = 0, verbose = 0)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# All default values (mode='oof_pred_bag')
#---------------------------------------------------------------------------
def test_all_defaults(self):
# Override global n_folds=5, because default value in stacking function is 4
n_folds=4
S_test_temp = np.zeros((X_test.shape[0], n_folds))
kf = KFold(n_splits = n_folds, shuffle = False, random_state = None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
X_te = X_train[te_index]
y_te = y_train[te_index]
model = LinearRegression()
_ = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = np.mean(S_test_temp, axis = 1).reshape(-1, 1)
model = LinearRegression()
S_train_1 = cross_val_predict(model, X_train, y = y_train, cv = n_folds,
n_jobs = 1, verbose = 0, method = 'predict').reshape(-1, 1)
models = [LinearRegression()]
S_train_2, S_test_2 = stacking(models, X_train, y_train, X_test, save_dir=temp_dir)
# Load OOF from file
# Normally if cleaning is performed there is only one .npy file at given moment
# But if we have no cleaning there may be more then one file so we take the latest
file_name = sorted(glob.glob(os.path.join(temp_dir, '*.npy')))[-1] # take the latest file
S = np.load(file_name, allow_pickle=True)
S_train_3 = S[0]
S_test_3 = S[1]
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Exactly the same as multiclass but ``n_classes=2``
# and class name is ``TestSklearnClassificationBinary``
# -----------------------------------------------------------------------------
# !!! cross_val_predict uses stratified split
# -----------------------------------------------------------------------------
# Main concept for testing returned arrays:
# 1). create ground truth e.g. with cross_val_predict
# 2). run vecstack
# 3). compare returned arrays with ground truth
# -----------------------------------------------------------------------------
from __future__ import print_function
from __future__ import division
import unittest
from numpy.testing import assert_array_equal
# from numpy.testing import assert_allclose
from numpy.testing import assert_equal
# import os
# import glob
import numpy as np
import scipy.stats as st
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
# from sklearn.model_selection import train_test_split
# from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn.datasets import make_classification
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import zero_one_loss
from sklearn.metrics import log_loss
from sklearn.metrics import roc_auc_score
from sklearn.metrics import make_scorer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.dummy import DummyClassifier
from vecstack import StackingTransformer
from sklearn.multiclass import OneVsRestClassifier
n_classes = 2
n_folds = 5
# temp_dir = 'tmpdw35lg54ms80eb42'
X, y = make_classification(n_samples=500, n_features=5,
n_informative=3, n_redundant=1,
n_classes=n_classes, flip_y=0,
random_state=0)
# X_train, X_test, y_train, y_test = train_test_split(X, y,
# test_size=0.2,
# random_state=0)
# Make train/test split by hand to avoid strange errors probably related to testing suit:
# https://github.com/scikit-learn/scikit-learn/issues/1684
# https://github.com/scikit-learn/scikit-learn/issues/1704
# Note: Python 2.7, 3.4 - OK, but 3.5, 3.6 - error
np.random.seed(0)
ind = np.arange(500)
np.random.shuffle(ind)
ind_train = ind[:400]
ind_test = ind[400:]
X_train = X[ind_train]
X_test = X[ind_test]
y_train = y[ind_train]
y_test = y[ind_test]
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def roc_auc_score_universal(y_true, y_pred):
"""ROC AUC metric for both binary and multiclass classification.
Parameters
----------
y_true - 1d numpy array
True class labels
y_pred - 2d numpy array
Predicted probabilities for each class
"""
ohe = OneHotEncoder(sparse_output=False)
y_true = ohe.fit_transform(y_true.reshape(-1, 1))
#@@@@
if len(y_pred.shape) == 1:
y_pred = np.c_[y_pred, y_pred]
y_pred[:, 0] = 1 - y_pred[:, 1]
#@@@@
auc_score = roc_auc_score(y_true, y_pred)
return auc_score
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class TestSklearnClassificationBinary(unittest.TestCase):
# -------------------------------------------------------------------------
# Test returned arrays in variant B. Labels
# -------------------------------------------------------------------------
def test_variant_B_labels(self):
# reference
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test returned arrays in variant A. Labels
# -------------------------------------------------------------------------
def test_variant_A_labels(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis=1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test returned arrays in variant B. Probabilities
# -------------------------------------------------------------------------
def test_variant_B_proba(self):
# reference
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test returned arrays in variant A. Probabilities
# -------------------------------------------------------------------------
def test_variant_A_proba(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis=1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test ``shuffle`` and ``random_state`` parameters in variant A
#---------------------------------------------------------------------------
def test_variant_A_proba_shuffle_random_state(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=0)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis=1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
# !!! Important. Here we pass CV-generator ``cv=kf`` not number of folds
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=kf, n_jobs=1, verbose=0,
method='predict_proba')
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=True,
variant='A', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test default metric and scores. Labels
# ``metric`` parameter and its default values depends on ``regression`` parameter.
# Important. We use ``greater_is_better=True`` in ``make_scorer``
# for any error function because we need raw scores (without minus sign)
# -------------------------------------------------------------------------
def test_default_metric_and_scores_labels(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(accuracy_score)
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test custom metric and scores. Labels
# -------------------------------------------------------------------------
def test_custom_metric_and_scores_labels(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(zero_one_loss)
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, metric=zero_one_loss,
verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test default metric and scores. Probabilities
# -------------------------------------------------------------------------
def test_default_metric_and_scores_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(log_loss, response_method='predict_proba')
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test custom metric and scores. Probabilities
# -------------------------------------------------------------------------
def test_custom_metric_and_scores_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(roc_auc_score_universal, response_method='predict_proba')
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
metric=roc_auc_score_universal, verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant B. Labels
# -------------------------------------------------------------------------
def test_variant_B_2_estimators_labels(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1_e1 = model.predict(X_test).reshape(-1, 1)
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1_e2 = model.predict(X_test).reshape(-1, 1)
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant B. Probabilities
# -------------------------------------------------------------------------
def test_variant_B_2_estimators_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1_e1 = model.predict_proba(X_test)
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1_e2 = model.predict_proba(X_test)
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant A. Labels
# -------------------------------------------------------------------------
def test_variant_A_2_estimators_labels(self):
# Estimator 1
S_test_temp_e1 = np.zeros((X_test.shape[0], n_folds))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
S_test_temp_e1[:, fold_counter] = model.predict(X_test)
S_test_1_e1 = st.mode(S_test_temp_e1, axis=1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
# Estimator 2
S_test_temp_e2 = np.zeros((X_test.shape[0], n_folds))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = GaussianNB()
model = model.fit(X_tr, y_tr)
S_test_temp_e2[:, fold_counter] = model.predict(X_test)
S_test_1_e2 = st.mode(S_test_temp_e2, axis=1, keepdims=True)[0]
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant A. Probabilities
# -------------------------------------------------------------------------
def test_variant_A_2_estimators_proba(self):
# Estimator 1
S_test_1_e1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp_e1 = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp_e1[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_e1[:, class_id] = np.mean(S_test_temp_e1[:, class_id::n_classes], axis=1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
# Estimator 2
S_test_1_e2 = np.zeros((X_test.shape[0], n_classes))
S_test_temp_e2 = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = GaussianNB()
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp_e2[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_e2[:, class_id] = np.mean(S_test_temp_e2[:, class_id::n_classes], axis=1)
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test default (dummy) classifier. Labels
# -------------------------------------------------------------------------
def test_variant_B_default_classifier_labels(self):
# reference
model = DummyClassifier(strategy='constant', constant=1)
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
# fit then transform
stack = StackingTransformer(estimators=None, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test default (dummy) classifier. Probabilities
# -------------------------------------------------------------------------
def test_variant_B_default_classifier_proba(self):
# reference
model = DummyClassifier(strategy='constant', constant=1)
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
# fit then transform
stack = StackingTransformer(estimators=None, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
needs_proba=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing ``verbose`` parameter
#---------------------------------------------------------------------------
def test_variant_B_verbose(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
# verbose=0
# fit then transform
estimators = [('lr', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# verbose=1
# fit then transform
estimators = [('lr', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=1)
stack = stack.fit(X_train, y_train)
S_train_4 = stack.transform(X_train)
S_test_4 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_5 = stack.fit_transform(X_train, y_train)
S_test_5 = stack.transform(X_test)
# verbose=2
# fit then transform
estimators = [('lr', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=2)
stack = stack.fit(X_train, y_train)
S_train_6 = stack.transform(X_train)
S_test_6 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_7 = stack.fit_transform(X_train, y_train)
S_test_7 = stack.transform(X_test)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
assert_array_equal(S_train_1, S_train_4)
assert_array_equal(S_test_1, S_test_4)
assert_array_equal(S_train_1, S_train_5)
assert_array_equal(S_test_1, S_test_5)
assert_array_equal(S_train_1, S_train_6)
assert_array_equal(S_test_1, S_test_6)
assert_array_equal(S_train_1, S_train_7)
assert_array_equal(S_test_1, S_test_7)
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# !!! cross_val_predict uses stratified split
# -----------------------------------------------------------------------------
# Main concept for testing returned arrays:
# 1). create ground truth e.g. with cross_val_predict
# 2). run vecstack
# 3). compare returned arrays with ground truth
# -----------------------------------------------------------------------------
from __future__ import print_function
from __future__ import division
import unittest
from numpy.testing import assert_array_equal
# from numpy.testing import assert_allclose
from numpy.testing import assert_equal
from numpy.testing import assert_raises
# import os
# import glob
import numpy as np
import scipy.stats as st
from sklearn.model_selection import cross_val_predict
from sklearn.model_selection import cross_val_score
# from sklearn.model_selection import train_test_split
# from sklearn.model_selection import KFold
from sklearn.model_selection import StratifiedKFold
from sklearn.datasets import make_classification
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.metrics import zero_one_loss
from sklearn.metrics import log_loss
from sklearn.metrics import roc_auc_score
from sklearn.metrics import make_scorer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.dummy import DummyClassifier
from vecstack import StackingTransformer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import StackingClassifier
from sklearn.pipeline import Pipeline
n_classes = 3
n_folds = 5
# temp_dir = 'tmpdw35lg54ms80eb42'
X, y = make_classification(n_samples=500, n_features=5,
n_informative=3, n_redundant=1,
n_classes=n_classes, flip_y=0,
random_state=0)
# X_train, X_test, y_train, y_test = train_test_split(X, y,
# test_size=0.2,
# random_state=0)
# Make train/test split by hand to avoid strange errors probably related to testing suit:
# https://github.com/scikit-learn/scikit-learn/issues/1684
# https://github.com/scikit-learn/scikit-learn/issues/1704
# Note: Python 2.7, 3.4 - OK, but 3.5, 3.6 - error
np.random.seed(0)
ind = np.arange(500)
np.random.shuffle(ind)
ind_train = ind[:400]
ind_test = ind[400:]
X_train = X[ind_train]
X_test = X[ind_test]
y_train = y[ind_train]
y_test = y[ind_test]
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
def roc_auc_score_universal(y_true, y_pred):
"""ROC AUC metric for both binary and multiclass classification.
Parameters
----------
y_true - 1d numpy array
True class labels
y_pred - 2d numpy array
Predicted probabilities for each class
"""
ohe = OneHotEncoder(sparse_output=False)
y_true = ohe.fit_transform(y_true.reshape(-1, 1))
auc_score = roc_auc_score(y_true, y_pred)
return auc_score
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class TestSklearnClassificationMulticlass(unittest.TestCase):
# -------------------------------------------------------------------------
# Test returned arrays in variant B. Labels
# -------------------------------------------------------------------------
def test_variant_B_labels(self):
# reference
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test returned arrays in variant A. Labels
# -------------------------------------------------------------------------
def test_variant_A_labels(self):
S_test_temp = np.zeros((X_test.shape[0], n_folds))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
S_test_temp[:, fold_counter] = model.predict(X_test)
S_test_1 = st.mode(S_test_temp, axis=1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test returned arrays in variant B. Probabilities
# -------------------------------------------------------------------------
def test_variant_B_proba(self):
# reference
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test returned arrays in variant A. Probabilities
# -------------------------------------------------------------------------
def test_variant_A_proba(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis=1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Test ``shuffle`` and ``random_state`` parameters in variant A
#---------------------------------------------------------------------------
def test_variant_A_proba_shuffle_random_state(self):
S_test_1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=0)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1[:, class_id] = np.mean(S_test_temp[:, class_id::n_classes], axis=1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
# !!! Important. Here we pass CV-generator ``cv=kf`` not number of folds
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=kf, n_jobs=1, verbose=0,
method='predict_proba')
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=True,
variant='A', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test default metric and scores. Labels
# ``metric`` parameter and its default values depends on ``regression`` parameter.
# Important. We use ``greater_is_better=True`` in ``make_scorer``
# for any error function because we need raw scores (without minus sign)
# -------------------------------------------------------------------------
def test_default_metric_and_scores_labels(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(accuracy_score)
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test custom metric and scores. Labels
# -------------------------------------------------------------------------
def test_custom_metric_and_scores_labels(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(zero_one_loss)
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, metric=zero_one_loss,
verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test default metric and scores. Probabilities
# -------------------------------------------------------------------------
def test_default_metric_and_scores_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(log_loss, response_method='predict_proba')
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test custom metric and scores. Probabilities
# -------------------------------------------------------------------------
def test_custom_metric_and_scores_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
scorer = make_scorer(roc_auc_score_universal, response_method='predict_proba')
scores_1 = cross_val_score(model, X_train, y=y_train,
cv=n_folds, scoring=scorer,
n_jobs=1, verbose=0)
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
metric=roc_auc_score_universal, verbose=0)
stack = stack.fit(X_train, y_train)
scores_2 = stack.scores_[0].copy()
# fit_transform
# also check refitting already fitted transformer
_ = stack.fit_transform(X_train, y_train)
scores_3 = stack.scores_[0].copy()
assert_array_equal(scores_1, scores_2)
assert_array_equal(scores_1, scores_3)
# mean and std
mean_1 = np.mean(scores_1)
std_1 = np.std(scores_1)
mean_2 = stack.mean_std_[0][1]
std_2 = stack.mean_std_[0][2]
assert_equal(mean_1, mean_2)
assert_equal(std_1, std_2)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant B. Labels
# -------------------------------------------------------------------------
def test_variant_B_2_estimators_labels(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1_e1 = model.predict(X_test).reshape(-1, 1)
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1_e2 = model.predict(X_test).reshape(-1, 1)
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant B. Probabilities
# -------------------------------------------------------------------------
def test_variant_B_2_estimators_proba(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1_e1 = model.predict_proba(X_test)
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1_e2 = model.predict_proba(X_test)
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant A. Labels
# -------------------------------------------------------------------------
def test_variant_A_2_estimators_labels(self):
# Estimator 1
S_test_temp_e1 = np.zeros((X_test.shape[0], n_folds))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
S_test_temp_e1[:, fold_counter] = model.predict(X_test)
S_test_1_e1 = st.mode(S_test_temp_e1, axis=1, keepdims=True)[0]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
# Estimator 2
S_test_temp_e2 = np.zeros((X_test.shape[0], n_folds))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = GaussianNB()
model = model.fit(X_tr, y_tr)
S_test_temp_e2[:, fold_counter] = model.predict(X_test)
S_test_1_e2 = st.mode(S_test_temp_e2, axis=1, keepdims=True)[0]
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test several estimators in one run. Variant A. Probabilities
# -------------------------------------------------------------------------
def test_variant_A_2_estimators_proba(self):
# Estimator 1
S_test_1_e1 = np.zeros((X_test.shape[0], n_classes))
S_test_temp_e1 = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp_e1[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_e1[:, class_id] = np.mean(S_test_temp_e1[:, class_id::n_classes], axis=1)
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1_e1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
# Estimator 2
S_test_1_e2 = np.zeros((X_test.shape[0], n_classes))
S_test_temp_e2 = np.zeros((X_test.shape[0], n_folds * n_classes))
# Using StratifiedKFold because by defauld cross_val_predict uses StratifiedKFold
kf = StratifiedKFold(n_splits=n_folds, shuffle=False, random_state=None)
for fold_counter, (tr_index, te_index) in enumerate(kf.split(X_train, y_train)):
# Split data and target
X_tr = X_train[tr_index]
y_tr = y_train[tr_index]
# X_te = X_train[te_index]
# y_te = y_train[te_index]
model = GaussianNB()
model = model.fit(X_tr, y_tr)
col_slice_fold = slice(fold_counter * n_classes, fold_counter * n_classes + n_classes)
S_test_temp_e2[:, col_slice_fold] = model.predict_proba(X_test)
for class_id in range(n_classes):
S_test_1_e2[:, class_id] = np.mean(S_test_temp_e2[:, class_id::n_classes], axis=1)
model = GaussianNB()
S_train_1_e2 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
S_train_1 = np.c_[S_train_1_e1, S_train_1_e2]
S_test_1 = np.c_[S_test_1_e1, S_test_1_e2]
# fit then transform
estimators = [('logit', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))),
('bayes', GaussianNB())]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='A', random_state=0,
stratified=True, needs_proba=True,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test default (dummy) classifier. Labels
# -------------------------------------------------------------------------
def test_variant_B_default_classifier_labels(self):
# reference
model = DummyClassifier(strategy='constant', constant=1)
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
# fit then transform
stack = StackingTransformer(estimators=None, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
# -------------------------------------------------------------------------
# Test default (dummy) classifier. Probabilities
# -------------------------------------------------------------------------
def test_variant_B_default_classifier_proba(self):
# reference
model = DummyClassifier(strategy='constant', constant=1)
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict_proba')
model = model.fit(X_train, y_train)
S_test_1 = model.predict_proba(X_test)
# fit then transform
stack = StackingTransformer(estimators=None, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
needs_proba=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# compare
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
#---------------------------------------------------------------------------
# Testing ``verbose`` parameter
#---------------------------------------------------------------------------
def test_variant_B_verbose(self):
model = OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear'))
S_train_1 = cross_val_predict(model, X_train, y=y_train,
cv=n_folds, n_jobs=1, verbose=0,
method='predict').reshape(-1, 1)
model = model.fit(X_train, y_train)
S_test_1 = model.predict(X_test).reshape(-1, 1)
# verbose=0
# fit then transform
estimators = [('lr', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=0)
stack = stack.fit(X_train, y_train)
S_train_2 = stack.transform(X_train)
S_test_2 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_3 = stack.fit_transform(X_train, y_train)
S_test_3 = stack.transform(X_test)
# verbose=1
# fit then transform
estimators = [('lr', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=1)
stack = stack.fit(X_train, y_train)
S_train_4 = stack.transform(X_train)
S_test_4 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_5 = stack.fit_transform(X_train, y_train)
S_test_5 = stack.transform(X_test)
# verbose=2
# fit then transform
estimators = [('lr', OneVsRestClassifier(LogisticRegression(random_state=0, solver='liblinear')))]
stack = StackingTransformer(estimators, regression=False,
n_folds=n_folds, shuffle=False,
variant='B', random_state=0,
stratified=True, verbose=2)
stack = stack.fit(X_train, y_train)
S_train_6 = stack.transform(X_train)
S_test_6 = stack.transform(X_test)
# fit_transform
# also check refitting already fitted transformer
S_train_7 = stack.fit_transform(X_train, y_train)
S_test_7 = stack.transform(X_test)
assert_array_equal(S_train_1, S_train_2)
assert_array_equal(S_test_1, S_test_2)
assert_array_equal(S_train_1, S_train_3)
assert_array_equal(S_test_1, S_test_3)
assert_array_equal(S_train_1, S_train_4)
assert_array_equal(S_test_1, S_test_4)
assert_array_equal(S_train_1, S_train_5)
assert_array_equal(S_test_1, S_test_5)
assert_array_equal(S_train_1, S_train_6)
assert_array_equal(S_test_1, S_test_6)
assert_array_equal(S_train_1, S_train_7)
assert_array_equal(S_test_1, S_test_7)
#--------------------------------------------------------------------------
# Added 20250921
# Compare with StackingClassifier
#--------------------------------------------------------------------------
def test_compare_with_stackinglassifier_from_sklearn(self):
estimators = [
('et', ExtraTreesClassifier(n_estimators=100, random_state=0)),
('rf', RandomForestClassifier(n_estimators=100, random_state=0))]
final_estimator = LogisticRegression(random_state=0)
# vecstack.StackingTransformer
stack = StackingTransformer(estimators=estimators,
regression=False,
variant='B',
n_folds=5,
shuffle=False,
stratified=True,
needs_proba=True)
steps = [('stack', stack),
('final_estimator', final_estimator)]
pipe = Pipeline(steps)
y_pred_vecstack = pipe.fit(X_train, y_train).predict_proba(X_test)
# sklearn.ensemble.StackingClassifier
clf = StackingClassifier(estimators=estimators,
final_estimator=final_estimator,
stack_method='predict_proba')
y_pred_sklearn = clf.fit(X_train, y_train).predict_proba(X_test)
assert_array_equal(y_pred_vecstack, y_pred_sklearn)
# Compare transformation
# Transformation for test set is equal
S_test_vecstack = stack.transform(X_test)
S_test_sklearn = clf.transform(X_test)
assert_array_equal(S_test_vecstack, S_test_sklearn)
# Transformation for train set set is different because StackingClassifier does not use CV procedure
S_train_vecstack = stack.transform(X_train)
S_train_sklearn = clf.transform(X_train)
assert_raises(AssertionError, assert_array_equal, S_train_vecstack, S_train_sklearn)
# Instead of CV procedure it just uses models trained on the whole train set
et = ExtraTreesClassifier(random_state=0, n_estimators=100)
rf = RandomForestClassifier(random_state=0, n_estimators=100)
y_pred_et = et.fit(X_train, y_train).predict_proba(X_train)
y_pred_rf = rf.fit(X_train, y_train).predict_proba(X_train)
assert_array_equal(S_train_sklearn, np.hstack([y_pred_et, y_pred_rf]))
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------

Sorry, the diff of this file is too big to display

+31
-10

@@ -1,4 +0,4 @@

Metadata-Version: 1.1
Metadata-Version: 2.4
Name: vecstack
Version: 0.4.0
Version: 0.5.0
Summary: Python package for stacking (machine learning technique)

@@ -9,8 +9,3 @@ Home-page: https://github.com/vecxoz/vecstack

License: MIT
Description:
Python package for stacking (stacked generalization) featuring lightweight functional API and fully compatible scikit-learn API.
Convenient way to automate OOF computation, prediction and bagging using any number of models.
Keywords: stacking,blending,bagging,ensemble,ensembling,machine learning
Platform: UNKNOWN
Classifier: License :: OSI Approved :: MIT License

@@ -23,5 +18,6 @@ Classifier: Operating System :: MacOS

Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering

@@ -34,1 +30,26 @@ Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence

Classifier: Intended Audience :: Science/Research
License-File: LICENSE.txt
Requires-Dist: numpy
Requires-Dist: scipy
Requires-Dist: scikit-learn>=0.18
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-cov; extra == "test"
Requires-Dist: pandas; extra == "test"
Requires-Dist: pyarrow; extra == "test"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: summary
Python package for stacking (stacked generalization) featuring lightweight functional API and fully compatible scikit-learn API.
Convenient way to automate OOF computation, prediction and bagging using any number of models.
All details, FAQ, and tutorials: https://github.com/vecxoz/vecstack
[egg_info]
tag_build =
tag_date = 0
tag_svn_revision = 0

@@ -8,6 +8,7 @@ #! /usr/bin/env python

Convenient way to automate OOF computation, prediction and bagging using any number of models.
All details, FAQ, and tutorials: https://github.com/vecxoz/vecstack
'''
setup(name='vecstack',
version='0.4.0',
version='0.5.0',
description='Python package for stacking (machine learning technique)',

@@ -23,5 +24,6 @@ long_description=long_desc,

'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Topic :: Scientific/Engineering',

@@ -46,4 +48,10 @@ 'Topic :: Scientific/Engineering :: Artificial Intelligence',

],
test_suite='nose.collector',
tests_require=['nose'],
extras_require={
'test': [
'pytest',
'pytest-cov',
'pandas',
'pyarrow'
]
},
zip_safe=False)

@@ -1,4 +0,4 @@

Metadata-Version: 1.1
Metadata-Version: 2.4
Name: vecstack
Version: 0.4.0
Version: 0.5.0
Summary: Python package for stacking (machine learning technique)

@@ -9,8 +9,3 @@ Home-page: https://github.com/vecxoz/vecstack

License: MIT
Description:
Python package for stacking (stacked generalization) featuring lightweight functional API and fully compatible scikit-learn API.
Convenient way to automate OOF computation, prediction and bagging using any number of models.
Keywords: stacking,blending,bagging,ensemble,ensembling,machine learning
Platform: UNKNOWN
Classifier: License :: OSI Approved :: MIT License

@@ -23,5 +18,6 @@ Classifier: Operating System :: MacOS

Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering

@@ -34,1 +30,26 @@ Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence

Classifier: Intended Audience :: Science/Research
License-File: LICENSE.txt
Requires-Dist: numpy
Requires-Dist: scipy
Requires-Dist: scikit-learn>=0.18
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: pytest-cov; extra == "test"
Requires-Dist: pandas; extra == "test"
Requires-Dist: pyarrow; extra == "test"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: summary
Python package for stacking (stacked generalization) featuring lightweight functional API and fully compatible scikit-learn API.
Convenient way to automate OOF computation, prediction and bagging using any number of models.
All details, FAQ, and tutorials: https://github.com/vecxoz/vecstack
numpy
scipy
scikit-learn>=0.18
[test]
pytest
pytest-cov
pandas
pyarrow

@@ -0,2 +1,11 @@

LICENSE.txt
README.md
pyproject.toml
setup.py
tests/test_func_api_classification_binary.py
tests/test_func_api_classification_multiclass.py
tests/test_func_api_regression.py
tests/test_sklearn_api_classification_binary.py
tests/test_sklearn_api_classification_multiclass.py
tests/test_sklearn_api_regression.py
vecstack/__init__.py

@@ -3,0 +12,0 @@ vecstack/core.py

@@ -11,3 +11,3 @@ """Python package for stacking (machine learning technique)

Copyright (c) 2016-2018 Igor Ivanov
Copyright (c) 2016-2025 Igor Ivanov
Email: vecxoz@gmail.com

@@ -42,3 +42,3 @@

__license__ = 'MIT'
__version__ = '0.4.0'
__version__ = '0.5.0'

@@ -45,0 +45,0 @@ __all__ = ['stacking', 'StackingTransformer']

@@ -9,3 +9,3 @@ """Functional API for stacking.

Copyright (c) 2016-2018 Igor Ivanov
Copyright (c) 2016-2025 Igor Ivanov
Email: vecxoz@gmail.com

@@ -155,9 +155,10 @@

X_test : numpy array or sparse matrix of N-dim shape, e.g. 2-dim [n_test_samples, n_features]
X_test : numpy array or sparse matrix of N-dim shape, e.g. 2-dim [n_test_samples, n_features], or None
Test data
Note: X_test can be set to None when mode='oof'
sample_weight : numpy array of shape [n_train_samples]
sample_weight : numpy array of shape [n_train_samples], default None
Individual weights for each sample (passed to fit method of the model).
Note: sample_weight has length of full training set X_train and it would be
split automatically for each fold.
Note: sample_weight must have the same length as full training set X_train.
It will be split automatically for each fold.

@@ -193,3 +194,3 @@ regression : boolean, default True

Note: for detailes see terminology below
'oof' - return only oof
'oof' - return only oof. X_test can be set to None
'oof_pred' (alias 'B') - return oof and pred

@@ -259,2 +260,3 @@ 'oof_pred_bag' (alias 'A') - return oof and bagged pred

Random seed
Ignored if shuffle=False

@@ -308,3 +310,3 @@ verbose : int, default 0

----------
from sklearn.datasets import load_boston
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

@@ -318,31 +320,32 @@ from sklearn.metrics import mean_absolute_error

# Load demo data
boston = load_boston()
X, y = boston.data, boston.target
X, y = fetch_california_housing(return_X_y=True)
# Make train/test split
# As usual in machine learning task we have X_train, y_train, and X_test
X_train, X_test, y_train, y_test = train_test_split(X, y,
test_size = 0.2, random_state = 0)
X_train, X_test, y_train, y_test = train_test_split(X, y,
test_size=0.2,
random_state=0)
# Caution! All models and parameter values are just
# Caution! All models and parameter values are just
# demonstrational and shouldn't be considered as recommended.
# Initialize 1-st level models.
models = [
ExtraTreesRegressor(random_state = 0, n_jobs = -1,
n_estimators = 100, max_depth = 3),
ExtraTreesRegressor(random_state=0, n_jobs=-1,
n_estimators=100, max_depth=3),
RandomForestRegressor(random_state = 0, n_jobs = -1,
n_estimators = 100, max_depth = 3),
RandomForestRegressor(random_state=0, n_jobs=-1,
n_estimators=100, max_depth=3),
XGBRegressor(seed = 0, n_jobs = -1, learning_rate = 0.1,
n_estimators = 100, max_depth = 3)]
XGBRegressor(random_state=0, n_jobs=-1, learning_rate=0.1,
n_estimators=100, max_depth=3)]
# Compute stacking features
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = True, metric = mean_absolute_error, n_folds = 4,
shuffle = True, random_state = 0, verbose = 2)
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression=True, metric=mean_absolute_error,
n_folds=4, shuffle=True,
random_state=0, verbose=2)
# Initialize 2-nd level model
model = XGBRegressor(seed = 0, n_jobs = -1, learning_rate = 0.1,
n_estimators = 100, max_depth = 3)
model = XGBRegressor(random_state=0, n_jobs=-1, learning_rate=0.1,
n_estimators=100, max_depth=3)

@@ -370,31 +373,32 @@ # Fit 2-nd level model

# Load demo data
iris = load_iris()
X, y = iris.data, iris.target
X, y = load_iris(return_X_y=True)
# Make train/test split
# As usual in machine learning task we have X_train, y_train, and X_test
X_train, X_test, y_train, y_test = train_test_split(X, y,
test_size = 0.2, random_state = 0)
X_train, X_test, y_train, y_test = train_test_split(X, y,
test_size=0.2,
random_state=0)
# Caution! All models and parameter values are just
# Caution! All models and parameter values are just
# demonstrational and shouldn't be considered as recommended.
# Initialize 1-st level models.
models = [
ExtraTreesClassifier(random_state = 0, n_jobs = -1,
n_estimators = 100, max_depth = 3),
ExtraTreesClassifier(random_state=0, n_jobs=-1,
n_estimators=100, max_depth=3),
RandomForestClassifier(random_state = 0, n_jobs = -1,
n_estimators = 100, max_depth = 3),
RandomForestClassifier(random_state=0, n_jobs=-1,
n_estimators=100, max_depth=3),
XGBClassifier(seed = 0, n_jobs = -1, learning_rate = 0.1,
n_estimators = 100, max_depth = 3)]
XGBClassifier(seed=0, n_jobs=-1, learning_rate=0.1,
n_estimators=100, max_depth=3)]
# Compute stacking features
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression = False, metric = accuracy_score, n_folds = 4,
stratified = True, shuffle = True, random_state = 0, verbose = 2)
S_train, S_test = stacking(models, X_train, y_train, X_test,
regression=False, metric=accuracy_score,
n_folds=4, stratified=True, shuffle=True,
random_state=0, verbose=2)
# Initialize 2-nd level model
model = XGBClassifier(seed = 0, n_jobs = -1, learning_rate = 0.1,
n_estimators = 100, max_depth = 3)
model = XGBClassifier(seed=0, n_jobs=-1, learning_rate=0.1,
n_estimators=100, max_depth=3)

@@ -416,2 +420,5 @@ # Fit 2-nd level model

raise ValueError('List of models is empty')
# X_test can be None only if mode='oof'
if X_test is None and mode != 'oof':
raise ValueError("X_test can be None only if mode='oof'")
# Check arrays

@@ -422,4 +429,4 @@ # y_train and sample_weight must be 1d ndarrays (i.e. row, not column)

accept_sparse=['csr'], # allow csr and cast all other sparse types to csr
force_all_finite=False, # allow nan and inf because
# some models (xgboost) can handle
ensure_all_finite=False, # allow nan and inf because
# some models (xgboost) can handle
allow_nd=True,

@@ -432,4 +439,4 @@ multi_output=False) # do not allow several columns in y_train

allow_nd=True,
force_all_finite=False) # allow nan and inf because
# some models (xgboost) can handle
ensure_all_finite=False) # allow nan and inf because
# some models (xgboost) can handle
if sample_weight is not None:

@@ -458,2 +465,6 @@ sample_weight = np.array(sample_weight).ravel()

shuffle = bool(shuffle)
# <random_state>
# To comply with sklearn requirement
if not shuffle:
random_state = None
# <verbose>

@@ -679,3 +690,6 @@ if verbose not in [0, 1, 2]:

# Save OOF
np.save(full_path, np.array([S_train, S_test]))
array_to_save = np.empty(2, dtype='object')
array_to_save[0] = S_train
array_to_save[1] = S_test
np.save(full_path, array_to_save)

@@ -682,0 +696,0 @@ # Save log

@@ -9,3 +9,3 @@ """Scikit-learn compatible API for stacking.

Copyright (c) 2016-2018 Igor Ivanov
Copyright (c) 2016-2025 Igor Ivanov
Email: vecxoz@gmail.com

@@ -42,2 +42,3 @@

import warnings
from contextlib import suppress
import numpy as np

@@ -54,2 +55,3 @@ import scipy.stats as st

from sklearn.utils.validation import has_fit_parameter
from sklearn.utils.validation import validate_data
from sklearn.model_selection import KFold

@@ -66,3 +68,3 @@ from sklearn.model_selection import StratifiedKFold

class StackingTransformer(BaseEstimator, TransformerMixin):
class StackingTransformer(TransformerMixin, BaseEstimator):
"""StackingTransformer. Scikit-learn compatible API for stacking.

@@ -156,2 +158,3 @@

Same seed and correspondingly same split is used for all estimators.
Ignored if ``shuffle=False``

@@ -208,3 +211,3 @@ verbose : int, default 0

--------
>>> from sklearn.datasets import load_boston
>>> from sklearn.datasets import fetch_california_housing
>>> from sklearn.model_selection import train_test_split

@@ -217,4 +220,3 @@ >>> from sklearn.metrics import mean_absolute_error

>>> # Load demo data
>>> boston = load_boston()
>>> X, y = boston.data, boston.target
>>> X, y = fetch_california_housing(return_X_y=True)
>>>

@@ -292,2 +294,13 @@ >>> # Make train/test split

def __sklearn_tags__(self):
tags = super().__sklearn_tags__()
tags.estimator_type = 'transformer'
tags.transformer_tags.preserves_dtype = []
tags.target_tags.required = True
tags.input_tags.sparse = True
return tags
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
def fit(self, X, y, sample_weight=None):

@@ -319,8 +332,18 @@ """Fit all base estimators.

# ---------------------------------------------------------------------
# Check data and set `self.n_features_in_` and `self.feature_names_in_`
X, y = validate_data(self, X, y,
reset=True, # default: True, if True will set `self.n_features_in_` and `self.feature_names_in_`
validate_separately=False, # default: False, if False will use `check_X_y`
skip_check_array=False, # default: False, if False will NOT skip checks
accept_sparse=['csr'],
ensure_all_finite=True,
multi_output=False)
# Legacy check included in `validate_data`
# Check X and y
# ``check_estimator`` does not allow ``force_all_finite=False``
X, y = check_X_y(X, y,
accept_sparse=['csr'], # allow csr, cast all others to csr
force_all_finite=True, # do not allow nan and inf
multi_output=False) # allow only one column in y_train
# ``check_estimator`` does not allow ``ensure_all_finite=False``
# X, y = check_X_y(X, y,
# accept_sparse=['csr'], # allow csr, cast all others to csr
# ensure_all_finite=True, # do not allow nan and inf
# multi_output=False) # allow only one column in y_train

@@ -332,3 +355,3 @@ # Check X and sample_weight

accept_sparse=['csr'],
force_all_finite=True,
ensure_all_finite=True,
multi_output=False)

@@ -394,2 +417,8 @@

# To comply with sklearn requirement
if not self.shuffle:
random_state_internal = None
else:
random_state_internal = self.random_state
# ---------------------------------------------------------------------

@@ -446,3 +475,3 @@ # Compute attributes (basic properties of data, number of estimators, etc.)

shuffle=self.shuffle,
random_state=self.random_state)
random_state=random_state_internal)
# Save target to be able to create stratified split in ``transform`` method

@@ -454,3 +483,3 @@ # This is more efficient than to save split indices

shuffle=self.shuffle,
random_state=self.random_state)
random_state=random_state_internal)
self._y_ = None

@@ -647,5 +676,14 @@

# Check data without resetting `self.n_features_in_` and `self.feature_names_in_`
X = validate_data(self, X,
reset=False, # default: True, if True will set `self.n_features_in_` and `self.feature_names_in_`
validate_separately=False, # default: False, if False will use `check_X_y`
skip_check_array=False, # default: False, if False will NOT skip checks
accept_sparse=['csr'],
ensure_all_finite=True) # no need for `multi_output`, because no `y`
# Legacy check included in `validate_data`
# Input validation
# ``check_estimator`` does not allow ``force_all_finite=False``
X = check_array(X, accept_sparse=['csr'], force_all_finite=True)
# ``check_estimator`` does not allow ``ensure_all_finite=False``
# X = check_array(X, accept_sparse=['csr'], ensure_all_finite=True)

@@ -734,5 +772,6 @@ # *********************************************************************

else:
# Legacy check included in `validate_data`
# Check n_features
if X.shape[1] != self.n_features_:
raise ValueError('Inconsistent number of features.')
# if X.shape[1] != self.n_features_:
# raise ValueError('Inconsistent number of features.')

@@ -963,14 +1002,24 @@ # Create empty numpy array for test predictions

def _get_params(self, attr, deep=True):
"""Gives ability to get parameters of nested estimators
"""
out = super(StackingTransformer, self).get_params(deep=False)
Gives ability to get parameters of nested estimators
"""
out = super().get_params(deep=deep)
if not deep:
return out
estimators = getattr(self, attr)
if estimators is None:
try:
out.update(estimators)
except (TypeError, ValueError):
# Ignore TypeError for cases where estimators is not a list of
# (name, estimator) and ignore ValueError when the list is not
# formatted correctly. This is to prevent errors when calling
# `set_params`. `BaseEstimator.set_params` calls `get_params` which
# can error for invalid values for `estimators`.
return out
out.update(estimators)
for name, estimator in estimators:
for key, value in iter(estimator.get_params(deep=True).items()):
out['%s__%s' % (name, key)] = value
if hasattr(estimator, 'get_params'):
for key, value in estimator.get_params(deep=True).items():
out['%s__%s' % (name, key)] = value
return out

@@ -982,3 +1031,4 @@

def get_params(self, deep=True):
"""Get parameters of StackingTransformer and base estimators.
"""
Get parameters of StackingTransformer and base estimators.

@@ -990,2 +1040,8 @@ Parameters

If True - get parameters of StackingTransformer and base estimators
Returns
-------
params : dict
Parameter and estimator names mapped to their values or parameter
names mapped to their values.
"""

@@ -997,2 +1053,69 @@ return self._get_params('estimators', deep=deep)

def _set_params(self, attr, **params):
"""
Gives ability to set parameters of nested estimators,
and replace individual estimators in the list.
"""
# Ensure strict ordering of parameter setting:
# 1. Replace the entire estimators collection
if attr in params:
setattr(self, attr, params.pop(attr))
# 2. Replace individual estimators by name
items = getattr(self, attr)
if isinstance(items, list) and items:
# Get item names used to identify valid names in params
# `zip` raises a TypeError when `items` does not contains
# elements of length 2
with suppress(TypeError):
item_names, _ = zip(*items)
for name in list(params.keys()):
if '__' not in name and name in item_names:
self._replace_estimator(attr, name, params.pop(name))
# 3. Individual estimator parameters and other initialisation arguments
super().set_params(**params)
return self
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
def _replace_estimator(self, attr, name, new_val):
"""
Replace estimator, assuming `name` is a valid estimator name
"""
new_estimators = list(getattr(self, attr))
for i, (estimator_name, _) in enumerate(new_estimators):
if estimator_name == name:
new_estimators[i] = (name, new_val)
break
setattr(self, attr, new_estimators)
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
def set_params(self, **params):
"""
Set parameters of StackingTransformer and base estimators.
Valid parameter keys can be listed with `get_params()`. Note that you
can directly set the parameters of the estimators contained in `estimators`.
Parameters
----------
**params : keyword arguments
Specific parameters using e.g. `set_params(parameter_name=new_value)`.
In addition, to setting the parameters of the estimator,
the individual estimator of the estimators can also be set.
Dropping individual estimators using 'drop' is not supported.
Returns
-------
self : object
Estimator instance.
"""
self._set_params('estimators', **params)
return self
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
def _validate_names(self, names):

@@ -1034,3 +1157,3 @@ """Validates estimator names

# Input validation
X = check_array(X, accept_sparse=['csr'], force_all_finite=True)
X = check_array(X, accept_sparse=['csr'], ensure_all_finite=True)
return self._check_identity(X)

@@ -1037,0 +1160,0 @@

Sorry, the diff of this file is not supported yet