@@ -57,8 +57,7 @@ func (e feedTypeError) Error() string {
57
57
return "event: wrong type in " + e .op + " got " + e .got .String () + ", want " + e .want .String ()
58
58
}
59
59
60
- func (f * Feed ) init (etype reflect.Type ) {
61
- f .etype = etype
60
+ func (f * Feed ) init () {
62
61
f .removeSub = make (chan interface {})
63
62
f .sendLock = make (chan struct {}, 1 )
64
63
f .sendLock <- struct {}{}
@@ -71,27 +70,36 @@ func (f *Feed) init(etype reflect.Type) {
71
70
// The channel should have ample buffer space to avoid blocking other subscribers.
72
71
// Slow subscribers are not dropped.
73
72
func (f * Feed ) Subscribe (channel interface {}) Subscription {
73
+ f .once .Do (f .init )
74
+
74
75
chanval := reflect .ValueOf (channel )
75
76
chantyp := chanval .Type ()
76
77
if chantyp .Kind () != reflect .Chan || chantyp .ChanDir ()& reflect .SendDir == 0 {
77
78
panic (errBadChannel )
78
79
}
79
80
sub := & feedSub {feed : f , channel : chanval , err : make (chan error , 1 )}
80
81
81
- f .once .Do (func () { f .init (chantyp .Elem ()) })
82
- if f .etype != chantyp .Elem () {
83
- panic (feedTypeError {op : "Subscribe" , got : chantyp , want : reflect .ChanOf (reflect .SendDir , f .etype )})
84
- }
85
-
86
82
f .mu .Lock ()
87
83
defer f .mu .Unlock ()
84
+ if ! f .typecheck (chantyp .Elem ()) {
85
+ panic (feedTypeError {op : "Subscribe" , got : chantyp , want : reflect .ChanOf (reflect .SendDir , f .etype )})
86
+ }
88
87
// Add the select case to the inbox.
89
88
// The next Send will add it to f.sendCases.
90
89
cas := reflect.SelectCase {Dir : reflect .SelectSend , Chan : chanval }
91
90
f .inbox = append (f .inbox , cas )
92
91
return sub
93
92
}
94
93
94
+ // note: callers must hold f.mu
95
+ func (f * Feed ) typecheck (typ reflect.Type ) bool {
96
+ if f .etype == nil {
97
+ f .etype = typ
98
+ return true
99
+ }
100
+ return f .etype == typ
101
+ }
102
+
95
103
func (f * Feed ) remove (sub * feedSub ) {
96
104
// Delete from inbox first, which covers channels
97
105
// that have not been added to f.sendCases yet.
@@ -120,17 +128,19 @@ func (f *Feed) remove(sub *feedSub) {
120
128
func (f * Feed ) Send (value interface {}) (nsent int ) {
121
129
rvalue := reflect .ValueOf (value )
122
130
123
- f .once .Do (func () { f .init (rvalue .Type ()) })
124
- if f .etype != rvalue .Type () {
125
- panic (feedTypeError {op : "Send" , got : rvalue .Type (), want : f .etype })
126
- }
127
-
131
+ f .once .Do (f .init )
128
132
<- f .sendLock
129
133
130
134
// Add new cases from the inbox after taking the send lock.
131
135
f .mu .Lock ()
132
136
f .sendCases = append (f .sendCases , f .inbox ... )
133
137
f .inbox = nil
138
+
139
+ if ! f .typecheck (rvalue .Type ()) {
140
+ f .sendLock <- struct {}{}
141
+ f .mu .Unlock ()
142
+ panic (feedTypeError {op : "Send" , got : rvalue .Type (), want : f .etype })
143
+ }
134
144
f .mu .Unlock ()
135
145
136
146
// Set the sent value on all channels.
0 commit comments