MB 无锁队列复现和原论文的纠错

纠错

Michael&Scott 1 给出了了一个无锁队列的实现。在我实际用 Java 语音复现时,发现其给出的伪代码有一处问题。

原始版本的enque有一处是错误的。红色标明的行中,第一个if判断了next.ptr==null之后,第二个 CAS 里面,直接比较next.ptr和其自己的值。但这两个语句之间,实际上另一个线程可能已经把next.ptr赋值了,这样 CAS 肯定成功,因为自己等于自己,但这样就会把已经连上的节点替换掉。

code

我们可通过如下一个简单的实验证实(完整的框架见下面

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
...
if (next.getReference() == null) {
    if(next.getReference() != null){
        System.err.println("bad things happen!!!");
    }

    if (next.compareAndSet(null,
                            node,
                            next.getStamp(),
                            next.getStamp() + 1)) {
        break;
    }
} else {...}
...

result

所以,那里的 CAS 应该改成CAS(&tail.next->ptr,NULL,next,[node,next.count+1])。最后是完整的复现和测试代码。测试里开 100 个进程,分别入队 10000 个字符串。然后检查每个进程出队的顺序是不是入队的顺序。结果符合预期。

复现代码

Queue.java

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import java.util.concurrent.atomic.AtomicStampedReference;

public class Queue<T> {
    final Node<T> sentinel = new Node<>();
    final AtomicStampedReference<Node<T>> Head;
    final AtomicStampedReference<Node<T>> Tail;


    Queue() {
        Head = new AtomicStampedReference<>(sentinel, 0);
        Tail = new AtomicStampedReference<>(sentinel, 0);
    }

    void enqueue(T value) {
        Node<T> node = new Node<>(value);
        AtomicStampedReference<Node<T>> tail = new AtomicStampedReference<>(null, 0);
        AtomicStampedReference<Node<T>> next;

        while (true) {
            tail.set(Tail.getReference(), Tail.getStamp());
            next = tail.getReference().next;
            if (tail.getReference() == Tail.getReference()) {
                if (next.getReference() == null) {
                    if (next.compareAndSet(null,
                                           node,
                                           next.getStamp(),
                                           next.getStamp() + 1)) {
                        break;
                    }
                } else {
                    Tail.compareAndSet(tail.getReference(),
                                       next.getReference(),
                                       tail.getStamp(),
                                       tail.getStamp() + 1);

                }
            }
        }
        Tail.compareAndSet(tail.getReference(), node, tail.getStamp(), tail.getStamp() + 1);
    }

    boolean dequeue() {
        AtomicStampedReference<Node<T>> head = new AtomicStampedReference<>(null, -1);
        AtomicStampedReference<Node<T>> tail = new AtomicStampedReference<>(null, -1);
        AtomicStampedReference<Node<T>> next;

        while (true) {
            head.set(Head.getReference(), Head.getStamp());
            tail.set(Tail.getReference(), Tail.getStamp());
            next = head.getReference().next;

            if (head.getReference() == Head.getReference()) {
                if (head.getReference() == tail.getReference()) {
                    if (next.getReference() == null) {
                        return false;
                    }
                    Tail.compareAndSet(tail.getReference(), next.getReference(), tail.getStamp()
                            , tail.getStamp() + 1);
                } else {
                    if (Head.compareAndSet(head.getReference(),
                                           next.getReference(),
                                           head.getStamp(),
                                           head.getStamp() + 1)) {
                        break;
                    }
                }
            }
        }
        return true;
    }

    boolean dequeue(T[] temp) {
        AtomicStampedReference<Node<T>> head = new AtomicStampedReference<>(null, -1);
        AtomicStampedReference<Node<T>> tail = new AtomicStampedReference<>(null, -1);
        AtomicStampedReference<Node<T>> next;

        while (true) {
            head.set(Head.getReference(), Head.getStamp());
            tail.set(Tail.getReference(), Tail.getStamp());
            next = head.getReference().next;

            if (head.getReference() == Head.getReference()) {
                if (head.getReference() == tail.getReference()) {
                    if (next.getReference() == null) {
                        return false;
                    }
                    Tail.compareAndSet(tail.getReference(), next.getReference(), tail.getStamp()
                            , tail.getStamp() + 1);
                } else {
                    temp[0] = next.getReference().value;
                    if (Head.compareAndSet(head.getReference(),
                                           next.getReference(),
                                           head.getStamp(),
                                           head.getStamp() + 1)) {
                        break;
                    }
                }
            }
        }
        return true;
    }

    void goThrough() {
        AtomicStampedReference<Node<T>> iter = Head.getReference().next;
        while (iter.getReference() != null) {
            System.out.println(iter.getReference().value);
            iter = iter.getReference().next;
        }
    }

}


class Node<T> {
    T value;
    final AtomicStampedReference<Node<T>> next = new AtomicStampedReference<>(null, 0);

    Node() {
    }

    Node(T value) {
        this.value = value;
    }
}

Main.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.Arrays;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Queue<String> q = new Queue<>();
        int num = 10000;
        Thread[] clusters = new Thread[100];
        for (int j = 0; j < clusters.length; j++) {
            int finalJ = j;
            clusters[j] = new Thread(() -> {
                for (int i = 0; i < num; i++) {
                    q.enqueue(String.format("%d %d", finalJ, i));
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        for (Thread cluster : clusters) {
            cluster.start();
        }
        for (Thread cluster : clusters) {
            cluster.join();
        }

        int[] out = new int[clusters.length];
        Arrays.fill(out, Integer.MIN_VALUE);
        String[] holder = new String[1];
        for (int i = 0; i < clusters.length * num; i++) {
            if (q.dequeue(holder)) {
                String[] re = holder[0].split(" ");
                int index = Integer.parseInt(re[0]);
                int var = Integer.parseInt(re[1]);

                if (out[index] >= var) {
                    System.out.printf("%d %d>%d%n", index, out[index], var);
                    System.exit(1);
                }
                out[index] = var;
            }
        }
    }
}

  1. MICHAEL M M, SCOTT M L, 1998. Nonblocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors[J/OL]. Journal of Parallel and Distributed Computing, 51(1): 1-26. DOI:10.1006/jpdc.1998.1446. ↩︎