#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using std::cout; using std::endl; using namespace std; #define STRING_BUF_SIZE 204800 class ThreadPool { public: ThreadPool(size_t); template auto enqueue(F&& f, Args&&... args) ->std::future::type>; ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function > tasks; // synchronization std::mutex queue_mutex; std::condition_variable condition; bool stop; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for (size_t i = 0;i < threads;++i) workers.emplace_back( [this] { for (;;) { std::function task; { std::unique_lock lock(this->queue_mutex); this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); }); if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); } // add new work item to the pool template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future::type> { using return_type = typename std::result_of::type; auto task = std::make_shared< std::packaged_task >( std::bind(std::forward(f), std::forward(args)...) ); std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); // don't allow enqueueing after stopping the pool if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task]() { (*task)(); }); } condition.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) worker.join(); } /* 读取一维double数据 */ void Read1DDouble(const mxArray* pMxArray, vector& vDat) { int rowNum, colNum; double* matData; rowNum = (int)mxGetM(pMxArray); colNum = (int)mxGetN(pMxArray); // cout << rowNum << " " << colNum << endl; matData = (double*)mxGetData(pMxArray); //获取指针 vDat.resize(rowNum * colNum); for (int i = 0; i < vDat.size(); ++i) vDat[i] = matData[i]; } /* 读取二维double数据 */ void Read2DDouble(const mxArray* pMxArray, vector>& vvDat) { int rowNum, colNum; double* matData; rowNum = (int)mxGetM(pMxArray); colNum = (int)mxGetN(pMxArray); vvDat.resize(rowNum); matData = (double*)mxGetData(pMxArray); //获取指针 for (int i = 0; i < rowNum; ++i) { vvDat[i].resize(colNum); for (int j = 0; j < colNum; ++j) { vvDat[i][j] = matData[j * rowNum + i]; } } } // 将结果写入mxArray, 作为后续的返回值 mxArray* writeToMatDouble(const double* data, int rowNum, int colNum) { mxArray* pWriteArray = NULL;//matlab格式矩阵 int len = rowNum * colNum; //创建一个rowNum*colNum的矩阵 pWriteArray = mxCreateDoubleMatrix(rowNum, colNum, mxREAL); //把data的值赋给pWriteArray指针 memcpy((void*)(mxGetPr(pWriteArray)), (void*)data, sizeof(double) * len); return pWriteArray; // 赋值给返回值 } #define START_IDX(tid, threadNum, arrLen) (arrLen) * (tid) / (threadNum) #define END_IDX(tid, threadNum, arrLen) (arrLen) * (tid + 1) / (threadNum) // 线程参数 struct TPRandSim { vector>* pvvSum; vector>* pvvSqSum; vector>* pvvRandPos; vector* pvIx; vector* pvCateNum; vector>* pvvRound; vector>* pvvX; int loopNum; int maxCategoryNum; int tid; int numThread; }; // 多线程入口函数 void ThreadRandSim(TPRandSim param) { vector>& vvSum = *param.pvvSum; vector>& vvSqSum = *param.pvvSqSum; vector>& vvRandPos = *param.pvvRandPos; vector& vIx = *param.pvIx; vector& vCateNum = *param.pvCateNum; vector>& vvRound = *param.pvvRound; vector>& vvX = *param.pvvX; int tid = param.tid; int numThread = param.numThread; int loopNum = param.loopNum; int maxCategoryNum = param.maxCategoryNum; int rowNum = vvX.size(); int colNum = vvX[0].size(); clock_t begin = clock(), mid = clock(), finish; /* 随机模拟 */ int startIdx = START_IDX(tid, numThread, loopNum); int endIdx = END_IDX(tid, numThread, loopNum); // if (tid == 0) cout << startIdx << '\t' << endIdx << endl; for (int idx = startIdx; idx < endIdx; ++idx) { // 模拟轮次 // if (tid == 0 && idx % 100 == 0) { // finish = clock(); // cout << idx << ": time: " << (double)(finish - mid) / CLOCKS_PER_SEC << " s" << endl << flush; // mid = finish; // } auto& vRandPos = vvRandPos[idx]; for (int i = 0; i < rowNum; ++i) { const int hRowIdx = vRandPos[i]; // 随机打乱之后的行索引 const int cateIdx = vIx[hRowIdx] - 1; // 聚类的编号 //if (cateIdx == 0) { auto& vRound = vvRound[cateIdx]; for (int j = 0; j < colNum; ++j) { vRound[j] += vvX[i][j]; } //} } for (int c = 0; c < maxCategoryNum; ++c) { auto& vRound = vvRound[c]; const int numPositive = vCateNum[c]; for (int j = 0; j < colNum; ++j) { const double val = vRound[j] / numPositive; vvSum[c][j] += val; vvSqSum[c][j] += val * val; } } for (auto& vRound : vvRound) for (auto &val : vRound) val = 0; } finish = clock(); // cout << tid << ": Random simulation time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush; } /* 多线程进行shuffle操作 */ struct TPShuffle { vector>* pvvRandPos; int loopNum; int tid; int numThread; }; void ThreadShuffle(TPShuffle param) { clock_t begin = clock(), finish; vector>& vvRandPos = *param.pvvRandPos; int tid = param.tid; int numThread = param.numThread; int loopNum = param.loopNum; std::random_device rd; int startIdx = START_IDX(tid, numThread, loopNum); int endIdx = END_IDX(tid, numThread, loopNum); for (int roundIdx = startIdx; roundIdx < endIdx; ++roundIdx) { vector& vRandPos = vvRandPos[roundIdx]; for (int i = 0; i < vRandPos.size(); ++i) vRandPos[i] = i; std::shuffle(vRandPos.begin(), vRandPos.end(), std::default_random_engine(rd())); } finish = clock(); // cout << tid << ": thread shuffle time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush; } /* 入口函数 */ /* 三个参数,一个返回值 输入: 1. x 二维数据,double类型,行数为文献数量,列数为字典长度(每个单词在所有文献中出现的次数超过5) 2. h 长度为文献个数,值为1代表该文献属于该知识颗粒(应该是),为0则不属于 3. numThread 4. loopNum 输出: vs z score,显著性指数,一维 ps 与vs长度一致 */ void mexFunction(int nlhs, mxArray* plhs[], int nrhs, const mxArray* prhs[]) { if (nrhs < 2) { cout << "At least 2 arguments should be given for this function!" << endl; return; } clock_t begin = clock(), mid, finish; vector vIx; // 类别 vector> vvX; Read2DDouble(prhs[0], vvX); Read1DDouble(prhs[1], vIx); int rowNum = vvX.size(); int colNum = vvX[0].size(); // 查找最大的类别编号 int maxCategoryNum = 1; for (auto category : vIx) if (maxCategoryNum < category) maxCategoryNum = category; int numThread = 1; int loopNum = 1000; if (nrhs > 2) { double* pNumThread = (double*)mxGetData(prhs[2]); numThread = (int)pNumThread[0]; if (numThread < 1) numThread = 1; } if (nrhs > 3) { double* pLoopNum = (double*)mxGetData(prhs[3]); loopNum = (int)pLoopNum[0]; if (loopNum < 1000) loopNum = 1000; } // 线程中用到的数据 vector vCateNum(maxCategoryNum); // 每个类别包含的个数 vector> vvTs(maxCategoryNum, vector(colNum)); //记录各个类别的行的行均值 vector>> vvvSum(numThread, vector>(maxCategoryNum, vector(colNum))); vector>> vvvSqSum(numThread, vector>(maxCategoryNum, vector(colNum))); vector>> vvvRound(numThread, vector>(maxCategoryNum, vector(colNum))); finish = clock(); cout << "Load Data time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush; // cout << numThread << '\t' << loopNum << endl; // maxCategoryNum = 1; /* 计算实际分类的均值 */ mid = clock(); for (int i = 0; i < rowNum; ++i) { const int cateIdx = vIx[i] - 1; vCateNum[cateIdx] ++; for (int j = 0; j < colNum; ++j) vvTs[cateIdx][j] += vvX[i][j]; } for (int i = 0; i < maxCategoryNum; ++i) { for (int j = 0; j < colNum; ++j) { vvTs[i][j] /= vCateNum[i]; } } // for (auto c : vCateNum) cout << c << endl; // 先把loopNum次随机shuffle做了 mid = clock(); vector> vvRandPos(loopNum, vector(rowNum)); vector vTHShuffle; for (int i = 0; i < numThread; ++i) { TPShuffle param = { &vvRandPos, loopNum, i, numThread }; vTHShuffle.push_back(std::thread(ThreadShuffle, param)); } for (auto& t : vTHShuffle) t.join(); finish = clock(); cout << "Shuffle time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush; vector vTHRandSim; for (int i = 0; i < numThread; ++i) { TPRandSim tParam = { &vvvSum[i], &vvvSqSum[i], &vvRandPos, &vIx, &vCateNum, &vvvRound[i], & vvX, loopNum, maxCategoryNum, i, numThread}; vTHRandSim.push_back(std::thread(ThreadRandSim, tParam)); } for (auto& t : vTHRandSim) t.join(); // 合并所有线程的数据 auto& vvSum = vvvSum[0]; auto& vvSqSum = vvvSqSum[0]; for (int t = 1; t < numThread; ++t) { for (int i = 0; i < maxCategoryNum; ++i) { for (int j = 0; j < colNum; ++j) { vvSum[i][j] += vvvSum[t][i][j]; vvSqSum[i][j] += vvvSqSum[t][i][j]; } } } finish = clock(); cout << "Random simulation time: " << (double)(finish - mid) / CLOCKS_PER_SEC << " s" << endl << flush; /* 计算结果 */ vector> vvVs(maxCategoryNum, vector(colNum)); // 按列计算平均值 vector> vvMean(maxCategoryNum, vector(colNum)); vector> vvStd(maxCategoryNum, vector(colNum)); for (int c = 0; c < maxCategoryNum; ++c) { auto& vMean = vvMean[c]; auto& vStd = vvStd[c]; auto& vVs = vvVs[c]; auto& vSum = vvSum[c]; auto& vSqSum = vvSqSum[c]; auto& vTs = vvTs[c]; for (int i = 0; i < colNum; ++i) vMean[i] = vSum[i] / loopNum; for (int i = 0; i < colNum; ++i) { const double meanVal = vSum[i] / loopNum; vMean[i] = meanVal; // 均值 const double sqDiff = vSqSum[i] + loopNum * meanVal * meanVal - 2 * meanVal * vSum[i]; vStd[i] = sqrt(sqDiff / (loopNum - 1)); // 均方根 } // 计算vs for (int i = 0; i < vVs.size(); ++i) { vVs[i] = (vTs[i] - vMean[i]) / vStd[i]; } } // auto& vVs = vvVs[0]; // ofstream ofs("d:\\result_new.txt"); // for (int i = 0; i < colNum; ++i) { // ofs << vVs[i] << endl; // } // ofs.close(); /* 写入结果 */ if (nlhs > 0) { // vs vector vVsData(maxCategoryNum* colNum); for (int i = 0; i < maxCategoryNum; ++i) { for (int j = 0; j < colNum; ++j) { vVsData[j * maxCategoryNum + i] = vvVs[i][j]; } } plhs[0] = writeToMatDouble(vVsData.data(), maxCategoryNum, colNum); } finish = clock(); cout << "All Cluster Random simulation Total time: " << (double)(finish - begin) / CLOCKS_PER_SEC << " s" << endl << flush; } // 供c++调试用 void mexFunctionWrap(int nlhs, mxArray* plhs[], int nrhs, const mxArray* prhs[]) { return mexFunction(nlhs, plhs, nrhs, prhs); }