global lock and fine-frained lock

  • 全局锁:简单,但影响性能
  • 局部锁:提升并行性能,但需要避免死锁,会产生额外的锁的开销
  • 并行度与额外的锁的开销,类似于任务颗粒度的问题

例子:有序链表

struct Node {
    int value;
    Node* next;
};
struct List {
    Node* head;
    Lock lock;              // global lock
};
void insert(List* list, int value) {
    Node* n = new Node;
    n->value = value;

    lock(list->lock);       // global lock
    Node* prev = list->head;
    Node* cur = list->head->next;
    while (cur) {
        if (cur->value > value)
            break;
        prev = cur;
        cur = cur->next;
    }
    n->next = cur;
    prev->next = n;
    unlock(list->lock);     // global unlock
}

struct Node {
    int value;
    Node* next;
    Lock* lock;             // 
};
void insert(List* list, int value) {
    Node* n = new Node;
    n->value = value;

    Node* prev, *cur;
    lock(list->lock);       // 全局锁,保证所有线程有序获得头结点锁
    prev = list->head;
    cur = list->head->next;

    lock(prev->lock);       // 全局锁切换成局部锁
    unlock(list->lock);
    if (cur) lock(cur->lock);

    while (cur) {
        if (cur->value > value)
            break;
        Node* old_prev = prev;
        prev = cur;
        cur = cur->next;
        unlock(old_prev->lock);     // 移动局部锁 prev&cur
        if (cur) lock(cur->lock);
    }
    n->next = cur;
    prev->next = n;

    unlock(prev->lock);             // 解锁局部锁 prev&cur
    if (cur) unlock(cur->lock);
}

lock-free data structure

锁是阻塞行为,即便在上下文切换时也禁止其他线程运行,利用非锁结构才能实现非阻塞算法,从而减少开销。

ABA问题

例子:lock-free stack

struct Node {
    Node* next;
    int value;
};
struct Stack {
    Node* top;
};
void init(Stack* s) {
s->top = NULL;
}
void push(Stack* s, Node* n) {
    while (1) {
        Node* old_top = s->top;
        n->next = old_top;
        // compare_and_swap,保证没有其他线程修改时,当前修改才生效
        if (compare_and_swap(&s->top, old_top, n) == old_top)
            return;
    }
}
Node* pop(Stack* s) {
    while (1) {
        Node* old_top = s->top;
        if (old_top == NULL)
            return NULL;
        Node* new_top = old_top->next;
        // 同理
        if (compare_and_swap(&s->top, old_top, new_top) == old_top)
            return old_top;
    }
}

解决:

struct Stack {
    Node* top;
    int pop_count;  // 对pop计数,即当前pop发生时,没有其他pop生效
};
Node* pop(Stack* s) {
    while (1) {
        int pop_count = s->pop_count;
        Node* top = s->top;
        if (top == NULL)
            return NULL;
        Node* new_top = top->next;
        if (double_compare_and_swap(&s->top, top, new_top, \
            &s->pop_count, pop_count, pop_count+1))
            return top;
    }
}

ShengYg

Step after step the ladder is ascended.


Tags •