4
4
"fmt"
5
5
"math/big"
6
6
"math/rand"
7
+ "sync"
7
8
"time"
8
9
9
10
lru "github.com/hashicorp/golang-lru"
40
41
41
42
type remoteVerifyManager struct {
42
43
bc * BlockChain
44
+ taskLock sync.RWMutex
43
45
tasks map [common.Hash ]* verifyTask
44
46
peers verifyPeers
45
47
verifiedCache * lru.Cache
@@ -109,14 +111,17 @@ func (vm *remoteVerifyManager) mainLoop() {
109
111
vm .NewBlockVerifyTask (h .Block .Header ())
110
112
case hash := <- vm .verifyCh :
111
113
vm .cacheBlockVerified (hash )
114
+ vm .taskLock .Lock ()
112
115
if task , ok := vm .tasks [hash ]; ok {
113
116
delete (vm .tasks , hash )
114
117
verifyTaskCounter .Dec (1 )
115
118
verifyTaskSucceedMeter .Mark (1 )
116
119
verifyTaskExecutionTimer .Update (time .Since (task .startAt ))
117
120
close (task .terminalCh )
118
121
}
122
+ vm .taskLock .Unlock ()
119
123
case <- pruneTicker .C :
124
+ vm .taskLock .Lock ()
120
125
for hash , task := range vm .tasks {
121
126
if vm .bc .CurrentHeader ().Number .Cmp (task .blockHeader .Number ) == 1 &&
122
127
vm .bc .CurrentHeader ().Number .Uint64 ()- task .blockHeader .Number .Uint64 () > pruneHeightDiff {
@@ -126,16 +131,21 @@ func (vm *remoteVerifyManager) mainLoop() {
126
131
close (task .terminalCh )
127
132
}
128
133
}
134
+ vm .taskLock .Unlock ()
129
135
case message := <- vm .messageCh :
136
+ vm .taskLock .RLock ()
130
137
if vt , ok := vm .tasks [message .verifyResult .BlockHash ]; ok {
131
138
vt .messageCh <- message
132
139
}
140
+ vm .taskLock .RUnlock ()
133
141
134
142
// System stopped
135
143
case <- vm .bc .quit :
144
+ vm .taskLock .RLock ()
136
145
for _ , task := range vm .tasks {
137
146
close (task .terminalCh )
138
147
}
148
+ vm .taskLock .RUnlock ()
139
149
return
140
150
case <- vm .chainHeadSub .Err ():
141
151
return
@@ -156,7 +166,10 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
156
166
return
157
167
}
158
168
// if there already has a verify task for this block, skip.
159
- if _ , ok := vm .tasks [hash ]; ok {
169
+ vm .taskLock .RLock ()
170
+ _ , ok := vm .tasks [hash ]
171
+ vm .taskLock .RUnlock ()
172
+ if ok {
160
173
return
161
174
}
162
175
@@ -184,7 +197,9 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) {
184
197
return
185
198
}
186
199
verifyTask := NewVerifyTask (diffHash , header , vm .peers , vm .verifyCh , vm .allowInsecure )
200
+ vm .taskLock .Lock ()
187
201
vm .tasks [hash ] = verifyTask
202
+ vm .taskLock .Unlock ()
188
203
verifyTaskCounter .Inc (1 )
189
204
}(header .Hash ())
190
205
header = vm .bc .GetHeaderByHash (header .ParentHash )
@@ -208,7 +223,16 @@ func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool {
208
223
}
209
224
210
225
hash := header .Hash ()
211
- _ , exist := vm .verifiedCache .Get (hash )
226
+
227
+ // Check if the task is complete
228
+ vm .taskLock .RLock ()
229
+ task , exist := vm .tasks [hash ]
230
+ vm .taskLock .RUnlock ()
231
+ if exist {
232
+ <- task .terminalCh
233
+ }
234
+
235
+ _ , exist = vm .verifiedCache .Get (hash )
212
236
return exist
213
237
}
214
238
0 commit comments