From 914c6db7208ded3e917dd294be57f69217cc797a Mon Sep 17 00:00:00 2001 From: sisong Date: Fri, 5 Jul 2024 12:46:15 +0800 Subject: [PATCH] Interface hsync_TDictCompress adding support getDictCompressBorder(); for hsynz; (This new api for optimize libdeflate dict compress speed to 10x) --- libhsync/sync_make/dict_compress_plugin.h | 3 +- libhsync/sync_make/sync_info_make.cpp | 2 +- libhsync/sync_make/sync_make.cpp | 38 ++++++++++++++--------- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/libhsync/sync_make/dict_compress_plugin.h b/libhsync/sync_make/dict_compress_plugin.h index 411797de..b8698e02 100644 --- a/libhsync/sync_make/dict_compress_plugin.h +++ b/libhsync/sync_make/dict_compress_plugin.h @@ -55,8 +55,9 @@ extern "C" { size_t* out_dictSize); size_t (*dictCompress)(hsync_dictCompressHandle dictHandle,size_t blockIndex, hpatch_byte* out_code,hpatch_byte* out_codeEnd, - const hpatch_byte* in_dataBegin,const hpatch_byte* in_dataEnd); + const hpatch_byte* in_dataBegin,size_t in_dataSize,size_t in_borderSize); const char* (*compressTypeForDisplay)(void);//like compressType but just for display,can NULL + size_t (*getDictCompressBorder)(void);//if 0,can NULL } hsync_TDictCompress; #define kDictCompressCancel (~(size_t)0) diff --git a/libhsync/sync_make/sync_info_make.cpp b/libhsync/sync_make/sync_info_make.cpp index f8b298fd..2d021087 100644 --- a/libhsync/sync_make/sync_info_make.cpp +++ b/libhsync/sync_make/sync_info_make.cpp @@ -74,7 +74,7 @@ namespace sync_private{ hsync_dictCompressHandle compressHandle=compressPlugin->dictCompressOpen(compressPlugin,1,buf.size()); checkv(compressHandle!=0); size_t compressedSize=compressPlugin->dictCompress(compressHandle,0,cmbuf.data(),cmbuf.data()+cmbuf.size(), - buf.data(),buf.data()+buf.size()); + buf.data(),buf.size(),0); checkv(compressedSize!=kDictCompressError); if ((compressedSize==kDictCompressCancel)||(compressedSize>=buf.size())) compressedSize=0; //cancel compress diff --git a/libhsync/sync_make/sync_make.cpp b/libhsync/sync_make/sync_make.cpp index 713d58e6..b4e7ae5b 100644 --- a/libhsync/sync_make/sync_make.cpp +++ b/libhsync/sync_make/sync_make.cpp @@ -75,20 +75,19 @@ struct _TCompress{ } } inline ~_TCompress(){ if (dictCompressHandle) compressPlugin->dictCompressClose(compressPlugin,dictCompressHandle); } - inline size_t doCompress(uint32_t blockIndex,const TByte* data,const TByte* dataEnd){ + inline size_t doCompress(uint32_t blockIndex,const TByte* in_data,size_t in_dataSize,size_t in_borderSize){ if (dictCompressHandle==0) return 0;//not compress blockEnd=blockIndex+1; assert(cmBuf!=0); hpatch_byte* curBuf=cmBuf+cmBufPos; size_t result=compressPlugin->dictCompress(dictCompressHandle,blockIndex,curBuf, - curBuf+kMaxCompressedSize,data,dataEnd); + curBuf+kMaxCompressedSize,in_data,in_dataSize,in_borderSize); checkv(result!=kDictCompressError); if (result==kDictCompressCancel){ result=0; //cancel compress - size_t dataSize=dataEnd-data; - memcpy(curBuf,data,dataSize); - cmBufPos+=dataSize; + memcpy(curBuf,in_data,in_dataSize); + cmBufPos+=in_dataSize; }else{ checkv((result<=kMaxCompressedSize) &&(result<=2*kSyncBlockSize)); //for decompress memroy size ctrl @@ -122,6 +121,7 @@ struct _TCompress{ uint32_t blockBegin; uint32_t blockEnd; unsigned char* buf; + size_t in_borderSize; size_t cmSize; #if (_IS_USED_MULTITHREAD) struct TWorkBuf* next; @@ -231,7 +231,7 @@ static void _saveCompressedData(_TCreateDatas& cd,TWorkBuf* workData,void* _mt=0 if (cd.out_hsynz){ size_t writeSize=workData->cmSize; if (writeSize>0){ - hpatch_byte* cmBuf=workData->buf+dataLens; + hpatch_byte* cmBuf=workData->buf+dataLens+workData->in_borderSize; writeStream(cd.out_hsynz,cd.curOutPos,cmBuf,writeSize); }else{ writeSize=dataLens-backZeroLen; @@ -264,35 +264,41 @@ static void _create_sync_data_part(_TCreateDatas& cd,TWorkBuf* workData, TNewDataSyncInfo* out_hsyni=cd.out_hsyni; const uint32_t kSyncBlockSize=out_hsyni->kSyncBlockSize; const uint32_t kSyncBlockCount=(workData->blockEnd-workData->blockBegin); + hpatch_StreamPos_t curReadPos=(hpatch_StreamPos_t)workData->blockBegin*kSyncBlockSize; size_t backZeroLen=0; const size_t dataLens=(size_t)kSyncBlockSize*kSyncBlockCount; - compress.cmBuf=workData->buf+dataLens; + compress.cmBuf=workData->buf+dataLens+workData->in_borderSize; compress.cmBufPos=0; {//read data - const hpatch_StreamPos_t curReadPos=(hpatch_StreamPos_t)workData->blockBegin*kSyncBlockSize; if (curReadPos+dataLens>cd.newData->streamSize){ backZeroLen=(size_t)(curReadPos+dataLens-cd.newData->streamSize); assert(backZeroLenin_borderSize; + if (curReadPos+readLen>cd.newData->streamSize) + readLen=cd.newData->streamSize-curReadPos; #if (_IS_USED_MULTITHREAD) TMt* mt=(TMt*)_mt; CAutoLocker _autoLocker(mt?mt->readLocker.locker:0); if (mt) compress.resertDict(cd.newData,curReadPos,workData->blockBegin); #endif - checkv(cd.newData->read(cd.newData,curReadPos,workData->buf,workData->buf+dataLens-backZeroLen)); + checkv(cd.newData->read(cd.newData,curReadPos,workData->buf,workData->buf+readLen)); } if (backZeroLen) memset(workData->buf+dataLens-backZeroLen,0,backZeroLen); } hpatch_byte* dataBuf=workData->buf; - for (uint32_t i=workData->blockBegin;iblockEnd;++i,dataBuf+=kSyncBlockSize) { + for (uint32_t i=workData->blockBegin;iblockEnd;++i,dataBuf+=kSyncBlockSize,curReadPos+=kSyncBlockSize) { size_t srcDataLen=(i+1blockEnd)?kSyncBlockSize:kSyncBlockSize-backZeroLen; + size_t cur_borderSize=workData->in_borderSize; + if (curReadPos+srcDataLen+cur_borderSize>cd.newData->streamSize) + cur_borderSize=cd.newData->streamSize-(curReadPos+srcDataLen); //compress - size_t compressedSize=compress.doCompress(i,dataBuf,dataBuf+srcDataLen); + size_t compressedSize=compress.doCompress(i,dataBuf,srcDataLen,cur_borderSize); checkv(compressedSize==(uint32_t)compressedSize); if (out_hsyni->savedSizes) //save compressedSize out_hsyni->savedSizes[i]=(uint32_t)compressedSize; @@ -387,6 +393,7 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo, checkChecksumInit(createDatas.out_hsyni->savedNewDataCheckChecksum, createDatas.out_hsyni->kStrongChecksumByteSize); + const size_t in_borderSize=(compressPlugin&&compressPlugin->getDictCompressBorder)?compressPlugin->getDictCompressBorder():0; const hpatch_StreamPos_t kMaxCompressedSize=compressPlugin?compressPlugin->maxCompressedSize(kSyncBlockSize):0; const size_t _kBestWorkBufSize=2*(1<<20); uint32_t bestWorkBlockCount=(uint32_t)((_kBestWorkBufSize+kSyncBlockSize+kMaxCompressedSize-1) @@ -402,7 +409,7 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo, bestWorkBlockCount=(bestWorkBlockCount<=maxWorkBlockCount)?bestWorkBlockCount:maxWorkBlockCount; const size_t kWorkBufCount=threadNum+(threadNum/4)+1; - const size_t kWorkMemBufSize=(kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount; + const size_t kWorkMemBufSize=(kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount+in_borderSize; TAutoMem membuf((sizeof(TWorkBuf)+kWorkMemBufSize)*kWorkBufCount); TMt mt(threadNum,createDatas,bestWorkBlockCount,checksumPlugin,compressPlugin, @@ -413,6 +420,7 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo, memset(workBufList,0,sizeof(TWorkBuf)*kWorkBufCount); for (size_t i=0;ibuf=workMemBuf; + workBufList->in_borderSize=in_borderSize; checkv(mt.read_chan.send(workBufList,true)); } } @@ -424,10 +432,12 @@ void _private_create_sync_data(TNewDataSyncInfo* newSyncInfo, }else #endif { - TAutoMem membuf((kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount); + TAutoMem membuf((kSyncBlockSize+(size_t)kMaxCompressedSize)*bestWorkBlockCount+in_borderSize); CChecksum checksumBlockData(checksumPlugin,false); _TCompress compress(compressPlugin,kBlockCount,kSyncBlockSize,createDatas.out_hsyni); - TWorkBuf workData={0}; workData.buf=membuf.data(); + TWorkBuf workData={0}; + workData.buf=membuf.data(); + workData.in_borderSize=in_borderSize; for (uint32_t ib=0; ib