Synchronization Mechanisms
Inspect one language lane at a time so line-level text and code deltas stay readable.
Diff Lane
English
0 modified sections0 code block delta0 anchor delta
Diff Lane
中文
4 modified sections0 code block delta0 anchor delta
modified原子操作 Atomictextcode+1 line
v1.0.5
Section Text
1
仓颉提供整数类型、`Bool` 类型和引用类型的原子操作。2
3
其中整数类型包括: `Int8`、`Int16`、`Int32`、`Int64`、`UInt8`、`UInt16`、`UInt32`、`UInt64`。4
5
整数类型的原子操作支持基本的读写、交换以及算术运算操作:6
7
| 操作 | 功能 |8
| ---------------- | ------------------------------------------------- |9
| `load` | 读取 |10
| `store` | 写入 |11
| `swap` | 交换,返回交换前的值 |12
| `compareAndSwap` | 比较再交换,交换成功返回 `true`,否则返回 `false` |13
| `fetchAdd` | 加法,返回执行加操作之前的值 |14
| `fetchSub` | 减法,返回执行减操作之前的值 |15
| `fetchAnd` | 与,返回执行与操作之前的值 |16
| `fetchOr` | 或,返回执行或操作之前的值 |17
| `fetchXor` | 异或,返回执行异或操作之前的值 |18
19
需要注意的是:20
21
1. 交换操作和算术操作的返回值是修改前的值。22
2. compareAndSwap 是判断当前原子变量的值是否等于 old 值,如果等于,则使用 new 值替换;否则不替换。23
24
以 `Int8` 类型为例,对应的原子操作类型声明如下:25
26
27
上述每一种原子类型的方法都有一个对应的方法可以接收内存排序参数,目前内存排序参数仅支持顺序一致性。28
29
类似的,其他整数类型对应的原子操作类型有:30
31
32
下方示例演示了如何在多线程程序中,使用原子操作实现计数:33
34
<!-- verify -->35
36
37
输出结果应为:38
39
40
以下是使用整数类型原子操作的一些其他正确示例:41
42
<!-- compile -->43
44
45
`Bool` 类型和引用类型的原子操作只提供读写和交换操作:46
47
| 操作 | 功能 |48
| ---------------- | ------------------------------------------------- |49
| `load` | 读取 |50
| `store` | 写入 |51
| `swap` | 交换,返回交换前的值 |52
| `compareAndSwap` | 比较再交换,交换成功返回 `true`,否则返回 `false` |53
54
> **注意:**55
>56
> 引用类型原子操作只对引用类型有效。57
58
原子引用类型是 `AtomicReference`,以下是使用 `Bool` 类型、引用类型原子操作的一些正确示例:59
60
<!-- verify -->61
62
63
编译执行上述代码,输出结果为:Code 1 · cangjie
1
class AtomicInt8 {2
public func load(): Int83
public func store(val: Int8): Unit4
public func swap(val: Int8): Int85
public func compareAndSwap(old: Int8, new: Int8): Bool6
public func fetchAdd(val: Int8): Int87
public func fetchSub(val: Int8): Int88
public func fetchAnd(val: Int8): Int89
public func fetchOr(val: Int8): Int810
public func fetchXor(val: Int8): Int811
}Code 2 · cangjie
1
class AtomicInt16 {...}2
class AtomicInt32 {...}3
class AtomicInt64 {...}4
class AtomicUInt8 {...}5
class AtomicUInt16 {...}6
class AtomicUInt32 {...}7
class AtomicUInt64 {...}Code 3 · cangjie
1
import std.sync.AtomicInt642
import std.collection.ArrayList3
4
let count = AtomicInt64(0)5
6
main(): Int64 {7
let list = ArrayList<Future<Int64>>()8
9
// create 1000 threads.10
for (_ in 0..1000) {11
let fut = spawn {12
sleep(Duration.millisecond) // sleep for 1ms.13
count.fetchAdd(1)14
}15
list.add(fut)16
}17
18
// Wait for all threads finished.19
for (f in list) {20
f.get()21
}22
23
let val = count.load()24
println("count = ${val}")25
return 026
}Code 4 · text
1
count = 1000Code 5 · cangjie
1
var obj: AtomicInt32 = AtomicInt32(1)2
var x = obj.load() // x: 1, the type is Int323
x = obj.swap(2) // x: 14
x = obj.load() // x: 25
var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool.6
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails.7
x = obj.fetchAdd(1) // x: 38
x = obj.load() // x: 4Code 6 · cangjie
1
import std.sync.{AtomicBool, AtomicReference}2
3
class A {}4
5
main() {6
var obj = AtomicBool(true)7
var x1 = obj.load() // x1: true, the type is Bool8
println(x1)9
var t1 = A()10
var obj2 = AtomicReference(t1)11
var x2 = obj2.load() // x2 and t1 are the same object12
var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true13
println(y1)14
var t2 = A()15
var y2 = obj2.compareAndSwap(t2, A()) // x and t1 are not the same object, CAS fails, y2: false16
println(y2)17
y2 = obj2.compareAndSwap(t1, A()) // CAS successes, y2: true18
println(y2)19
}Code 7 · text
1
true2
true3
false4
truev1.1.0
Section Text
1
仓颉提供整数类型、`Bool` 类型和引用类型的原子操作。2
3
其中整数类型包括: `Int8`、`Int16`、`Int32`、`Int64`、`UInt8`、`UInt16`、`UInt32`、`UInt64`。4
5
整数类型的原子操作支持基本的读写、交换以及算术运算操作:6
7
| 操作 | 功能 |8
| ---------------- | ------------------------------------------------- |9
| `load` | 读取 |10
| `store` | 写入 |11
| `swap` | 交换,返回交换前的值 |12
| `compareAndSwap` | 比较再交换,交换成功返回 `true`,否则返回 `false` |13
| `fetchAdd` | 加法,返回执行加操作之前的值 |14
| `fetchSub` | 减法,返回执行减操作之前的值 |15
| `fetchAnd` | 与,返回执行与操作之前的值 |16
| `fetchOr` | 或,返回执行或操作之前的值 |17
| `fetchXor` | 异或,返回执行异或操作之前的值 |18
19
需要注意的是:20
21
1. 交换操作和算术操作的返回值是修改前的值。22
2. compareAndSwap 是判断当前原子变量的值是否等于 old 值,如果等于,则使用 new 值替换;否则不替换。23
24
以 `Int8` 类型为例,对应的原子操作类型声明如下:25
26
<!-- code_no_check -->27
28
上述每一种原子类型的方法都有一个对应的方法可以接收内存排序参数,目前内存排序参数仅支持顺序一致性。29
30
类似的,其他整数类型对应的原子操作类型有:31
32
<!-- code_no_check -->33
34
下方示例演示了如何在多线程程序中,使用原子操作实现计数:35
36
<!-- verify -->37
38
39
输出结果应为:40
41
42
以下是使用整数类型原子操作的一些其他正确示例:43
44
<!-- compile -->45
46
47
`Bool` 类型和引用类型的原子操作只提供读写和交换操作:48
49
| 操作 | 功能 |50
| ---------------- | ------------------------------------------------- |51
| `load` | 读取 |52
| `store` | 写入 |53
| `swap` | 交换,返回交换前的值 |54
| `compareAndSwap` | 比较再交换,交换成功返回 `true`,否则返回 `false` |55
56
> **注意:**57
>58
> 引用类型原子操作只对引用类型有效。59
60
原子引用类型是 `AtomicReference`,以下是使用 `Bool` 类型、引用类型原子操作的一些正确示例:61
62
<!-- verify -->63
64
65
编译执行上述代码,输出结果为:Code 1 · cangjie
1
class AtomicInt8 {2
public func load(): Int83
public func store(val: Int8): Unit4
public func swap(val: Int8): Int85
public func compareAndSwap(old: Int8, new: Int8): Bool6
public func fetchAdd(val: Int8): Int87
public func fetchSub(val: Int8): Int88
public func fetchAnd(val: Int8): Int89
public func fetchOr(val: Int8): Int810
public func fetchXor(val: Int8): Int811
}Code 2 · cangjie
1
class AtomicInt16 {...}2
class AtomicInt32 {...}3
class AtomicInt64 {...}4
class AtomicUInt8 {...}5
class AtomicUInt16 {...}6
class AtomicUInt32 {...}7
class AtomicUInt64 {...}Code 3 · cangjie
1
import std.sync.AtomicInt642
import std.collection.ArrayList3
4
let count = AtomicInt64(0)5
6
main(): Int64 {7
let list = ArrayList<Future<Int64>>()8
9
// create 1000 threads.10
for (_ in 0..1000) {11
let fut = spawn {12
sleep(Duration.millisecond) // sleep for 1ms.13
count.fetchAdd(1)14
}15
list.add(fut)16
}17
18
// Wait for all threads finished.19
for (f in list) {20
f.get()21
}22
23
let val = count.load()24
println("count = ${val}")25
return 026
}Code 4 · text
1
count = 1000Code 5 · cangjie
1
var obj: AtomicInt32 = AtomicInt32(1)2
var x: Int32 = obj.load() // x: 13
x = obj.swap(2) // x: 14
x = obj.load() // x: 25
var y: Bool = obj.compareAndSwap(2, 3) // y: true6
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails7
x = obj.fetchAdd(1) // x: 38
x = obj.load() // x: 4Code 6 · cangjie
1
import std.sync.{AtomicBool, AtomicReference}2
3
class A {}4
5
main() {6
var obj = AtomicBool(true)7
var x1 : Bool = obj.load() // x1: true8
println(x1)9
var t1 = A()10
var obj2 = AtomicReference(t1)11
var x2 = obj2.load() // x2 and t1 are the same object12
var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true13
println(y1)14
var t2 = A()15
var y2 = obj2.compareAndSwap(t2, A()) // x and t1 are not the same object, CAS fails, y2: false16
println(y2)17
y2 = obj2.compareAndSwap(t1, A()) // CAS successes, y2: true18
println(y2)19
}Code 7 · text
1
true2
true3
false4
truemodified可重入互斥锁 Mutextext+1 line
v1.0.5
Section Text
1
可重入互斥锁的作用是对临界区加以保护,使得任意时刻最多只有一个线程能够执行临界区的代码。当一个线程试图获取一个已被其他线程持有的锁时,该线程会被阻塞,直到锁被释放,该线程才会被唤醒,可重入是指线程获取该锁后可再次获得该锁。2
3
使用可重入互斥锁时,必须牢记两条规则:4
5
1. 在访问共享数据之前,必须尝试获取锁;6
2. 处理完共享数据后,必须释放锁,以便其他线程可以获得锁。7
8
`Mutex` 提供的主要成员函数如下:9
10
11
下方示例演示了如何使用 `Mutex` 来保护对全局共享变量 `count` 的访问,对 `count` 的操作即属于临界区:12
13
<!-- verify -->14
15
16
输出结果应为:17
18
19
下方示例演示了如何使用 `tryLock`:20
21
<!-- run -->22
23
24
一种可能的输出结果如下:25
26
27
以下是互斥锁的一些错误示例:28
29
错误示例 1:线程操作临界区后没有解锁,导致其他线程无法获得锁而阻塞。30
31
<!-- compile.error -->32
<!-- cfg="libcangjie-std-sync" -->33
34
35
错误示例 2:在本线程没有持有锁的情况下调用 `unlock` 将会抛出异常。36
37
<!-- compile.error -->38
<!-- cfg="libcangjie-std-sync" -->39
40
41
错误示例 3:`tryLock()` 并不保证获取到锁,可能会造成不在锁的保护下操作临界区和在没有持有锁的情况下调用 `unlock` 抛出异常等行为。42
43
<!-- compile.error -->44
<!-- cfg="libcangjie-std-sync" -->45
46
47
另外,`Mutex` 在设计上是一个可重入锁,也就是说:在某个线程已经持有一个 `Mutex` 锁的情况下,再次尝试获取同一个 `Mutex` 锁,永远可以立即获得该 `Mutex` 锁。48
49
> **注意:**50
>51
> 虽然 `Mutex` 是一个可重入锁,但是调用 `unlock()` 的次数必须和调用 `lock()` 的次数相同,才能成功释放该锁。52
53
下方示例代码演示了 `Mutex` 可重入的特性:54
55
<!-- verify -->56
57
58
输出结果应为:59
60
61
在上方示例中,无论是主线程还是新创建的线程,如果在 `foo()` 中已经获得了锁,那么继续调用 `bar()` 的话,在 `bar()` 函数中由于是对同一个 `Mutex` 进行加锁,因此也是能立即获得该锁的,不会出现死锁。Code 1 · cangjie
1
public class Mutex <: UniqueLock {2
// Create a Mutex.3
public init()4
5
// Locks the mutex, blocks if the mutex is not available.6
public func lock(): Unit7
8
// Unlocks the mutex. If there are other threads blocking on this9
// lock, then wake up one of them.10
public func unlock(): Unit11
12
// Tries to lock the mutex, returns false if the mutex is not13
// available, otherwise returns true.14
public func tryLock(): Bool15
16
// Generate a Condition instance for the mutex.17
public func condition(): Condition18
}Code 2 · cangjie
1
import std.sync.Mutex2
import std.collection.ArrayList3
4
var count: Int64 = 05
let mtx = Mutex()6
7
main(): Int64 {8
let list = ArrayList<Future<Unit>>()9
10
// create 1000 threads.11
for (i in 0..1000) {12
let fut = spawn {13
sleep(Duration.millisecond) // sleep for 1ms.14
mtx.lock()15
count++16
mtx.unlock()17
}18
list.add(fut)19
}20
21
// Wait for all threads finished.22
for (f in list) {23
f.get()24
}25
26
println("count = ${count}")27
return 028
}Code 3 · text
1
count = 1000Code 4 · cangjie
1
import std.sync.Mutex2
3
4
main(): Int64 {5
let mtx: Mutex = Mutex()6
var future: Future<Unit> = spawn {7
mtx.lock()8
println("get the lock, do something")9
sleep(Duration.millisecond * 10)10
mtx.unlock()11
}12
try {13
future.get(Duration.millisecond * 10)14
} catch (e: TimeoutException) {15
if (mtx.tryLock()) {16
println("tryLock success, do something")17
mtx.unlock()18
return 019
}20
println("tryLock failed, do nothing")21
return 022
}23
return 024
}Code 5 · text
1
get the lock, do somethingCode 6 · cangjie
1
import std.sync.Mutex2
3
var sum: Int64 = 04
let mutex = Mutex()5
6
main() {7
let foo = spawn { =>8
mutex.lock()9
sum = sum + 110
}11
let bar = spawn { =>12
mutex.lock()13
sum = sum + 114
}15
foo.get()16
println("${sum}")17
bar.get() // Because the thread is not unlocked, other threads waiting to obtain the current mutex will be blocked.18
}Code 7 · cangjie
1
import std.sync.Mutex2
3
var sum: Int64 = 04
let mutex = Mutex()5
6
main() {7
let foo = spawn { =>8
sum = sum + 19
mutex.unlock() // Error, Unlock without obtaining the lock and throw an exception: IllegalSynchronizationStateException.10
}11
foo.get()12
}Code 8 · cangjie
1
import std.sync.Mutex2
3
var sum: Int64 = 04
let mutex = Mutex()5
6
main() {7
for (i in 0..100) {8
spawn { =>9
mutex.tryLock() // Error, `tryLock()` just trying to acquire a lock, there is no guarantee that the lock will be acquired, and this can lead to abnormal behavior.10
sum = sum + 111
mutex.unlock()12
}13
}14
}Code 9 · cangjie
1
import std.sync.Mutex2
3
var count: Int64 = 04
let mtx = Mutex()5
6
func foo() {7
mtx.lock()8
count += 109
bar()10
mtx.unlock()11
}12
13
func bar() {14
mtx.lock()15
count += 10016
mtx.unlock()17
}18
19
main(): Int64 {20
let fut = spawn {21
sleep(Duration.millisecond) // sleep for 1ms.22
foo()23
}24
25
foo()26
27
fut.get()28
29
println("count = ${count}")30
return 031
}Code 10 · text
1
count = 220v1.1.0
Section Text
1
可重入互斥锁的作用是对临界区加以保护,使得任意时刻最多只有一个线程能够执行临界区的代码。当一个线程试图获取一个已被其他线程持有的锁时,该线程会被阻塞,直到锁被释放,该线程才会被唤醒,可重入是指线程获取该锁后可再次获得该锁。2
3
使用可重入互斥锁时,必须牢记两条规则:4
5
1. 在访问共享数据之前,必须尝试获取锁;6
2. 处理完共享数据后,必须释放锁,以便其他线程可以获得锁。7
8
`Mutex` 提供的主要成员函数如下:9
10
<!-- code_no_check -->11
12
下方示例演示了如何使用 `Mutex` 来保护对全局共享变量 `count` 的访问,对 `count` 的操作即属于临界区:13
14
<!-- verify -->15
16
17
输出结果应为:18
19
20
下方示例演示了如何使用 `tryLock`:21
22
<!-- run -->23
24
25
一种可能的输出结果如下:26
27
28
以下是互斥锁的一些错误示例:29
30
错误示例 1:线程操作临界区后没有解锁,导致其他线程无法获得锁而阻塞。31
32
<!-- compile.error -->33
<!-- cfg="libcangjie-std-sync" -->34
35
36
错误示例 2:在本线程没有持有锁的情况下调用 `unlock` 将会抛出异常。37
38
<!-- compile.error -->39
<!-- cfg="libcangjie-std-sync" -->40
41
42
错误示例 3:`tryLock()` 并不保证获取到锁,可能会造成不在锁的保护下操作临界区和在没有持有锁的情况下调用 `unlock` 抛出异常等行为。43
44
<!-- compile.error -->45
<!-- cfg="libcangjie-std-sync" -->46
47
48
另外,`Mutex` 在设计上是一个可重入锁,也就是说:在某个线程已经持有一个 `Mutex` 锁的情况下,再次尝试获取同一个 `Mutex` 锁,永远可以立即获得该 `Mutex` 锁。49
50
> **注意:**51
>52
> 虽然 `Mutex` 是一个可重入锁,但是调用 `unlock()` 的次数必须和调用 `lock()` 的次数相同,才能成功释放该锁。53
54
下方示例代码演示了 `Mutex` 可重入的特性:55
56
<!-- verify -->57
58
59
输出结果应为:60
61
62
在上方示例中,无论是主线程还是新创建的线程,如果在 `foo()` 中已经获得了锁,那么继续调用 `bar()` 的话,在 `bar()` 函数中由于是对同一个 `Mutex` 进行加锁,因此也是能立即获得该锁的,不会出现死锁。Code 1 · cangjie
1
public class Mutex <: UniqueLock {2
// Create a Mutex.3
public init()4
5
// Locks the mutex, blocks if the mutex is not available.6
public func lock(): Unit7
8
// Unlocks the mutex. If there are other threads blocking on this9
// lock, then wake up one of them.10
public func unlock(): Unit11
12
// Tries to lock the mutex, returns false if the mutex is not13
// available, otherwise returns true.14
public func tryLock(): Bool15
16
// Generate a Condition instance for the mutex.17
public func condition(): Condition18
}Code 2 · cangjie
1
import std.sync.Mutex2
import std.collection.ArrayList3
4
var count: Int64 = 05
let mtx = Mutex()6
7
main(): Int64 {8
let list = ArrayList<Future<Unit>>()9
10
// create 1000 threads.11
for (i in 0..1000) {12
let fut = spawn {13
sleep(Duration.millisecond) // sleep for 1ms.14
mtx.lock()15
count++16
mtx.unlock()17
}18
list.add(fut)19
}20
21
// Wait for all threads finished.22
for (f in list) {23
f.get()24
}25
26
println("count = ${count}")27
return 028
}Code 3 · text
1
count = 1000Code 4 · cangjie
1
import std.sync.Mutex2
3
4
main(): Int64 {5
let mtx: Mutex = Mutex()6
var future: Future<Unit> = spawn {7
mtx.lock()8
println("get the lock, do something")9
sleep(Duration.millisecond * 10)10
mtx.unlock()11
}12
try {13
future.get(Duration.millisecond * 10)14
} catch (e: TimeoutException) {15
if (mtx.tryLock()) {16
println("tryLock success, do something")17
mtx.unlock()18
return 019
}20
println("tryLock failed, do nothing")21
return 022
}23
return 024
}Code 5 · text
1
get the lock, do somethingCode 6 · cangjie
1
import std.sync.Mutex2
3
var sum: Int64 = 04
let mutex = Mutex()5
6
main() {7
let foo = spawn { =>8
mutex.lock()9
sum = sum + 110
}11
let bar = spawn { =>12
mutex.lock()13
sum = sum + 114
}15
foo.get()16
println("${sum}")17
bar.get() // Because the thread is not unlocked, other threads waiting to obtain the current mutex will be blocked.18
}Code 7 · cangjie
1
import std.sync.Mutex2
3
var sum: Int64 = 04
let mutex = Mutex()5
6
main() {7
let foo = spawn { =>8
sum = sum + 19
mutex.unlock() // Error, Unlock without obtaining the lock and throw an exception: IllegalSynchronizationStateException.10
}11
foo.get()12
}Code 8 · cangjie
1
import std.sync.Mutex2
3
var sum: Int64 = 04
let mutex = Mutex()5
6
main() {7
for (i in 0..100) {8
spawn { =>9
mutex.tryLock() // Error, `tryLock()` just trying to acquire a lock, there is no guarantee that the lock will be acquired, and this can lead to abnormal behavior.10
sum = sum + 111
mutex.unlock()12
}13
}14
}Code 9 · cangjie
1
import std.sync.Mutex2
3
var count: Int64 = 04
let mtx = Mutex()5
6
func foo() {7
mtx.lock()8
count += 109
bar()10
mtx.unlock()11
}12
13
func bar() {14
mtx.lock()15
count += 10016
mtx.unlock()17
}18
19
main(): Int64 {20
let fut = spawn {21
sleep(Duration.millisecond) // sleep for 1ms.22
foo()23
}24
25
foo()26
27
fut.get()28
29
println("count = ${count}")30
return 031
}Code 10 · text
1
count = 220modifiedConditiontextcode+2 lines, -1 line
v1.0.5
Section Text
1
`Condition` 是与某个互斥锁绑定的条件变量(也就是等待队列),`Condition` 实例由互斥锁创建,一个互斥锁可以创建多个 `Condition` 实例。`Condition` 可以使线程阻塞并等待来自另一个线程的信号以恢复执行。这是一种利用共享变量进行线程同步的机制,主要提供如下方法:2
3
4
调用 `Condition` 接口的 `wait`、`notify` 或 `notifyAll` 方法前,需要确保当前线程已经持有绑定的锁。`wait` 方法包含如下动作:5
6
1. 添加当前线程到对应锁的等待队列中;7
2. 阻塞当前线程,同时完全释放该锁,并记录锁的重入次数;8
3. 等待某个其他线程使用同一个 `Condition` 实例的 `notify` 或 `notifyAll` 方法向该线程发出信号;9
4. 当前线程被唤醒后,会自动尝试重新获取锁,且持有锁的重入状态与第 2 步记录的重入次数相同;但是如果尝试获取锁失败,则当前线程会阻塞在该锁上。10
11
`wait` 方法接受一个可选参数 `timeout`。需要注意的是,业界很多常用的常规操作系统不保证调度的实时性,因此无法保证一个线程会被阻塞“精确的 N 纳秒”——可能会观察到与系统相关的不精确情况。此外,当前语言规范明确允许实现产生虚假唤醒——在这种情况下,`wait` 返回值是由实现决定的——可能为 `true` 或 `false`。因此鼓励开发者始终将 `wait` 包在一个循环中:12
13
14
以下是使用 `Condition` 的一个正确示例:15
16
<!-- verify -->17
18
19
输出结果应为:20
21
22
`Condition` 对象执行 `wait` 时,必须在锁的保护下进行,否则 `wait` 中释放锁的操作会抛出异常。23
24
以下是使用条件变量的一些错误示例:25
26
<!-- run.error -->27
28
29
有时在复杂的线程间同步的场景下需要对同一个锁对象生成多个 `Condition` 实例,以下示例实现了一个长度固定的有界 `FIFO` 队列。当队列为空,`get()` 会被阻塞;当队列已满,`put()` 会被阻塞。30
31
<!-- compile -->Code 1 · cangjie
1
public class Mutex <: UniqueLock {2
// ...3
// Generate a Condition instance for the mutex.4
public func condition(): Condition5
}6
7
public interface Condition {8
// Wait for a signal, blocking the current thread.9
func wait(): Unit10
func wait(timeout!: Duration): Bool11
12
// Wait for a signal and predicate, blocking the current thread.13
func waitUntil(predicate: ()->Bool): Unit14
func waitUntil(predicate: ()->Bool, timeout!: Duration): Bool15
16
// Wake up one thread of those waiting on the monitor, if any.17
func notify(): Unit18
19
// Wake up all threads waiting on the monitor, if any.20
func notifyAll(): Unit21
}Code 2 · cangjie
1
synchronized (obj) {2
while (<condition is not true>) {3
obj.wait()4
}5
}Code 3 · text
1
import std.sync.Mutex2
3
let mtx = Mutex()4
let condition = synchronized(mtx) {5
mtx.condition()6
}7
var flag: Bool = true8
9
main(): Int64 {10
let fut = spawn {11
mtx.lock()12
while (flag) {13
println("New thread: before wait")14
condition.wait()15
println("New thread: after wait")16
}17
mtx.unlock()18
}19
20
// Sleep for 10ms, to make sure the new thread can be executed.21
sleep(10 * Duration.millisecond)22
23
mtx.lock()24
println("Main thread: set flag")25
flag = false26
mtx.unlock()27
28
mtx.lock()29
println("Main thread: notify")30
condition.notifyAll()31
mtx.unlock()32
33
// wait for the new thread finished.34
fut.get()35
return 036
}Code 4 · cangjie
1
New thread: before wait2
Main thread: set flag3
Main thread: notify4
New thread: after waitCode 5 · cangjie
1
import std.sync.Mutex2
3
let m1 = Mutex()4
let c1 = synchronized(m1) {5
m1.condition()6
}7
let m2 = Mutex()8
var flag: Bool = true9
var count: Int64 = 010
11
func foo1() {12
spawn {13
m2.lock()14
while (flag) {15
c1.wait() // Error:The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception.16
}17
count = count + 118
m2.unlock()19
}20
m1.lock()21
flag = false22
c1.notifyAll()23
m1.unlock()24
}25
26
func foo2() {27
spawn {28
while (flag) {29
c1.wait() // Error:The `wait` of a conditional variable must be called with a lock held.30
}31
count = count + 132
}33
m1.lock()34
flag = false35
c1.notifyAll()36
m1.unlock()37
}38
39
main() {40
foo1()41
foo2()42
c1.wait()43
}Code 6 · cangjie
1
import std.sync.{Mutex, Condition}2
3
class BoundedQueue {4
// Create a Mutex, two Conditions.5
let m: Mutex = Mutex()6
var notFull: Condition7
var notEmpty: Condition8
9
var count: Int64 // Object count in buffer.10
var head: Int64 // Write index.11
var tail: Int64 // Read index.12
13
// Queue's length is 100.14
let items: Array<Object> = Array<Object>(100, {i => Object()})15
16
init() {17
count = 018
head = 019
tail = 020
21
synchronized(m) {22
notFull = m.condition()23
notEmpty = m.condition()24
}25
}26
27
// Insert an object, if the queue is full, block the current thread.28
public func put(x: Object) {29
// Acquire the mutex.30
synchronized(m) {31
while (count == 100) {32
// If the queue is full, wait for the "queue notFull" event.33
notFull.wait()34
}35
items[head] = x36
head++37
if (head == 100) {38
head = 039
}40
count++41
42
// An object has been inserted and the current queue is no longer43
// empty, so wake up the thread previously blocked on get()44
// because the queue was empty.45
notEmpty.notify()46
} // Release the mutex.47
}48
49
// Pop an object, if the queue is empty, block the current thread.50
public func get(): Object {51
// Acquire the mutex.52
synchronized(m) {53
while (count == 0) {54
// If the queue is empty, wait for the "queue notEmpty" event.55
notEmpty.wait()56
}57
let x: Object = items[tail]58
tail++59
if (tail == 100) {60
tail = 061
}62
count--63
64
// An object has been popped and the current queue is no longer65
// full, so wake up the thread previously blocked on put()66
// because the queue was full.67
notFull.notify()68
69
return x70
} // Release the mutex.71
}72
}v1.1.0
Section Text
1
`Condition` 是与某个互斥锁绑定的条件变量(也就是等待队列),`Condition` 实例由互斥锁创建,一个互斥锁可以创建多个 `Condition` 实例。`Condition` 可以使线程阻塞并等待来自另一个线程的信号以恢复执行。这是一种利用共享变量进行线程同步的机制,主要提供如下方法:2
3
<!-- code_no_check -->4
5
调用 `Condition` 接口的 `wait`、`notify` 或 `notifyAll` 方法前,需要确保当前线程已经持有绑定的锁。`wait` 方法包含如下动作:6
7
1. 添加当前线程到对应锁的等待队列中;8
2. 阻塞当前线程,同时完全释放该锁,并记录锁的重入次数;9
3. 等待某个其他线程使用同一个 `Condition` 实例的 `notify` 或 `notifyAll` 方法向该线程发出信号;10
4. 当前线程被唤醒后,会自动尝试重新获取锁,且持有锁的重入状态与第 2 步记录的重入次数相同;但是如果尝试获取锁失败,则当前线程会阻塞在该锁上。11
12
`wait` 方法接受一个可选参数 `timeout`。需要注意的是,业界很多常用的常规操作系统不保证调度的实时性,因此无法保证一个线程会被阻塞“精确的 N 纳秒”——可能会观察到与系统相关的不精确情况。此外,当前语言规范明确允许实现产生虚假唤醒——在这种情况下,`wait` 返回值是由实现决定的——可能为 `true` 或 `false`。因此鼓励开发者始终将 `wait` 包在一个循环中。13
以下是使用 `Condition` 的一个正确示例:14
15
<!-- verify -->16
17
18
输出结果应为:19
20
21
`Condition` 对象执行 `wait` 时,必须在锁的保护下进行,否则 `wait` 中释放锁的操作会抛出异常。22
23
以下是使用条件变量的一些错误示例:24
25
<!-- run.error -->26
27
28
有时在复杂的线程间同步的场景下需要对同一个锁对象生成多个 `Condition` 实例,以下示例实现了一个长度固定的有界 `FIFO` 队列。当队列为空,`get()` 会被阻塞;当队列已满,`put()` 会被阻塞。29
30
<!-- compile -->Code 1 · cangjie
1
public class Mutex <: UniqueLock {2
// ...3
// Generate a Condition instance for the mutex.4
public func condition(): Condition5
}6
7
public interface Condition {8
// Wait for a signal, blocking the current thread.9
func wait(): Unit10
func wait(timeout!: Duration): Bool11
12
// Wait for a signal and predicate, blocking the current thread.13
func waitUntil(predicate: ()->Bool): Unit14
func waitUntil(predicate: ()->Bool, timeout!: Duration): Bool15
16
// Wake up one thread of those waiting on the monitor, if any.17
func notify(): Unit18
19
// Wake up all threads waiting on the monitor, if any.20
func notifyAll(): Unit21
}Code 2 · cangjie
1
import std.sync.Mutex2
3
let mtx = Mutex()4
let condition = synchronized(mtx) {5
mtx.condition()6
}7
var flag: Bool = true8
9
main(): Int64 {10
let fut = spawn {11
mtx.lock()12
while (flag) {13
println("New thread: before wait")14
condition.wait()15
println("New thread: after wait")16
}17
mtx.unlock()18
}19
20
// Sleep for 10ms, to make sure the new thread can be executed.21
sleep(10 * Duration.millisecond)22
23
mtx.lock()24
println("Main thread: set flag")25
flag = false26
mtx.unlock()27
28
mtx.lock()29
println("Main thread: notify")30
condition.notifyAll()31
mtx.unlock()32
33
// wait for the new thread finished.34
fut.get()35
return 036
}Code 3 · text
1
New thread: before wait2
Main thread: set flag3
Main thread: notify4
New thread: after waitCode 4 · cangjie
1
import std.sync.Mutex2
3
let m1 = Mutex()4
let c1 = synchronized(m1) {5
m1.condition()6
}7
let m2 = Mutex()8
var flag: Bool = true9
var count: Int64 = 010
11
func foo1() {12
spawn {13
m2.lock()14
while (flag) {15
c1.wait() // Error:The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception.16
}17
count = count + 118
m2.unlock()19
}20
m1.lock()21
flag = false22
c1.notifyAll()23
m1.unlock()24
}25
26
func foo2() {27
spawn {28
while (flag) {29
c1.wait() // Error:The `wait` of a conditional variable must be called with a lock held.30
}31
count = count + 132
}33
m1.lock()34
flag = false35
c1.notifyAll()36
m1.unlock()37
}38
39
main() {40
foo1()41
foo2()42
c1.wait()43
}Code 5 · cangjie
1
import std.sync.{Mutex, Condition}2
3
class BoundedQueue {4
// Create a Mutex, two Conditions.5
let m: Mutex = Mutex()6
var notFull: Condition7
var notEmpty: Condition8
9
var count: Int64 // Object count in buffer.10
var head: Int64 // Write index.11
var tail: Int64 // Read index.12
13
// Queue's length is 100.14
let items: Array<Object> = Array<Object>(100, {i => Object()})15
16
init() {17
count = 018
head = 019
tail = 020
21
synchronized(m) {22
notFull = m.condition()23
notEmpty = m.condition()24
}25
}26
27
// Insert an object, if the queue is full, block the current thread.28
public func put(x: Object) {29
// Acquire the mutex.30
synchronized(m) {31
while (count == 100) {32
// If the queue is full, wait for the "queue notFull" event.33
notFull.wait()34
}35
items[head] = x36
head++37
if (head == 100) {38
head = 039
}40
count++41
42
// An object has been inserted and the current queue is no longer43
// empty, so wake up the thread previously blocked on get()44
// because the queue was empty.45
notEmpty.notify()46
} // Release the mutex.47
}48
49
// Pop an object, if the queue is empty, block the current thread.50
public func get(): Object {51
// Acquire the mutex.52
synchronized(m) {53
while (count == 0) {54
// If the queue is empty, wait for the "queue notEmpty" event.55
notEmpty.wait()56
}57
let x: Object = items[tail]58
tail++59
if (tail == 100) {60
tail = 061
}62
count--63
64
// An object has been popped and the current queue is no longer65
// full, so wake up the thread previously blocked on put()66
// because the queue was full.67
notFull.notify()68
69
return x70
} // Release the mutex.71
}72
}Code 6 · cangjie
1
modified线程局部变量 ThreadLocaltext+1 line
v1.0.5
Section Text
1
使用 core 包中的 `ThreadLocal` 可以创建并使用线程局部变量,每一个线程都有它独立的一个存储空间来保存这些线程局部变量。因此,在每个线程可以安全地访问他们各自的线程局部变量,而不受其他线程的影响。2
3
4
下方示例代码演示了如何通过 `ThreadLocal`类来创建并使用各自线程的局部变量:5
6
<!-- run -->7
8
9
可能的输出结果如下:10
11
12
或者Code 1 · cangjie
1
public class ThreadLocal<T> {2
/* 构造一个携带空值的仓颉线程局部变量 */3
public init()4
5
/* 获得仓颉线程局部变量的值 */6
public func get(): Option<T> // 如果值不存在,则返回 Option<T>.None。返回值 Option<T> - 仓颉线程局部变量的值7
8
/* 通过 value 设置仓颉线程局部变量的值 */9
public func set(value: Option<T>): Unit // 如果传入 Option<T>.None,该局部变量的值将被删除,在线程后续操作中将无法获取。参数 value - 需要设置的局部变量的值。10
}Code 2 · cangjie
1
2
main(): Int64 {3
let tl = ThreadLocal<Int64>()4
let fut1 = spawn {5
tl.set(123)6
println("tl in spawn1 = ${tl.get().getOrThrow()}")7
}8
let fut2 = spawn {9
tl.set(456)10
println("tl in spawn2 = ${tl.get().getOrThrow()}")11
}12
fut1.get()13
fut2.get()14
015
}Code 3 · text
1
tl in spawn1 = 1232
tl in spawn2 = 456Code 4 · text
1
tl in spawn2 = 4562
tl in spawn1 = 123v1.1.0
Section Text
1
使用 core 包中的 `ThreadLocal` 可以创建并使用线程局部变量,每一个线程都有它独立的一个存储空间来保存这些线程局部变量。因此,在每个线程可以安全地访问他们各自的线程局部变量,而不受其他线程的影响。2
3
<!-- code_no_check -->4
5
下方示例代码演示了如何通过 `ThreadLocal`类来创建并使用各自线程的局部变量:6
7
<!-- run -->8
9
10
可能的输出结果如下:11
12
13
或者Code 1 · cangjie
1
public class ThreadLocal<T> {2
/* 构造一个携带空值的仓颉线程局部变量 */3
public init()4
5
/* 获得仓颉线程局部变量的值 */6
public func get(): Option<T> // 如果值不存在,则返回 Option<T>.None。返回值 Option<T> - 仓颉线程局部变量的值7
8
/* 通过 value 设置仓颉线程局部变量的值 */9
public func set(value: Option<T>): Unit // 如果传入 Option<T>.None,该局部变量的值将被删除,在线程后续操作中将无法获取。参数 value - 需要设置的局部变量的值。10
}Code 2 · cangjie
1
2
main(): Int64 {3
let tl = ThreadLocal<Int64>()4
let fut1 = spawn {5
tl.set(123)6
println("tl in spawn1 = ${tl.get().getOrThrow()}")7
}8
let fut2 = spawn {9
tl.set(456)10
println("tl in spawn2 = ${tl.get().getOrThrow()}")11
}12
fut1.get()13
fut2.get()14
015
}Code 3 · text
1
tl in spawn1 = 1232
tl in spawn2 = 456Code 4 · text
1
tl in spawn2 = 4562
tl in spawn1 = 123