Skip to content

Commit

Permalink
Parallel diff with -m and -s now runs correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
sisong committed Oct 8, 2022
1 parent 77238a9 commit c46c93d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 38 deletions.
6 changes: 5 additions & 1 deletion hdiffz.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ static void printUsage(){
" matchScore>=0, DEFAULT -m-6, recommended bin: 0--4 text: 4--9 etc...\n"
" -s[-matchBlockSize]\n"
" all file load as Stream; fast;\n"
" requires O(oldFileSize*16/matchBlockSize+matchBlockSize*5)bytes of memory;\n"
" requires O(oldFileSize*16/matchBlockSize+matchBlockSize*5"
#if (_IS_USED_MULTITHREAD)
"*parallelThreadNumber"
#endif
")bytes of memory;\n"
" matchBlockSize>=4, DEFAULT -s-64, recommended 16,32,48,1k,64k,1m etc...\n"
"special options:\n"
" -block[-fastMatchBlockSize] \n"
Expand Down
13 changes: 5 additions & 8 deletions libHDiffPatch/HDiff/diff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ static void search_and_dispose_cover(std::vector<TOldCover>& covers,const TDiffD
static void _search_and_dispose_cover_MT(std::vector<TOldCover>* _covers,const TDiffData* _diff,
const TSuffixString* sstring,int kMinSingleMatchScore,
size_t workCount,size_t* pworkIndex){
const size_t kPartPepeatSize=1024;
const size_t kPartPepeatSize=1024*2;
std::vector<TOldCover>& covers=*_covers;
const TDiffData& diff=*_diff;
std::atomic<size_t>& workIndex=*(std::atomic<size_t>*)pworkIndex;
Expand All @@ -700,14 +700,13 @@ static void search_and_dispose_cover(std::vector<TOldCover>& covers,const TDiffD
}
}
#endif
#include "../../_clock_for_demo.h"

static void search_and_dispose_cover_MT(std::vector<TOldCover>& covers,const TDiffData& diff,
const TSuffixString& sstring,int kMinSingleMatchScore,
TDiffLimit* diffLimit=0,size_t threadNum=1){
double t0=clock_s();
TDiffLimit* diffLimit=0,size_t threadNum=1){
#if (_IS_USED_MULTITHREAD)
const size_t kMinParallelSize=1024*64;
const size_t kBestParallelSize=1024*1024*16;
const size_t kMinParallelSize=1024*1024*2;
const size_t kBestParallelSize=1024*1024*8;
size_t newSize=diff.newData_end-diff.newData;
if ((threadNum>1)&&(diffLimit==0)&&(diff.oldData!=diff.oldData_end)&&(newSize>=kMinParallelSize)){
const size_t maxThreanNum=newSize/(kMinParallelSize/2);
Expand All @@ -733,8 +732,6 @@ double t0=clock_s();
{
search_and_dispose_cover(covers,diff,sstring,kMinSingleMatchScore,diffLimit);
}
double t1=clock_s();
printf("search_and_dispose_cover time:%3.3f s\n",t1-t0);
}

static const hpatch_StreamPos_t _kNullCoverHitEndPos =~(hpatch_StreamPos_t)0;
Expand Down
54 changes: 26 additions & 28 deletions libHDiffPatch/HDiff/private_diff/limit_mem_diff/digest_matcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,17 @@ static const size_t kMinReadSize=1024*4; //for random first read speed
static const size_t kMinBackupReadSize=256;
static const size_t kBestMatchRange=1024*64;
static const size_t kMaxLinkIndexFindCount=64;
static const size_t kMinParallelSize=1024*64;
static const size_t kBestParallelSize=1024*1024*16;
static const size_t kMinParallelSize=1024*1024*2;
static const size_t kBestParallelSize=1024*1024*8;


#define readStream(stream,pos,dst,n) { \
if (((n)>0)&&(!(stream)->read(stream,m_streamOffset+pos,dst,dst+(n)))) \
if (((n)>0)&&(!(stream)->read(stream,pos,dst,dst+(n)))) \
throw std::runtime_error("TStreamCache::_resetPos_continue() stream->read() error!"); }

struct TStreamCache{
TStreamCache(const hpatch_TStreamInput* _stream,unsigned char* _cache,size_t _cacheSize,
hpatch_StreamPos_t _streamOffset,void* _locker)
:stream(_stream),m_readPos(0),m_readPosEnd(0),m_streamOffset(_streamOffset),m_locker(_locker),
TStreamCache(const hpatch_TStreamInput* _stream,unsigned char* _cache,size_t _cacheSize,void* _locker)
:stream(_stream),m_readPos(0),m_readPosEnd(0),m_locker(_locker),
cache(_cache),cacheSize(_cacheSize),cachePos(_cacheSize){ }
inline hpatch_StreamPos_t streamSize()const{ return stream->streamSize; }
inline hpatch_StreamPos_t pos()const { return m_readPosEnd-dataLength(); }
Expand Down Expand Up @@ -115,7 +114,6 @@ struct TStreamCache{
protected:
hpatch_StreamPos_t m_readPos;
hpatch_StreamPos_t m_readPosEnd;
const hpatch_StreamPos_t m_streamOffset;
void* m_locker;
unsigned char* cache;
size_t cacheSize;
Expand Down Expand Up @@ -154,7 +152,8 @@ size_t TDigestMatcher::getSearchThreadNum()const{
#if (_IS_USED_MULTITHREAD)
const size_t threadNum=m_threadNum;
hpatch_StreamPos_t size=m_newData->streamSize;
if ((threadNum>1)&&(size>=kMinParallelSize)&&(size/2>=m_kMatchBlockSize)) {
if ((threadNum>1)&&(m_oldData->streamSize>=m_kMatchBlockSize)
&&(size>=kMinParallelSize)&&(size/2>=m_kMatchBlockSize)) {
const hpatch_StreamPos_t maxThreanNum=size/(kMinParallelSize/2);
return (threadNum<=maxThreanNum)?threadNum:(size_t)maxThreanNum;
}else
Expand Down Expand Up @@ -273,7 +272,7 @@ void TDigestMatcher::getDigests(){
if (m_blocks.empty()) return;

const size_t blockCount=m_blocks.size();
TStreamCache streamCache(m_oldData,m_mem.data(),m_newCacheSize+m_oldCacheSize,0,0);
TStreamCache streamCache(m_oldData,m_mem.data(),m_newCacheSize+m_oldCacheSize,0);
for (size_t i=0;i<blockCount;++i) {
hpatch_StreamPos_t readPos=blockIndexToPos(i,m_kMatchBlockSize,m_oldData->streamSize);
streamCache.resetPos(0,readPos,m_kMatchBlockSize);
Expand All @@ -296,10 +295,9 @@ void TDigestMatcher::getDigests(){
}

struct TBlockStreamCache:public TStreamCache{
TBlockStreamCache(const hpatch_TStreamInput* _stream,unsigned char* _cache,
size_t _cacheSize,size_t _backupCacheSize, size_t _kMatchBlockSize,
hpatch_StreamPos_t _streamOffset,void* _locker)
:TStreamCache(_stream,_cache,_cacheSize,_streamOffset,_locker),
TBlockStreamCache(const hpatch_TStreamInput* _stream,unsigned char* _cache,size_t _cacheSize,
size_t _backupCacheSize, size_t _kMatchBlockSize,void* _locker)
:TStreamCache(_stream,_cache,_cacheSize,_locker),
backupCacheSize(_backupCacheSize),kMatchBlockSize(_kMatchBlockSize){
assert(cacheSize>=(backupCacheSize+kMatchBlockSize)); }
inline bool resetPos(hpatch_StreamPos_t streamPos){
Expand Down Expand Up @@ -331,7 +329,7 @@ struct TOldStreamCache:public TBlockStreamCache{
size_t _minCacheSize,size_t _maxCacheSize,
size_t _backupCacheSize,size_t _kMatchBlockSize,void* _locker)
:TBlockStreamCache(_stream,_cache+_maxCacheSize-_minCacheSize,
_minCacheSize, _backupCacheSize,_kMatchBlockSize,0,_locker),
_minCacheSize, _backupCacheSize,_kMatchBlockSize,_locker),
minCacheSize(_minCacheSize),maxCacheSize(_maxCacheSize){ }

inline bool resetPos(hpatch_StreamPos_t streamPos){
Expand Down Expand Up @@ -379,11 +377,9 @@ struct TOldStreamCache:public TBlockStreamCache{
};

struct TNewStreamCache:public TBlockStreamCache{
TNewStreamCache(const hpatch_TStreamInput* _stream,unsigned char* _cache,
size_t _cacheSize,size_t _backupCacheSize,size_t _kMatchBlockSize,
hpatch_StreamPos_t _streamOffset,void* _locker)
:TBlockStreamCache(_stream,_cache,_cacheSize,_backupCacheSize,_kMatchBlockSize,
_streamOffset,_locker){
TNewStreamCache(const hpatch_TStreamInput* _stream,unsigned char* _cache,size_t _cacheSize,
size_t _backupCacheSize,size_t _kMatchBlockSize,void* _locker)
:TBlockStreamCache(_stream,_cache,_cacheSize,_backupCacheSize,_kMatchBlockSize,_locker){
resetPos(0);
}
void toBestDataLength(){
Expand Down Expand Up @@ -670,7 +666,7 @@ void TDigestMatcher::_search_cover(const hpatch_TStreamInput* newData,hpatch_Str
hpatch_TOutputCovers* out_covers,unsigned char* pmem,
void* oldDataLocker,void* newDataLocker,void* coversLocker){
TNewStreamCache newStream(newData,pmem,m_newCacheSize,m_backupCacheSize,
m_kMatchBlockSize,newOffset,newDataLocker);
m_kMatchBlockSize,newDataLocker);
TOldStreamCache oldStream(m_oldData,pmem+m_newCacheSize,m_oldMinCacheSize,
m_oldCacheSize,m_backupCacheSize,m_kMatchBlockSize,oldDataLocker);
if (m_isUseLargeSorted)
Expand All @@ -684,7 +680,6 @@ struct mt_data_t{
CHLocker oldDataLocker;
CHLocker newDataLocker;
CHLocker coversLocker;
hpatch_StreamPos_t rollCount;
hpatch_StreamPos_t workCount;
volatile hpatch_StreamPos_t workIndex;
};
Expand All @@ -693,16 +688,20 @@ struct mt_data_t{
void TDigestMatcher::_search_cover_thread(hpatch_TOutputCovers* out_covers,
unsigned char* pmem,void* mt_data){
#if (_IS_USED_MULTITHREAD)
const size_t kPartPepeatSize=m_kMatchBlockSize-1;
mt_data_t& mt=*(mt_data_t*)mt_data;
const hpatch_StreamPos_t workCount=mt.workCount;
const hpatch_StreamPos_t rollCount=m_newData->streamSize-(m_kMatchBlockSize-1);
std::atomic<hpatch_StreamPos_t>& workIndex=*(std::atomic<hpatch_StreamPos_t>*)&mt.workIndex;
while (true){
hpatch_StreamPos_t curWorkIndex=workIndex++;
if (curWorkIndex>=mt.workCount) break;
hpatch_TStreamInput newData=*m_newData;
hpatch_StreamPos_t newOffset=mt.rollCount*curWorkIndex/mt.workCount;
newData.streamSize=((curWorkIndex+1<mt.workCount)?mt.rollCount*(curWorkIndex+1)/mt.workCount:mt.rollCount)
-newOffset+(m_kMatchBlockSize-1);
_search_cover(&newData,newOffset,out_covers,pmem,
if (curWorkIndex>=workCount) break;
hpatch_StreamPos_t new_begin=rollCount*curWorkIndex/workCount;
hpatch_StreamPos_t new_end=(curWorkIndex+1<workCount)?rollCount*(curWorkIndex+1)/workCount:rollCount;
assert(new_end+kPartPepeatSize<=m_newData->streamSize);
TStreamInputClip newClip;
TStreamInputClip_init(&newClip,m_newData,new_begin,new_end+kPartPepeatSize);
_search_cover(&newClip.base,new_begin,out_covers,pmem,
mt.oldDataLocker.locker,mt.newDataLocker.locker,mt.coversLocker.locker);
}
#endif
Expand All @@ -724,7 +723,6 @@ void TDigestMatcher::search_cover(hpatch_TOutputCovers* out_covers){
hpatch_StreamPos_t workCount=(rollCount+bestStep-1)/bestStep;
workCount=(threadNum>workCount)?threadNum:workCount;
mt_data_t mt_data;
mt_data.rollCount=rollCount;
mt_data.workCount=workCount;
mt_data.workIndex=0;
const size_t threadCount=threadNum-1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class TDigestMatcher{
size_t kMatchBlockSize,size_t threadNum);
void search_cover(hpatch_TOutputCovers* out_covers);
~TDigestMatcher();
private:
TDigestMatcher(const TDigestMatcher &); //empty
TDigestMatcher &operator=(const TDigestMatcher &); //empty
private:
const hpatch_TStreamInput* m_oldData;
const hpatch_TStreamInput* m_newData;
Expand Down
5 changes: 4 additions & 1 deletion libHDiffPatch/HDiff/private_diff/suffix_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TSuffixString{
typedef ptrdiff_t TInt;
typedef int32_t TInt32;
typedef unsigned char TChar;
TSuffixString(bool isUsedFastMatch=false);
explicit TSuffixString(bool isUsedFastMatch=false);
~TSuffixString();

//throw std::runtime_error when create SA error
Expand All @@ -98,6 +98,9 @@ class TSuffixString{
return (TInt)m_SA_limit[i];
}
TInt lower_bound(const TChar* str,const TChar* str_end)const;//return index in SA; must str_end-str>=2 !
private:
TSuffixString(const TSuffixString &); //empty
TSuffixString &operator=(const TSuffixString &); //empty
private:
const TChar* m_src_begin;//原字符串.
const TChar* m_src_end;
Expand Down

0 comments on commit c46c93d

Please sign in to comment.