Leader election mechanism in universal mode #182

Open
sjmshsh opened this issue Jan 29, 2024 · 2 comments
Labels
proposal propose new suggestions for the project

Comments

@sjmshsh
Contributor

sjmshsh commented Jan 29, 2024

We need a leader election mechanism in universal mode, so that certain actions are not executed more than once.

@kaori-seasons

kaori-seasons commented Feb 24, 2024

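Electing a leader among multiple replicas requires a distributed lock. Here are some of my thoughts:

A distributed lock based on primary-key uniqueness

Generally, the simplest approach is to rely on MySQL's exclusive locks; the drawback is that this may escalate to a table lock. In addition, the number of participating replicas keeps changing, so a SELECT ... FOR UPDATE approach still requires rows that are continuously updated and maintained. Given that, we may as well insert a row directly and let the uniqueness of the primary key act as the distributed lock. The main difference between the two approaches is that one relies on MySQL's own exclusive lock, while the other implements the lock by having every replica try to write a row with the same primary key to MySQL.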
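To illustrate why primary-key uniqueness is enough to pick a single winner, here is a minimal sketch. It does not talk to MySQL: the lockTable type, errDuplicateKey, and contend are hypothetical names invented for this example, and an in-memory map guarded by a mutex stands in for a table whose primary key is the lock ID.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicateKey mimics the duplicate-primary-key error a database returns.
var errDuplicateKey = errors.New("duplicate primary key")

// lockTable is a hypothetical in-memory stand-in for a table whose primary
// key is the lock ID. It is an assumption of this sketch, not part of the proposal.
type lockTable struct {
	mu   sync.Mutex
	rows map[string]string // lock ID -> holder
}

func newLockTable() *lockTable {
	return &lockTable{rows: make(map[string]string)}
}

// insert behaves like an INSERT: it fails if the key already exists.
func (t *lockTable) insert(id, holder string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.rows[id]; exists {
		return errDuplicateKey
	}
	t.rows[id] = holder
	return nil
}

// contend starts n replicas that all try to insert the same lock ID and
// returns the names of the replicas whose insert succeeded.
func contend(t *lockTable, id string, n int) []string {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		winners []string
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(holder string) {
			defer wg.Done()
			if err := t.insert(id, holder); err == nil {
				mu.Lock()
				winners = append(winners, holder)
				mu.Unlock()
			}
		}(fmt.Sprintf("replica-%d", i))
	}
	wg.Wait()
	return winners
}

func main() {
	winners := contend(newLockTable(), "leader", 5)
	fmt.Println("replicas that acquired the lock:", len(winners))
}
```

Which replica wins is not deterministic, but exactly one insert succeeds per lock ID. That is the property the real table provides through its primary key.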

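Design goals and trade-offs

For this scenario, the following points need to be considered:

  • Reentrancy: in kuma, each node that takes part in the election is handled by its own goroutine, which keeps a long-lived connection, so in the current scenario the fact that the distributed lock is not reentrant can be ignored. There is one catch, though: because the goroutine has to hold the lock for a long time, it must keep refreshing the lock's expiry time so that the lock never times out while it is held. Only when the process dies for some reason does the lock expire on its own, at which point another goroutine can acquire it.

More generally, when multiple goroutines in a Go project work on a shared resource, lock reentrancy is usually not a concern. Unlike Java threads, goroutines are not normally reused through a thread pool; once a goroutine finishes its task, the Go runtime reclaims it, so a goroutine never needs to acquire the same lock a second time. Besides, obtaining a unique identifier for a goroutine in Go is somewhat cumbersome.

  • Lock release: the distributed lock needs a timeout, so that the lock can still be released after the goroutine holding it dies.
  • MySQL as a single point of failure: MySQL itself needs to be deployed in a highly available setup.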
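The interplay between timeout and renewal described in the points above can be sketched as follows. This is only an in-memory model under stated assumptions: lease, leaseStore, tryAcquire, and renew are hypothetical names, timestamps are passed in explicitly so the behaviour is easy to follow, and the holder field is an addition for illustration (the table proposed in this comment stores only an ID and an expiry time).

```go
package main

import "fmt"

// lease is a hypothetical in-memory model of one row in the lock table.
// The holder field is an assumption added for this sketch.
type lease struct {
	holder     string
	expireTime int64 // Unix seconds
}

// leaseStore maps a lock ID to its current lease. It is meant for
// single-goroutine use; a real table relies on the database for atomicity.
type leaseStore map[string]lease

// tryAcquire removes an expired lease (if any) and then claims the lock,
// mirroring "delete the expired row, then insert".
func (s leaseStore) tryAcquire(id, holder string, now, ttl int64) bool {
	if cur, ok := s[id]; ok {
		if cur.expireTime >= now {
			return false // still held by someone else
		}
		delete(s, id) // the previous holder stopped renewing
	}
	s[id] = lease{holder: holder, expireTime: now + ttl}
	return true
}

// renew extends the lease, but only for its current holder.
func (s leaseStore) renew(id, holder string, now, ttl int64) bool {
	cur, ok := s[id]
	if !ok || cur.holder != holder {
		return false
	}
	s[id] = lease{holder: holder, expireTime: now + ttl}
	return true
}

func main() {
	const ttl = 600 // seconds, same order of magnitude as a 10-minute timeout
	s := leaseStore{}
	fmt.Println(s.tryAcquire("pod-a", "r1", 0, ttl))    // true: lock is free
	fmt.Println(s.tryAcquire("pod-a", "r2", 100, ttl))  // false: r1 holds it until 600
	fmt.Println(s.renew("pod-a", "r1", 500, ttl))       // true: now expires at 1100
	fmt.Println(s.tryAcquire("pod-a", "r2", 700, ttl))  // false: renewal kept it alive
	fmt.Println(s.tryAcquire("pod-a", "r2", 1101, ttl)) // true: r1 stopped renewing
}
```

As long as the holder renews more often than the timeout, nobody else can take the lock. Once renewals stop (for example because the process died), the lock becomes available after at most one timeout period.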

Implementation

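import (
	"context"
	"time"

	// Assumption: GORM v2. With GORM v1, import "github.com/jinzhu/gorm" instead.
	"gorm.io/gorm"
)

// TIMEOUT is how long a lock stays valid without renewal, in seconds.
const TIMEOUT int64 = 10 * 60

// distributedLock maps to the MySQL table distributed_lock.
// The primary key guarantees at most one row per lock ID.
type distributedLock struct {
	Id         string `gorm:"primary_key"`
	ExpireTime int64  // Unix time (seconds) after which the lock counts as expired
}

type lockObject struct {
	id         string
	expireTime int64
	db         *gorm.DB
}

func NewLock(id string, db *gorm.DB) *lockObject {
	return &lockObject{
		id:         id,
		expireTime: time.Now().Unix() + TIMEOUT,
		db:         db,
	}
}

// TryLock tries to acquire the lock without blocking and reports whether it succeeded.
func (lock *lockObject) TryLock() bool {
	// Remove the row first if a previous holder let it expire.
	lock.deleteExpiredLock()

	// Compute the expiry at acquisition time, not at construction time.
	lock.expireTime = time.Now().Unix() + TIMEOUT
	newLock := distributedLock{
		Id:         lock.id,
		ExpireTime: lock.expireTime,
	}
	// The insert fails with a duplicate-key error while another holder owns the lock.
	// Any other database error is also reported as "not acquired".
	if err := lock.db.Table("distributed_lock").Create(&newLock).Error; err != nil {
		return false
	}

	return true
}

// KeepLock renews the lock every 10 seconds until ctx is cancelled.
// A node taking part in the election may also run a long-lived task that reads
// the pod's data stream, so the lock must not expire while that task runs.
// If it did, another goroutine could acquire the lock and read the same pod,
// producing duplicate data.
func (lock *lockObject) KeepLock(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		oldLock := distributedLock{
			Id: lock.id,
		}
		lock.db.Table("distributed_lock").Model(&oldLock).Update("expire_time", time.Now().Unix()+TIMEOUT)

		// Wait for the next tick, but return immediately on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// deleteExpiredLock removes this lock's row if it has expired.
func (lock *lockObject) deleteExpiredLock() {
	now := time.Now().Unix()
	lock.db.Table("distributed_lock").Where("id = ? AND expire_time < ?", lock.id, now).Delete(&distributedLock{})
}

// DeleteLock releases the lock by removing its row.
func (lock *lockObject) DeleteLock() {
	lock.db.Table("distributed_lock").Where("id = ?", lock.id).Delete(&distributedLock{})
}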

Usage

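for _, podItem := range podList.Items {
	pod := podItem // copy so the goroutine does not capture the loop variable
	go func() {
		lock := NewLock(pod.GetObjectMeta().GetName(), act.db)
		if !lock.TryLock() {
			return // another replica already holds the lock for this pod
		}
		// Keep renewing the lock while this goroutine works; stop renewing on return.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		go lock.KeepLock(ctx)

		// TODO: write data
	}()
}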
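Since the usage above cancels a context to stop renewing the lock, it may help to see a renewal loop that reacts to cancellation promptly. The following is a minimal sketch with no database involved: keepAlive and runRenewals are hypothetical helpers, and the renew callback stands in for the UPDATE that refreshes the expiry time.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// keepAlive calls renew once immediately and then once per interval,
// returning as soon as ctx is cancelled.
func keepAlive(ctx context.Context, interval time.Duration, renew func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		// Check first, so that a tick racing with cancellation cannot
		// trigger one more renewal.
		if ctx.Err() != nil {
			return
		}
		renew()
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// runRenewals runs keepAlive, cancels it from inside the callback after
// limit renewals (limit must be at least 1), and returns how many
// renewals actually happened.
func runRenewals(limit int, interval time.Duration) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	count := 0
	done := make(chan struct{})
	go func() {
		defer close(done)
		keepAlive(ctx, interval, func() {
			count++
			if count == limit {
				cancel() // simulate the worker finishing its task
			}
		})
	}()
	<-done // the loop has exited; count is safe to read
	return count
}

func main() {
	fmt.Println(runRenewals(3, 10*time.Millisecond)) // prints 3
}
```

Selecting on both ctx.Done() and the ticker means the loop does not have to sit out a full interval before noticing that the work has finished.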

@sjmshsh
Contributor Author

sjmshsh commented Feb 25, 2024

Very good proposal, we will adopt this solution.

mfordjody added the proposal label Mar 31, 2024