Skip to content

Commit 08010a8

Browse files
committed
covermerger: parallelize and use monorepo
1 parent a6f99ac commit 08010a8

File tree

6 files changed

+186
-136
lines changed

6 files changed

+186
-136
lines changed

pkg/covermerger/covermerger.go

+71-40
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
package covermerger
55

66
import (
7+
"context"
78
"encoding/csv"
89
"fmt"
910
"io"
1011
"log"
1112
"strconv"
13+
"sync"
1214

1315
"golang.org/x/exp/maps"
16+
"golang.org/x/sync/errgroup"
1417
)
1518

1619
const (
@@ -23,7 +26,6 @@ const (
2326
)
2427

2528
// FileRecord is one input record represented as a string-to-string map.
// The path of the file the record belongs to is stored under KeyFilePath.
type FileRecord map[string]string
26-
type FileRecords []FileRecord
2729
type RepoBranchCommit struct {
2830
Repo string
2931
Branch string
@@ -73,24 +75,15 @@ type FileCoverageMerger interface {
7375
Result() *MergeResult
7476
}
7577

76-
func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{},
77-
) (*MergeResult, error) {
78+
func batchFileData(c *Config, targetFilePath string, records []FileRecord) (*MergeResult, error) {
7879
log.Printf("processing %d records for %s", len(records), targetFilePath)
79-
if _, exists := processedFiles[targetFilePath]; exists {
80-
return nil, fmt.Errorf("file was already processed, check the input ordering")
81-
}
82-
processedFiles[targetFilePath] = struct{}{}
8380
repoBranchCommitsMap := make(map[RepoBranchCommit]bool)
8481
for _, record := range records {
8582
repoBranchCommitsMap[record.RepoBranchCommit()] = true
8683
}
8784
repoBranchCommitsMap[c.Base] = true
8885
repoBranchCommits := maps.Keys(repoBranchCommitsMap)
89-
getFiles := getFileVersions
90-
if c.getFileVersionsMock != nil {
91-
getFiles = c.getFileVersionsMock
92-
}
93-
fvs, err := getFiles(c, targetFilePath, repoBranchCommits)
86+
fvs, err := c.FileVersProvider.GetFileVersions(c, targetFilePath, repoBranchCommits)
9487
if err != nil {
9588
return nil, fmt.Errorf("failed to getFileVersions: %w", err)
9689
}
@@ -125,11 +118,11 @@ func makeRecord(fields, schema []string) FileRecord {
125118
}
126119

127120
type Config struct {
128-
Workdir string
129-
skipRepoClone bool
130-
Base RepoBranchCommit
131-
getFileVersionsMock func(*Config, string, []RepoBranchCommit) (fileVersions, error)
132-
repoCache repoCache
121+
Jobs int
122+
Workdir string
123+
skipRepoClone bool
124+
Base RepoBranchCommit
125+
FileVersProvider fileVersProvider
133126
}
134127

135128
func isSchema(fields, schema []string) bool {
@@ -182,32 +175,70 @@ func MergeCSVData(config *Config, reader io.Reader) (map[string]*MergeResult, er
182175
return mergeResult, nil
183176
}
184177

185-
func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeResult, error) {
178+
type FileRecords struct {
179+
fileName string
180+
records []FileRecord
181+
}
182+
183+
func mergeChanData(c *Config, recordChan <-chan FileRecord) (map[string]*MergeResult, error) {
184+
g, ctx := errgroup.WithContext(context.Background())
185+
frecordChan := groupFileRecords(recordChan, ctx)
186186
stat := make(map[string]*MergeResult)
187-
targetFile := ""
188-
var records []FileRecord
189-
processedFiles := map[string]struct{}{}
190-
for record := range recordsChan {
191-
curTargetFile := record[KeyFilePath]
192-
if targetFile == "" {
193-
targetFile = curTargetFile
194-
}
195-
if curTargetFile != targetFile {
196-
var err error
197-
if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
198-
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
187+
var mu sync.Mutex
188+
for i := 0; i < c.Jobs; i++ {
189+
g.Go(func() error {
190+
for frecord := range frecordChan {
191+
if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
192+
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
193+
} else {
194+
mu.Lock()
195+
if _, exist := stat[frecord.fileName]; exist {
196+
mu.Unlock()
197+
return fmt.Errorf("file %s was already processed", frecord.fileName)
198+
}
199+
stat[frecord.fileName] = mr
200+
mu.Unlock()
201+
}
199202
}
200-
records = nil
201-
targetFile = curTargetFile
202-
}
203-
records = append(records, record)
203+
return nil
204+
})
204205
}
205-
if records != nil {
206-
var err error
207-
if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
208-
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
209-
}
206+
if err := g.Wait(); err != nil {
207+
return nil, err
210208
}
211-
212209
return stat, nil
213210
}
211+
212+
func groupFileRecords(recordChan <-chan FileRecord, ctx context.Context) chan FileRecords {
213+
frecordChan := make(chan FileRecords)
214+
go func() {
215+
defer close(frecordChan)
216+
targetFile := ""
217+
var records []FileRecord
218+
for record := range recordChan {
219+
select {
220+
case <-ctx.Done():
221+
return
222+
default:
223+
}
224+
curTargetFile := record[KeyFilePath]
225+
if targetFile == "" {
226+
targetFile = curTargetFile
227+
}
228+
if curTargetFile != targetFile {
229+
frecordChan <- FileRecords{
230+
fileName: targetFile,
231+
records: records,
232+
}
233+
records = nil
234+
targetFile = curTargetFile
235+
}
236+
records = append(records, record)
237+
}
238+
frecordChan <- FileRecords{
239+
fileName: targetFile,
240+
records: records,
241+
}
242+
}()
243+
return frecordChan
244+
}

pkg/covermerger/covermerger_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,15 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
121121
t.Run(test.name, func(t *testing.T) {
122122
aggregation, err := MergeCSVData(
123123
&Config{
124+
Jobs: 2,
124125
Workdir: test.workdir,
125126
skipRepoClone: true,
126127
Base: RepoBranchCommit{
127128
Repo: test.baseRepo,
128129
Branch: test.baseBranch,
129130
Commit: test.baseCommit,
130131
},
131-
getFileVersionsMock: mockGetFileVersions,
132+
FileVersProvider: &fileVersProviderMock{},
132133
},
133134
strings.NewReader(test.bqTable),
134135
)
@@ -140,15 +141,15 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
140141
}
141142
}
142143

143-
func mockGetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
144+
// fileVersProviderMock is a test stand-in for a file version provider: its
// GetFileVersions reads file contents from disk under the Config's Workdir.
type fileVersProviderMock struct{}
145+
146+
func (m *fileVersProviderMock) GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
144147
) (fileVersions, error) {
145148
res := make(fileVersions)
146149
for _, rbc := range rbcs {
147150
filePath := c.Workdir + "/repos/" + rbc.Commit + "/" + targetFilePath
148151
if bytes, err := os.ReadFile(filePath); err == nil {
149-
res[rbc] = fileVersion{
150-
content: string(bytes),
151-
}
152+
res[rbc] = string(bytes)
152153
}
153154
}
154155
return res, nil

pkg/covermerger/file_line_merger.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ func makeFileLineCoverMerger(
99
baseFileExists := false
1010
for rbc, fv := range fvs {
1111
if rbc == base {
12-
baseFile = fv.content
12+
baseFile = fv
1313
baseFileExists = true
1414
break
1515
}
@@ -25,7 +25,7 @@ func makeFileLineCoverMerger(
2525
lostFrames: map[RepoBranchCommit]int64{},
2626
}
2727
for rbc, fv := range fvs {
28-
a.matchers[rbc] = makeLineToLineMatcher(fv.content, baseFile)
28+
a.matchers[rbc] = makeLineToLineMatcher(fv, baseFile)
2929
}
3030
return a
3131
}

pkg/covermerger/monorepo.go

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2024 syzkaller project authors. All rights reserved.
2+
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
3+
4+
package covermerger
5+
6+
import (
7+
"fmt"
8+
"log"
9+
"sync"
10+
11+
"github.com/google/syzkaller/pkg/vcs"
12+
"github.com/google/syzkaller/sys/targets"
13+
)
14+
15+
type fileVersProvider interface {
16+
GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
17+
) (fileVersions, error)
18+
}
19+
20+
type monoRepo struct {
21+
branches map[RepoBranchCommit]struct{}
22+
mu sync.RWMutex
23+
repo vcs.Repo
24+
}
25+
26+
// fileVersions maps a repo/branch/commit to the content of one file as read
// at that commit.
type fileVersions map[RepoBranchCommit]string
27+
28+
func (mr *monoRepo) GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
29+
) (fileVersions, error) {
30+
mr.mu.RLock()
31+
if !mr.allRepoBranchesPresent(rbcs) {
32+
mr.mu.RUnlock()
33+
if err := mr.cloneBranches(rbcs); err != nil {
34+
return nil, fmt.Errorf("failed to clone repos: %w", err)
35+
}
36+
mr.mu.RLock()
37+
}
38+
defer mr.mu.RUnlock()
39+
res := make(fileVersions)
40+
for _, rbc := range rbcs {
41+
fileBytes, err := mr.repo.Object(targetFilePath, rbc.Commit)
42+
// It is ok if some file doesn't exist. It means we have repo FS diff.
43+
if err != nil {
44+
continue
45+
}
46+
res[rbc] = string(fileBytes)
47+
}
48+
return res, nil
49+
}
50+
51+
func (mr *monoRepo) allRepoBranchesPresent(rbcs []RepoBranchCommit) bool {
52+
for _, rbc := range rbcs {
53+
if !mr.repoBranchPresent(rbc) {
54+
return false
55+
}
56+
}
57+
return true
58+
}
59+
60+
func (mr *monoRepo) repoBranchPresent(rbc RepoBranchCommit) bool {
61+
rbc.Commit = ""
62+
_, ok := mr.branches[rbc]
63+
return ok
64+
}
65+
66+
func (mr *monoRepo) addRepoBranch(rbc RepoBranchCommit) error {
67+
rbc.Commit = ""
68+
mr.branches[rbc] = struct{}{}
69+
log.Printf("cloning repo: %s, branch: %s", rbc.Repo, rbc.Branch)
70+
if rbc.Repo == "" || rbc.Branch == "" {
71+
panic("repo and branch are needed")
72+
}
73+
if _, err := mr.repo.CheckoutBranch(rbc.Repo, rbc.Branch); err != nil {
74+
return fmt.Errorf("failed to CheckoutBranch(repo %s, branch %s): %w",
75+
rbc.Repo, rbc.Branch, err)
76+
}
77+
return nil
78+
}
79+
80+
func MakeMonoRepo(workdir string) fileVersProvider {
81+
rbcPath := workdir + "/repos/linux_kernels"
82+
mr := &monoRepo{
83+
branches: map[RepoBranchCommit]struct{}{},
84+
}
85+
var err error
86+
if mr.repo, err = vcs.NewRepo(targets.Linux, "none", rbcPath); err != nil {
87+
panic(fmt.Sprintf("failed to create/open repo at %s: %s", rbcPath, err.Error()))
88+
}
89+
return mr
90+
}
91+
92+
func (mr *monoRepo) cloneBranches(rbcs []RepoBranchCommit) error {
93+
mr.mu.Lock()
94+
defer mr.mu.Unlock()
95+
for _, rbc := range rbcs {
96+
if mr.repoBranchPresent(rbc) {
97+
continue
98+
}
99+
if err := mr.addRepoBranch(rbc); err != nil {
100+
return err
101+
}
102+
}
103+
return nil
104+
}

0 commit comments

Comments
 (0)