@@ -66,14 +66,14 @@ vector<Selectable *> Orch::getSelectables()
66
66
return selectables;
67
67
}
68
68
69
- void Consumer::addToSync (std::deque<KeyOpFieldsValuesTuple> &entries)
69
+ size_t Consumer::addToSync (std::deque<KeyOpFieldsValuesTuple> &entries)
70
70
{
71
71
SWSS_LOG_ENTER ();
72
72
73
73
/* Nothing popped */
74
74
if (entries.empty ())
75
75
{
76
- return ;
76
+ return 0 ;
77
77
}
78
78
79
79
for (auto & entry: entries)
@@ -120,10 +120,11 @@ void Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
120
120
m_toSync[key] = KeyOpFieldsValuesTuple (key, op, existing_values);
121
121
}
122
122
}
123
+ return entries.size ();
123
124
}
124
125
125
126
// TODO: Table should be const
126
- void Consumer::refillToSync (Table* table)
127
+ size_t Consumer::refillToSync (Table* table)
127
128
{
128
129
std::deque<KeyOpFieldsValuesTuple> entries;
129
130
vector<string> keys;
@@ -142,15 +143,28 @@ void Consumer::refillToSync(Table* table)
142
143
entries.push_back (kco);
143
144
}
144
145
145
- addToSync (entries);
146
+ return addToSync (entries);
146
147
}
147
148
148
- void Consumer::refillToSync ()
149
+ size_t Consumer::refillToSync ()
149
150
{
150
- auto db = getConsumerTable ()->getDbConnector ();
151
- string tableName = getConsumerTable ()->getTableName ();
152
- auto table = Table (db, tableName);
153
- refillToSync (&table);
151
+ ConsumerTableBase *consumerTable = getConsumerTable ();
152
+
153
+ auto subTable = dynamic_cast <SubscriberStateTable *>(consumerTable);
154
+ if (subTable != NULL )
155
+ {
156
+ std::deque<KeyOpFieldsValuesTuple> entries;
157
+ subTable->pops (entries);
158
+ return addToSync (entries);
159
+ }
160
+ else
161
+ {
162
+ // consumerTable is either ConsumerStateTable or ConsumerTable
163
+ auto db = consumerTable->getDbConnector ();
164
+ string tableName = consumerTable->getTableName ();
165
+ auto table = Table (db, tableName);
166
+ return refillToSync (&table);
167
+ }
154
168
}
155
169
156
170
void Consumer::execute ()
@@ -171,31 +185,50 @@ void Consumer::drain()
171
185
m_orch->doTask (*this );
172
186
}
173
187
174
- bool Orch::addExistingData (const string& tableName)
188
+ size_t Orch::addExistingData (const string& tableName)
175
189
{
176
- Consumer* consumer = dynamic_cast <Consumer *>(getExecutor (tableName));
190
+ auto consumer = dynamic_cast <Consumer *>(getExecutor (tableName));
177
191
if (consumer == NULL )
178
192
{
179
193
SWSS_LOG_ERROR (" No consumer %s in Orch" , tableName.c_str ());
180
- return false ;
194
+ return 0 ;
181
195
}
182
196
183
- consumer->refillToSync ();
184
- return true ;
197
+ return consumer->refillToSync ();
185
198
}
186
199
187
200
// TODO: Table should be const
188
- bool Orch::addExistingData (Table *table)
201
+ size_t Orch::addExistingData (Table *table)
189
202
{
190
203
string tableName = table->getTableName ();
191
204
Consumer* consumer = dynamic_cast <Consumer *>(getExecutor (tableName));
192
205
if (consumer == NULL )
193
206
{
194
207
SWSS_LOG_ERROR (" No consumer %s in Orch" , tableName.c_str ());
195
- return false ;
208
+ return 0 ;
209
+ }
210
+
211
+ return consumer->refillToSync (table);
212
+ }
213
+
214
+ bool Orch::bake ()
215
+ {
216
+ SWSS_LOG_ENTER ();
217
+
218
+ for (auto &it : m_consumerMap)
219
+ {
220
+ string executorName = it.first ;
221
+ auto executor = it.second ;
222
+ auto consumer = dynamic_cast <Consumer *>(executor.get ());
223
+ if (consumer == NULL )
224
+ {
225
+ continue ;
226
+ }
227
+
228
+ size_t refilled = consumer->refillToSync ();
229
+ SWSS_LOG_NOTICE (" Add warm input: %s, %zd" , executorName.c_str (), refilled);
196
230
}
197
231
198
- consumer->refillToSync (table);
199
232
return true ;
200
233
}
201
234
0 commit comments