@@ -57,7 +57,8 @@ func (e feedTypeError) Error() string {
57
57
return "event: wrong type in " + e .op + " got " + e .got .String () + ", want " + e .want .String ()
58
58
}
59
59
60
- func (f * Feed ) init () {
60
+ func (f * Feed ) init (etype reflect.Type ) {
61
+ f .etype = etype
61
62
f .removeSub = make (chan interface {})
62
63
f .sendLock = make (chan struct {}, 1 )
63
64
f .sendLock <- struct {}{}
@@ -70,36 +71,27 @@ func (f *Feed) init() {
70
71
// The channel should have ample buffer space to avoid blocking other subscribers.
71
72
// Slow subscribers are not dropped.
72
73
func (f * Feed ) Subscribe (channel interface {}) Subscription {
73
- f .once .Do (f .init )
74
-
75
74
chanval := reflect .ValueOf (channel )
76
75
chantyp := chanval .Type ()
77
76
if chantyp .Kind () != reflect .Chan || chantyp .ChanDir ()& reflect .SendDir == 0 {
78
77
panic (errBadChannel )
79
78
}
80
79
sub := & feedSub {feed : f , channel : chanval , err : make (chan error , 1 )}
81
80
82
- f .mu .Lock ()
83
- defer f .mu .Unlock ()
84
- if ! f .typecheck (chantyp .Elem ()) {
81
+ f .once .Do (func () { f .init (chantyp .Elem ()) })
82
+ if f .etype != chantyp .Elem () {
85
83
panic (feedTypeError {op : "Subscribe" , got : chantyp , want : reflect .ChanOf (reflect .SendDir , f .etype )})
86
84
}
85
+
86
+ f .mu .Lock ()
87
+ defer f .mu .Unlock ()
87
88
// Add the select case to the inbox.
88
89
// The next Send will add it to f.sendCases.
89
90
cas := reflect.SelectCase {Dir : reflect .SelectSend , Chan : chanval }
90
91
f .inbox = append (f .inbox , cas )
91
92
return sub
92
93
}
93
94
94
- // note: callers must hold f.mu
95
- func (f * Feed ) typecheck (typ reflect.Type ) bool {
96
- if f .etype == nil {
97
- f .etype = typ
98
- return true
99
- }
100
- return f .etype == typ
101
- }
102
-
103
95
func (f * Feed ) remove (sub * feedSub ) {
104
96
// Delete from inbox first, which covers channels
105
97
// that have not been added to f.sendCases yet.
@@ -128,19 +120,17 @@ func (f *Feed) remove(sub *feedSub) {
128
120
func (f * Feed ) Send (value interface {}) (nsent int ) {
129
121
rvalue := reflect .ValueOf (value )
130
122
131
- f .once .Do (f .init )
123
+ f .once .Do (func () { f .init (rvalue .Type ()) })
124
+ if f .etype != rvalue .Type () {
125
+ panic (feedTypeError {op : "Send" , got : rvalue .Type (), want : f .etype })
126
+ }
127
+
132
128
<- f .sendLock
133
129
134
130
// Add new cases from the inbox after taking the send lock.
135
131
f .mu .Lock ()
136
132
f .sendCases = append (f .sendCases , f .inbox ... )
137
133
f .inbox = nil
138
-
139
- if ! f .typecheck (rvalue .Type ()) {
140
- f .sendLock <- struct {}{}
141
- f .mu .Unlock ()
142
- panic (feedTypeError {op : "Send" , got : rvalue .Type (), want : f .etype })
143
- }
144
134
f .mu .Unlock ()
145
135
146
136
// Set the sent value on all channels.
0 commit comments