Skip to content

Commit 4fb1bd3

Browse files
committed
Initial commit
0 parents  commit 4fb1bd3

10 files changed

+566
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
drp-delta

Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
build: *.go src/*.go
2+
go build
3+
4+
run: build
5+
./drp-delta
6+
7+
clean:
8+
rm drp-delta

go.mod

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module drp-delta
2+
3+
go 1.20
4+
5+
require github.com/google/uuid v1.3.0

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
2+
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

main.go

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package main
2+
3+
import "fmt"
4+
5+
func main() {
6+
fmt.Println("Hello, World")
7+
}

src/defserver.go

+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package drpdelta
2+
3+
func (d *definition) server(inbox <-chan InboundMessage, outbox chan<- OutboundMessage) {
4+
for inMsg := range inbox {
5+
switch inContent := inMsg.Content.(type) {
6+
case ChangeMessage:
7+
inContentRef := &inContent
8+
9+
d.changeLists[inMsg.Sender] = append(d.changeLists[inMsg.Sender], inContentRef)
10+
for _, tx := range inContent.txs {
11+
txid := tx.id
12+
if _, found := d.changeGraph[txid]; !found {
13+
d.changeGraph[txid] = map[Address]*ChangeMessage{}
14+
}
15+
16+
d.changeGraph[txid][inMsg.Sender] = inContentRef
17+
}
18+
19+
for _, pred := range inContent.preds {
20+
if _, found := d.changeGraph[pred.id]; !found {
21+
d.changeGraph[pred.id] = map[Address]*ChangeMessage{}
22+
for _, name := range pred.writes {
23+
d.changeGraph[pred.id][name] = nil
24+
}
25+
}
26+
}
27+
}
28+
29+
d.tick(inbox, outbox)
30+
}
31+
}
32+
33+
func (d *definition) tick(inbox <-chan InboundMessage, outbox chan<- OutboundMessage) {
34+
var smallestTransitivePreds map[*ChangeMessage]struct{}
35+
for _, changeList := range d.changeLists {
36+
if len(changeList) == 0 {
37+
continue
38+
}
39+
40+
change := changeList[0]
41+
42+
if transitivePreds, ok := d.findTransitivePreds(change); ok {
43+
if smallestTransitivePreds == nil || len(transitivePreds) < len(smallestTransitivePreds) {
44+
smallestTransitivePreds = transitivePreds
45+
}
46+
}
47+
}
48+
49+
if smallestTransitivePreds != nil {
50+
allTxs, allPreds := unionTxsPreds(smallestTransitivePreds)
51+
52+
for inputName, changeList := range d.changeLists {
53+
for i, change := range changeList {
54+
if _, found := smallestTransitivePreds[change]; found {
55+
// execute the change
56+
d.replicas[inputName] = change.value
57+
delete(smallestTransitivePreds, change)
58+
} else {
59+
d.changeLists[inputName] = d.changeLists[inputName][i:]
60+
break
61+
}
62+
}
63+
}
64+
65+
if len(smallestTransitivePreds) > 0 {
66+
panic("invariant broken: didn't exhaust the transitive preds set!")
67+
}
68+
69+
newValue := d.f(d.replicas)
70+
71+
for _, sub := range d.subscribers {
72+
outbox <- OutboundMessage{Target: sub, Content: ChangeMessage{txs: allTxs, preds: allPreds, value: newValue}}
73+
}
74+
}
75+
}
76+
77+
func (d *definition) findTransitivePreds(change *ChangeMessage) (map[*ChangeMessage]struct{}, bool) {
78+
preds := map[*ChangeMessage]struct{}{}
79+
ok := d._findTransitivePreds(change, preds)
80+
return preds, ok
81+
}
82+
83+
func (d *definition) _findTransitivePreds(change *ChangeMessage, preds map[*ChangeMessage]struct{}) bool {
84+
if _, found := preds[change]; found {
85+
return true
86+
}
87+
preds[change] = struct{}{}
88+
89+
for _, tx := range change.txs {
90+
var overlappingInputs []Address
91+
for _, name := range tx.writes {
92+
for inputName, inputTransitiveVariableDeps := range d.inputTransitiveVariableDeps {
93+
if _, found := inputTransitiveVariableDeps[name]; found {
94+
overlappingInputs = append(overlappingInputs, inputName)
95+
}
96+
}
97+
}
98+
99+
for _, overlappingInput := range overlappingInputs {
100+
thisInputChange := d.changeGraph[tx.id][overlappingInput]
101+
if thisInputChange == nil {
102+
// this input has not yet received this pred
103+
return false
104+
} else if !d._findTransitivePreds(thisInputChange, preds) {
105+
// the recursive findTransitivePreds call failed
106+
return false
107+
}
108+
}
109+
}
110+
111+
for _, changePred := range change.preds {
112+
if namesToChanges, found := d.changeGraph[changePred.id]; found {
113+
if namesToChanges == nil {
114+
panic("invariant 1 broken in _findTransitivePreds")
115+
}
116+
117+
var overlappingInputs []Address
118+
for name := range namesToChanges {
119+
for inputName, inputTransitiveVariableDeps := range d.inputTransitiveVariableDeps {
120+
if _, found := inputTransitiveVariableDeps[name]; found {
121+
overlappingInputs = append(overlappingInputs, inputName)
122+
}
123+
}
124+
}
125+
126+
for _, overlappingInput := range overlappingInputs {
127+
thisInputChange := d.changeGraph[changePred.id][overlappingInput]
128+
if thisInputChange == nil {
129+
// this input has not yet received this pred
130+
return false
131+
} else if !d._findTransitivePreds(thisInputChange, preds) {
132+
// the recursive findTransitivePreds call failed
133+
return false
134+
}
135+
}
136+
} else {
137+
panic("invariant 2 broken in _findTransitivePreds")
138+
}
139+
}
140+
141+
return true
142+
}
143+
144+
func unionTxsPreds(changes map[*ChangeMessage]struct{}) (txs []Tx, preds []Tx) {
145+
txsMap := map[Txid][]Address{}
146+
predsMap := map[Txid][]Address{}
147+
148+
for change, _ := range changes {
149+
for _, tx := range change.txs {
150+
txsMap[tx.id] = tx.writes
151+
}
152+
for _, tx := range change.preds {
153+
predsMap[tx.id] = tx.writes
154+
}
155+
}
156+
157+
txs = make([]Tx, len(txsMap))
158+
preds = make([]Tx, len(predsMap))
159+
for id, writes := range txsMap {
160+
txs = append(txs, Tx{id, writes})
161+
}
162+
for id, writes := range predsMap {
163+
preds = append(txs, Tx{id, writes})
164+
}
165+
166+
return
167+
}
168+
169+
func NewDefinition(address Address, deps []ReactiveNode, f func(map[Address]any) any) definition {
170+
replicas := map[Address]any{}
171+
changeLists := map[Address][]*ChangeMessage{}
172+
for _, dep := range deps {
173+
dep.Subscribe(address)
174+
replicas[dep.Address()] = nil
175+
changeLists[dep.Address()] = []*ChangeMessage{}
176+
}
177+
178+
return definition{
179+
Address: address,
180+
replicas: replicas,
181+
changeGraph: map[Txid]map[Address]*ChangeMessage{},
182+
changeLists: changeLists,
183+
f: f,
184+
subscribers: []Address{},
185+
inputTransitiveVariableDeps: map[Address]map[Address]struct{}{}, // TODO: properly impl this
186+
}
187+
}
188+
189+
func (d *definition) Subscribe(who Address) {
190+
d.subscribers = append(d.subscribers, who)
191+
}
192+
193+
type definition struct {
194+
Address Address
195+
replicas map[Address]any
196+
changeGraph map[Txid]map[Address]*ChangeMessage
197+
changeLists map[Address][]*ChangeMessage
198+
f func(map[Address]any) any
199+
subscribers []Address
200+
inputTransitiveVariableDeps map[Address]map[Address]struct{}
201+
}

src/doaction.go

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package drpdelta
2+
3+
import "fmt"
4+
5+
func DoAction(action Action, inbox <-chan InboundMessage, outbox chan<- OutboundMessage) {
6+
txid := NewTxid()
7+
8+
// make a list of all the things we write to for later use
9+
writes := []Address{}
10+
for name, kind := range action.targets {
11+
if kind == write {
12+
writes = append(writes, name)
13+
}
14+
}
15+
16+
// request all queues
17+
for name := range action.targets {
18+
outbox <- OutboundMessage{Target: name, Content: QueueMessage{txid: txid}}
19+
}
20+
// wait to be granted all queues
21+
queueResponseCount := 0
22+
for inMsg := range inbox {
23+
if _, ok := inMsg.Content.(QueueGrantedMessage); ok {
24+
queueResponseCount += 1
25+
26+
if queueResponseCount == len(action.targets) {
27+
break
28+
}
29+
} else {
30+
fmt.Println("Unexpected message:", inMsg)
31+
panic("")
32+
}
33+
}
34+
35+
// request all locks
36+
for name, kind := range action.targets {
37+
outbox <- OutboundMessage{Target: name, Content: LockMessage{txid: txid, kind: kind}}
38+
}
39+
// wait to be granted all locks
40+
lockResponseCount := 0
41+
for inMsg := range inbox {
42+
if _, ok := inMsg.Content.(LockGrantedMessage); ok {
43+
lockResponseCount += 1
44+
45+
if lockResponseCount == len(action.targets) {
46+
break
47+
}
48+
} else {
49+
fmt.Println("Unexpected message:", inMsg)
50+
panic("")
51+
}
52+
}
53+
54+
// request all reads
55+
for name := range action.targets {
56+
outbox <- OutboundMessage{Target: name, Content: ReadMessage{tx: Tx{id: txid, writes: writes}}}
57+
}
58+
// wait to be granted all reads
59+
values := map[Address]any{}
60+
preds := []Tx{}
61+
for inMsg := range inbox {
62+
if inContent, ok := inMsg.Content.(ReadResultMessage); ok {
63+
values[inMsg.Sender] = inContent.value
64+
preds = append(preds, inContent.valueTx)
65+
}
66+
}
67+
68+
// do all writes
69+
for name, value := range action.f(values) {
70+
if kind, ok := action.targets[name]; !ok || kind != write {
71+
panic("invariant broken: action tries to write to variable it doesn't have a write lock for")
72+
}
73+
74+
outbox <- OutboundMessage{Target: name, Content: WriteMessage{tx: Tx{id: txid, writes: writes}, preds: preds, value: value}}
75+
}
76+
}
77+
78+
type Action struct {
79+
targets map[Address]lockKind
80+
f func(map[Address]any) map[Address]any
81+
}

src/messages.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package drpdelta
2+
3+
import "github.com/google/uuid"
4+
5+
type InboundMessage struct {
6+
Sender Address
7+
Content any
8+
}
9+
10+
type OutboundMessage struct {
11+
Target Address
12+
Content any
13+
}
14+
15+
type Address string
16+
17+
type QueueMessage struct {
18+
txid Txid
19+
}
20+
21+
type LockMessage struct {
22+
txid Txid
23+
kind lockKind
24+
}
25+
26+
type ReadMessage struct {
27+
tx Tx
28+
}
29+
30+
type WriteMessage struct {
31+
tx Tx
32+
preds []Tx
33+
value any
34+
}
35+
36+
type QueueGrantedMessage struct {
37+
txid Txid
38+
}
39+
40+
type LockGrantedMessage struct {
41+
txid Txid
42+
kind lockKind
43+
}
44+
45+
type ReadResultMessage struct {
46+
txid Txid
47+
value any
48+
valueTx Tx
49+
}
50+
51+
type ChangeMessage struct {
52+
txs []Tx
53+
preds []Tx
54+
value any
55+
}
56+
57+
// A transaction ID plus the set of names written to by the transaction.
58+
type Tx struct {
59+
// the transaction's ID
60+
id Txid
61+
// the list of values written to by the transaction
62+
writes []Address
63+
}
64+
65+
type Txid string
66+
67+
func NewTxid() Txid {
68+
return Txid(uuid.New().String())
69+
}

src/orchestrator.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package drpdelta
2+
3+
type ReactiveNode interface {
4+
Subscribe(Address)
5+
Address() Address
6+
}

0 commit comments

Comments
 (0)