twirls/MexFunc/AllClusterRandSim.cpp

415 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include <mex.h>
#include <mat.h>
#include <iostream>
#include <algorithm>
#include <string>
#include <unordered_set>
#include <ctime>
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <unordered_map>
#include <set>
#include <fstream>
#include <random>
#include <cmath>
#include <stdlib.h>
#include <limits.h>
#include <atomic>
using std::cout;
using std::endl;
using namespace std;
#define STRING_BUF_SIZE 204800
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
->std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for (size_t i = 0;i < threads;++i)
workers.emplace_back(
[this]
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)
worker.join();
}
/* <20><>ȡһάdouble<6C><65><EFBFBD><EFBFBD> */
void Read1DDouble(const mxArray* pMxArray, vector<double>& vDat) {
int rowNum, colNum;
double* matData;
rowNum = (int)mxGetM(pMxArray);
colNum = (int)mxGetN(pMxArray);
// cout << rowNum << " " << colNum << endl;
matData = (double*)mxGetData(pMxArray); //<2F><>ȡָ<C8A1><D6B8>
vDat.resize(rowNum * colNum);
for (int i = 0; i < vDat.size(); ++i) vDat[i] = matData[i];
}
/* <20><>ȡ<EFBFBD><C8A1>άdouble<6C><65><EFBFBD><EFBFBD> */
void Read2DDouble(const mxArray* pMxArray, vector<vector<double>>& vvDat) {
int rowNum, colNum;
double* matData;
rowNum = (int)mxGetM(pMxArray);
colNum = (int)mxGetN(pMxArray);
vvDat.resize(rowNum);
matData = (double*)mxGetData(pMxArray); //<2F><>ȡָ<C8A1><D6B8>
for (int i = 0; i < rowNum; ++i) {
vvDat[i].resize(colNum);
for (int j = 0; j < colNum; ++j) {
vvDat[i][j] = matData[j * rowNum + i];
}
}
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>д<EFBFBD><D0B4>mxArray, <20><>Ϊ<EFBFBD><CEAA><EFBFBD><EFBFBD><EFBFBD>ķ<EFBFBD><C4B7><EFBFBD>ֵ
mxArray* writeToMatDouble(const double* data, int rowNum, int colNum) {
mxArray* pWriteArray = NULL;//matlab<61><62>ʽ<EFBFBD><CABD><EFBFBD><EFBFBD>
int len = rowNum * colNum;
//<2F><><EFBFBD><EFBFBD>һ<EFBFBD><D2BB>rowNum*colNum<75>ľ<EFBFBD><C4BE><EFBFBD>
pWriteArray = mxCreateDoubleMatrix(rowNum, colNum, mxREAL);
//<2F><>data<74><61>ֵ<EFBFBD><D6B5><EFBFBD><EFBFBD>pWriteArrayָ<79><D6B8>
memcpy((void*)(mxGetPr(pWriteArray)), (void*)data, sizeof(double) * len);
return pWriteArray; // <20><>ֵ<EFBFBD><D6B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֵ
}
#define START_IDX(tid, threadNum, arrLen) (arrLen) * (tid) / (threadNum)
#define END_IDX(tid, threadNum, arrLen) (arrLen) * (tid + 1) / (threadNum)
// <20>̲߳<DFB3><CCB2><EFBFBD>
struct TPRandSim {
vector<vector<double>>* pvvSum;
vector<vector<double>>* pvvSqSum;
vector<vector<int>>* pvvRandPos;
vector<double>* pvIx;
vector<int>* pvCateNum;
vector<vector<double>>* pvvRound;
vector<vector<double>>* pvvX;
int loopNum;
int maxCategoryNum;
int tid;
int numThread;
};
// <20><><EFBFBD>߳<EFBFBD><DFB3><EFBFBD><EFBFBD>ں<EFBFBD><DABA><EFBFBD>
void ThreadRandSim(TPRandSim param) {
vector<vector<double>>& vvSum = *param.pvvSum;
vector<vector<double>>& vvSqSum = *param.pvvSqSum;
vector<vector<int>>& vvRandPos = *param.pvvRandPos;
vector<double>& vIx = *param.pvIx;
vector<int>& vCateNum = *param.pvCateNum;
vector<vector<double>>& vvRound = *param.pvvRound;
vector<vector<double>>& vvX = *param.pvvX;
int tid = param.tid;
int numThread = param.numThread;
int loopNum = param.loopNum;
int maxCategoryNum = param.maxCategoryNum;
int rowNum = vvX.size();
int colNum = vvX[0].size();
clock_t begin = clock(), mid = clock(), finish;
/* <20><><EFBFBD><EFBFBD>ģ<EFBFBD><C4A3> */
int startIdx = START_IDX(tid, numThread, loopNum);
int endIdx = END_IDX(tid, numThread, loopNum);
// if (tid == 0) cout << startIdx << '\t' << endIdx << endl;
for (int idx = startIdx; idx < endIdx; ++idx) { // ģ<><C4A3><EFBFBD>ִ<EFBFBD>
// if (tid == 0 && idx % 100 == 0) {
// finish = clock();
// cout << idx << ": time: " << (double)(finish - mid) / CLOCKS_PER_SEC << " s" << endl << flush;
// mid = finish;
// }
auto& vRandPos = vvRandPos[idx];
for (int i = 0; i < rowNum; ++i) {
const int hRowIdx = vRandPos[i]; // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>֮<EFBFBD><D6AE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
const int cateIdx = vIx[hRowIdx] - 1; // <20><><EFBFBD><EFBFBD><EFBFBD>ı<EFBFBD><C4B1><EFBFBD>
//if (cateIdx == 0) {
auto& vRound = vvRound[cateIdx];
for (int j = 0; j < colNum; ++j) {
vRound[j] += vvX[i][j];
}
//}
}
for (int c = 0; c < maxCategoryNum; ++c) {
auto& vRound = vvRound[c];
const int numPositive = vCateNum[c];
for (int j = 0; j < colNum; ++j) {
const double val = vRound[j] / numPositive;
vvSum[c][j] += val;
vvSqSum[c][j] += val * val;
}
}
for (auto& vRound : vvRound) for (auto &val : vRound) val = 0;
}
finish = clock();
// cout << tid << ": Random simulation time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush;
}
/* <20><><EFBFBD>߳̽<DFB3><CCBD><EFBFBD>shuffle<6C><65><EFBFBD><EFBFBD> */
struct TPShuffle {
vector<vector<int>>* pvvRandPos;
int loopNum;
int tid;
int numThread;
};
void ThreadShuffle(TPShuffle param) {
clock_t begin = clock(), finish;
vector<vector<int>>& vvRandPos = *param.pvvRandPos;
int tid = param.tid;
int numThread = param.numThread;
int loopNum = param.loopNum;
std::random_device rd;
int startIdx = START_IDX(tid, numThread, loopNum);
int endIdx = END_IDX(tid, numThread, loopNum);
for (int roundIdx = startIdx; roundIdx < endIdx; ++roundIdx) {
vector<int>& vRandPos = vvRandPos[roundIdx];
for (int i = 0; i < vRandPos.size(); ++i) vRandPos[i] = i;
std::shuffle(vRandPos.begin(), vRandPos.end(), std::default_random_engine(rd()));
}
finish = clock();
// cout << tid << ": thread shuffle time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush;
}
/* <20><><EFBFBD>ں<EFBFBD><DABA><EFBFBD> */
/*
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ֵ
<EFBFBD><EFBFBD><EFBFBD>
1. x <20><>ά<EFBFBD><CEAC><EFBFBD>ݣ<EFBFBD>double<6C><65><EFBFBD>ͣ<EFBFBD><CDA3><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD><CEAA><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>ֵ䳤<D6B5>ȣ<EFBFBD>ÿ<EFBFBD><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>г<EFBFBD><D0B3>ֵĴ<D6B5><C4B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>5<EFBFBD><35>
2. h <20><><EFBFBD><EFBFBD>Ϊ<EFBFBD><CEAA><EFBFBD>׸<EFBFBD><D7B8><EFBFBD><EFBFBD><EFBFBD>ֵΪ1<CEAA><31><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ڸ<EFBFBD>֪ʶ<D6AA><CAB6><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ӧ<EFBFBD><D3A6><EFBFBD>ǣ<EFBFBD><C7A3><EFBFBD>Ϊ0<CEAA><30><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
3. numThread
4. loopNum
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
vs z score,<2C><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ָ<EFBFBD><D6B8><EFBFBD><EFBFBD>һά
ps <20><>vs<76><73><EFBFBD><EFBFBD>һ<EFBFBD><D2BB>
*/
void mexFunction(int nlhs, mxArray* plhs[], int nrhs, const mxArray* prhs[]) {
if (nrhs < 2) {
cout << "At least 2 arguments should be given for this function!" << endl;
return;
}
clock_t begin = clock(), mid, finish;
vector<double> vIx; // <20><><EFBFBD><EFBFBD>
vector<vector<double>> vvX;
Read2DDouble(prhs[0], vvX);
Read1DDouble(prhs[1], vIx);
int rowNum = vvX.size();
int colNum = vvX[0].size();
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
int maxCategoryNum = 1;
for (auto category : vIx) if (maxCategoryNum < category) maxCategoryNum = category;
int numThread = 1;
int loopNum = 1000;
if (nrhs > 2) {
double* pNumThread = (double*)mxGetData(prhs[2]);
numThread = (int)pNumThread[0];
if (numThread < 1) numThread = 1;
}
if (nrhs > 3) {
double* pLoopNum = (double*)mxGetData(prhs[3]);
loopNum = (int)pLoopNum[0];
if (loopNum < 1000) loopNum = 1000;
}
// <20>߳<EFBFBD><DFB3><EFBFBD><EFBFBD>õ<EFBFBD><C3B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
vector<int> vCateNum(maxCategoryNum); // ÿ<><C3BF><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ĸ<EFBFBD><C4B8><EFBFBD>
vector<vector<double>> vvTs(maxCategoryNum, vector<double>(colNum)); //<2F><>¼<EFBFBD><C2BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>е<EFBFBD><D0B5>о<EFBFBD>ֵ
vector<vector<vector<double>>> vvvSum(numThread, vector<vector<double>>(maxCategoryNum, vector<double>(colNum)));
vector<vector<vector<double>>> vvvSqSum(numThread, vector<vector<double>>(maxCategoryNum, vector<double>(colNum)));
vector<vector<vector<double>>> vvvRound(numThread, vector<vector<double>>(maxCategoryNum, vector<double>(colNum)));
finish = clock();
cout << "Load Data time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush;
// cout << numThread << '\t' << loopNum << endl;
// maxCategoryNum = 1;
/* <20><><EFBFBD><EFBFBD>ʵ<EFBFBD>ʷ<EFBFBD><CAB7><EFBFBD><EFBFBD>ľ<EFBFBD>ֵ */
mid = clock();
for (int i = 0; i < rowNum; ++i) {
const int cateIdx = vIx[i] - 1;
vCateNum[cateIdx] ++;
for (int j = 0; j < colNum; ++j) vvTs[cateIdx][j] += vvX[i][j];
}
for (int i = 0; i < maxCategoryNum; ++i) {
for (int j = 0; j < colNum; ++j) {
vvTs[i][j] /= vCateNum[i];
}
}
// for (auto c : vCateNum) cout << c << endl;
// <20>Ȱ<EFBFBD>loopNum<75><6D><EFBFBD><EFBFBD><EFBFBD><EFBFBD>shuffle<6C><65><EFBFBD><EFBFBD>
mid = clock();
vector<vector<int>> vvRandPos(loopNum, vector<int>(rowNum));
vector<std::thread> vTHShuffle;
for (int i = 0; i < numThread; ++i) {
TPShuffle param = { &vvRandPos, loopNum, i, numThread };
vTHShuffle.push_back(std::thread(ThreadShuffle, param));
}
for (auto& t : vTHShuffle) t.join();
finish = clock();
cout << "Shuffle time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush;
vector<std::thread> vTHRandSim;
for (int i = 0; i < numThread; ++i) {
TPRandSim tParam = { &vvvSum[i], &vvvSqSum[i], &vvRandPos, &vIx, &vCateNum, &vvvRound[i], & vvX, loopNum, maxCategoryNum, i, numThread};
vTHRandSim.push_back(std::thread(ThreadRandSim, tParam));
}
for (auto& t : vTHRandSim) t.join();
// <20>ϲ<EFBFBD><CFB2><EFBFBD><EFBFBD><EFBFBD><EFBFBD>̵߳<DFB3><CCB5><EFBFBD><EFBFBD><EFBFBD>
auto& vvSum = vvvSum[0];
auto& vvSqSum = vvvSqSum[0];
for (int t = 1; t < numThread; ++t) {
for (int i = 0; i < maxCategoryNum; ++i) {
for (int j = 0; j < colNum; ++j) {
vvSum[i][j] += vvvSum[t][i][j];
vvSqSum[i][j] += vvvSqSum[t][i][j];
}
}
}
finish = clock();
cout << "Random simulation time: " << (double)(finish - mid) / CLOCKS_PER_SEC << " s" << endl << flush;
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
vector<vector<double>> vvVs(maxCategoryNum, vector<double>(colNum));
// <20><><EFBFBD>м<EFBFBD><D0BC><EFBFBD>ƽ<EFBFBD><C6BD>ֵ
vector<vector<double>> vvMean(maxCategoryNum, vector<double>(colNum));
vector<vector<double>> vvStd(maxCategoryNum, vector<double>(colNum));
for (int c = 0; c < maxCategoryNum; ++c) {
auto& vMean = vvMean[c];
auto& vStd = vvStd[c];
auto& vVs = vvVs[c];
auto& vSum = vvSum[c];
auto& vSqSum = vvSqSum[c];
auto& vTs = vvTs[c];
for (int i = 0; i < colNum; ++i) vMean[i] = vSum[i] / loopNum;
for (int i = 0; i < colNum; ++i) {
const double meanVal = vSum[i] / loopNum;
vMean[i] = meanVal; // <20><>ֵ
const double sqDiff = vSqSum[i] + loopNum * meanVal * meanVal - 2 * meanVal * vSum[i];
vStd[i] = sqrt(sqDiff / (loopNum - 1)); // <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
}
// <20><><EFBFBD><EFBFBD>vs
for (int i = 0; i < vVs.size(); ++i) {
vVs[i] = (vTs[i] - vMean[i]) / vStd[i];
}
}
// auto& vVs = vvVs[0];
// ofstream ofs("d:\\result_new.txt");
// for (int i = 0; i < colNum; ++i) {
// ofs << vVs[i] << endl;
// }
// ofs.close();
/* д<><D0B4><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
if (nlhs > 0) { // vs
vector<double> vVsData(maxCategoryNum* colNum);
for (int i = 0; i < maxCategoryNum; ++i) {
for (int j = 0; j < colNum; ++j) {
vVsData[j * maxCategoryNum + i] = vvVs[i][j];
}
}
plhs[0] = writeToMatDouble(vVsData.data(), maxCategoryNum, colNum);
}
finish = clock();
cout << "All Cluster Random simulation Total time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush;
}
// <20><>c++<2B><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
void mexFunctionWrap(int nlhs, mxArray* plhs[], int nrhs, const mxArray* prhs[]) {
return mexFunction(nlhs, plhs, nrhs, prhs);
}