-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuser_app.py
379 lines (334 loc) · 15.1 KB
/
user_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
from dataclasses import dataclass, astuple
import logging
import math
import time
import threading
from datetime import timedelta
from typing import Any, Callable, Mapping
import sys
from cognit import (
EnergySchedulingPolicy,
FaaSState,
ServerlessRuntimeConfig,
ServerlessRuntimeContext,
)
import phoenixsystems.sem.metersim as metersim
from home_energy_management.device_simulators.device_utils import DeviceUserApi
from home_energy_management.device_simulators.heating import HeatingPreferences
from home_energy_management.device_simulators.electric_vehicle import EVDeparturePlans
@dataclass
class AlgoParams:
    """Input bundle handed to the decision algorithm for one control step.

    ``UserApp.run_algo`` flattens an instance with ``dataclasses.astuple`` and
    passes the result positionally to the algorithm, so the declaration order
    of the fields below is the algorithm's positional signature. Do not
    reorder, insert or remove fields without updating the algorithm.
    """

    # --- Static configuration -------------------------------------------
    model_parameters: dict[str, float]
    step_timedelta_s: int  # simulated seconds since the previous run
    storage_parameters: dict[str, float]
    ev_battery_parameters: dict[str, float]
    room_heating_params_list: list[dict[str, Any]]

    # --- Energy deltas since the previous run ---------------------------
    # NOTE(review): logged by UserApp with a "kWh" label - confirm the unit.
    energy_drawn_from_grid: float
    energy_returned_to_grid: float
    energy_pv_produced: float

    # --- Current and previous measurements ------------------------------
    temp_outdoor: float
    charge_level_of_storage: float
    prev_charge_level_of_storage: float
    charge_level_of_ev_battery: float
    prev_charge_level_of_ev_battery: float

    # --- Per-room state, keyed by the room's "name" entry ---------------
    heating_status_per_room: dict[str, list[bool]]
    temp_per_room: dict[str, float]
class UserApp:
    """Periodic home-energy controller that optionally offloads to Cognit.

    A background thread (``app_thread`` running ``app_loop``) wakes up every
    ``cycle_time / speedup`` wall-clock seconds, gathers the current state of
    all devices (``update_algo_input``), runs the decision algorithm either
    locally or through the Cognit Serverless Runtime (``run_algo``), and
    applies the result to the devices (``execute_algo_response``).

    Locking: ``app_loop`` holds ``cond`` for its whole lifetime and only
    releases it while waiting for the next cycle. The public setters,
    ``offload_now`` and ``update_slr_preferences`` acquire ``cond`` and
    therefore run between cycles, never concurrently with one.
    """

    metrology: metersim.Metersim  # Metrology
    runtime: ServerlessRuntimeContext  # Cognit Serverless Runtime (only set when use_cognit)
    heating_user_preferences: dict[str, HeatingPreferences]
    ev_departure_plans: EVDeparturePlans
    cycle_time: int
    speedup: int
    model_parameters: dict[str, float]

    # Decision algorithm
    decision_algo: Callable

    # Devices
    pv: DeviceUserApi
    energy_storage: DeviceUserApi
    electric_vehicle: DeviceUserApi
    room_heating: Mapping[str, DeviceUserApi]
    temp_outside_sensor: DeviceUserApi

    # Utils
    shutdown_flag: bool
    app_thread: threading.Thread
    use_cognit: bool
    cognit_timeout: int
    start_time: float
    cond: threading.Condition
    app_logger: logging.Logger
    cognit_logger: logging.Logger  # only set when use_cognit

    # Registers: values seen on the previous run, used to compute deltas
    last_algo_run: float = 0.0
    last_active_plus: int = 0
    last_active_minus: int = 0
    last_pv_energy: float = 0.0
    last_storage_charge_level: float = 0.0
    last_ev_battery_charge_level: float = 0.0

    def __init__(
        self,
        metrology: metersim.Metersim,
        decision_algo: Callable,
        model_parameters: dict[str, float],
        pv: DeviceUserApi,
        energy_storage: DeviceUserApi,
        electric_vehicle: DeviceUserApi,
        room_heating: Mapping[str, DeviceUserApi],
        temp_outside_sensor: DeviceUserApi,
        speedup: int,
        cycle: int,
        heating_user_preferences: dict[str, HeatingPreferences],
        ev_departure_plans: EVDeparturePlans,
        use_cognit: bool = True,
        cognit_timeout: int = 3,
    ) -> None:
        """Wire up devices and logging; create (but do not start) the worker.

        Args:
            metrology: Meter simulator providing energy registers and uptime.
            decision_algo: Callable invoked with the flattened ``AlgoParams``.
            model_parameters: Model constants forwarded to the algorithm.
            pv: Photovoltaic device API.
            energy_storage: Energy storage device API.
            electric_vehicle: Electric vehicle device API.
            room_heating: Heating device API per room name.
            temp_outside_sensor: Outdoor temperature sensor API.
            speedup: Simulation speed-up factor; must be positive.
            cycle: Cycle length (stored as ``cycle_time``); the wall-clock
                period between runs is ``cycle / speedup`` seconds.
            heating_user_preferences: Heating preferences per room name.
            ev_departure_plans: Source of the EV's time until departure.
            use_cognit: Offload the algorithm to Cognit when True, otherwise
                call it in-process.
            cognit_timeout: Timeout passed to the runtime's ``wait`` call.

        Raises:
            ValueError: If ``speedup`` is not positive.
        """
        # Validate before any file is opened so a bad argument leaks nothing.
        self._check_speedup(speedup)

        self.metrology = metrology
        self.decision_algo = decision_algo
        self.pv = pv
        self.energy_storage = energy_storage
        self.electric_vehicle = electric_vehicle
        self.room_heating = room_heating
        self.temp_outside_sensor = temp_outside_sensor
        self.use_cognit = use_cognit
        self.cognit_timeout = cognit_timeout
        self.speedup = speedup
        self.cycle_time = cycle
        self.heating_user_preferences = heating_user_preferences
        self.ev_departure_plans = ev_departure_plans
        self.model_parameters = model_parameters

        self.shutdown_flag = False
        self.cond = threading.Condition()

        # A stand-alone Logger (not registered with the logging manager) that
        # writes bare messages to user_app.log; closed again in destroy().
        app_log_handler = logging.FileHandler("user_app.log")
        app_log_formatter = logging.Formatter("")
        app_log_handler.setFormatter(app_log_formatter)
        self.app_logger = logging.Logger("user_app")
        self.app_logger.addHandler(app_log_handler)

        if self.use_cognit:
            self.init_cognit_runtime()

        self.app_thread = threading.Thread(target=self.app_loop)

    @staticmethod
    def _check_speedup(speedup: int) -> None:
        """Reject speed-up factors that would break the cycle arithmetic."""
        # app_loop divides by speedup; zero would kill the worker thread with
        # ZeroDivisionError and a negative value would put the deadline in
        # the past on every cycle.
        if speedup <= 0:
            raise ValueError(f"speedup must be positive, got {speedup!r}")

    @staticmethod
    def _monotonic_now() -> float:
        """Return the current CLOCK_MONOTONIC reading in seconds."""
        return time.clock_gettime(time.CLOCK_MONOTONIC)

    def init_cognit_runtime(self) -> None:
        """Set up the Cognit logger and block until the runtime is RUNNING.

        Exits the process with status 1 if the runtime cannot be created.
        """
        self.cognit_logger = logging.getLogger("cognit-logger")
        self.cognit_logger.handlers.clear()
        handler = logging.FileHandler("cognit.log")
        formatter = logging.Formatter(
            fmt="[%(asctime)s][%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        handler.setFormatter(formatter)
        self.cognit_logger.addHandler(handler)

        sr_conf = ServerlessRuntimeConfig()
        sr_conf.name = "Smart Energy Meter Serverless Runtime"
        sr_conf.scheduling_policies = [EnergySchedulingPolicy(50)]
        sr_conf.faas_flavour = "Energy"

        try:
            self.runtime = ServerlessRuntimeContext(config_path="cognit.yml")
            self.runtime.create(sr_conf)
        except Exception as e:
            self.cognit_logger.error(f"Error in config file content: {e}")
            sys.exit(1)

        # Poll once per second until the runtime reports RUNNING.
        while self.runtime.status != FaaSState.RUNNING:
            time.sleep(1)

        self.cognit_logger.info("Runtime should be ready now!")

    def set_heating_user_preferences(self, room: str, pref: HeatingPreferences):
        """Replace the heating preferences stored for ``room``."""
        self.heating_user_preferences[room] = pref

    def update_slr_preferences(self, green_energy_perc: int):
        """Update the runtime's energy scheduling policy.

        Does nothing (apart from logging a warning) when Cognit is disabled,
        because no runtime exists in that case.

        Args:
            green_energy_perc: Value passed to ``EnergySchedulingPolicy``.
        """
        if not self.use_cognit:
            self.app_logger.warning(
                "Cognit is disabled; serverless runtime preferences not updated"
            )
            return

        sr_conf = ServerlessRuntimeConfig()
        sr_conf.name = "Smart Energy Meter Serverless Runtime"
        sr_conf.scheduling_policies = [EnergySchedulingPolicy(green_energy_perc)]
        sr_conf.faas_flavour = "Energy"

        with self.cond:
            self.runtime.update(sr_conf)
            # Hold the lock for 12 s so no cycle runs in the meantime.
            # NOTE(review): presumably the time the runtime needs to apply
            # the update - confirm against the Cognit documentation.
            time.sleep(12)
            self.cond.notify_all()

    def offload_now(self):
        """Run one algorithm cycle immediately, outside the regular schedule."""
        with self.cond:
            self.offload()
            self.cond.notify_all()

    def set_cycle_length(self, cycle: int):
        """Change the cycle length and wake the worker to re-plan its sleep."""
        with self.cond:
            self.cycle_time = cycle
            self.cond.notify_all()

    def set_speedup(self, speedup: int):
        """Change the speed-up factor and wake the worker to re-plan its sleep.

        Raises:
            ValueError: If ``speedup`` is not positive.
        """
        self._check_speedup(speedup)
        with self.cond:
            self.speedup = speedup
            self.cond.notify_all()

    def update_algo_input(self, now: float) -> AlgoParams:
        """Collect device state and build the input for the next algorithm run.

        Updates the ``last_*`` registers as a side effect, so consecutive
        calls yield deltas relative to the previous call.

        Args:
            now: Current monotonic timestamp in seconds.

        Returns:
            The populated ``AlgoParams``.
        """
        step_timedelta_s = math.floor((now - self.last_algo_run) * self.speedup)
        self.last_algo_run = now

        # One query per device: the parameter dict and the charge level passed
        # to the algorithm then describe the same instant.
        storage_parameters = self.energy_storage.get_info()
        ev_parameters = self.electric_vehicle.get_info()
        ev_parameters["time_until_charged"] = self.ev_departure_plans.get_time_until_departure()

        room_heating_params_list = []
        for room, preferences in self.heating_user_preferences.items():
            params = self.room_heating[room].get_info()
            params["preferred_temp"] = preferences.get_temp()
            room_heating_params_list.append(params)

        # Energy registers are cumulative; the algorithm gets the increments.
        energy = self.metrology.get_energy_total()
        energy_drawn_from_grid = energy.active_plus - self.last_active_plus
        energy_returned_to_grid = energy.active_minus - self.last_active_minus
        self.last_active_plus = energy.active_plus
        self.last_active_minus = energy.active_minus

        pv_reg = self.pv.get_info()["energy_produced"]
        energy_pv_produced = pv_reg - self.last_pv_energy
        self.last_pv_energy = pv_reg

        temp_outdoor = self.temp_outside_sensor.get_info()["temperature"]

        charge_level_of_storage = storage_parameters["curr_charge_level"]
        prev_charge_level_of_storage = self.last_storage_charge_level
        self.last_storage_charge_level = charge_level_of_storage

        charge_level_of_ev_battery = ev_parameters["curr_charge_level"]
        prev_charge_level_of_ev_battery = self.last_ev_battery_charge_level
        self.last_ev_battery_charge_level = charge_level_of_ev_battery

        heating_status_per_room = {
            room["name"]: room["is_device_switch_on"] for room in room_heating_params_list
        }
        temp_per_room = {room["name"]: room["curr_temp"] for room in room_heating_params_list}

        # Energies are divided by 3.6e6 and logged as kWh.
        # NOTE(review): implies the registers count joules (Ws) - confirm.
        return AlgoParams(
            model_parameters=self.model_parameters,
            step_timedelta_s=step_timedelta_s,
            storage_parameters=storage_parameters,
            ev_battery_parameters=ev_parameters,
            room_heating_params_list=room_heating_params_list,
            energy_drawn_from_grid=energy_drawn_from_grid / 3.6e6,
            energy_returned_to_grid=energy_returned_to_grid / 3.6e6,
            energy_pv_produced=energy_pv_produced / 3.6e6,
            temp_outdoor=temp_outdoor,
            charge_level_of_storage=charge_level_of_storage,
            prev_charge_level_of_storage=prev_charge_level_of_storage,
            charge_level_of_ev_battery=charge_level_of_ev_battery,
            prev_charge_level_of_ev_battery=prev_charge_level_of_ev_battery,
            heating_status_per_room=heating_status_per_room,
            temp_per_room=temp_per_room,
        )

    def execute_algo_response(self, algo_res: Any):
        """Apply the algorithm's decisions to the devices.

        Args:
            algo_res: 7-element sequence; only the first three elements
                (per-room temperature configuration, storage parameters, EV
                parameters) are applied, the rest are predictions.

        Raises:
            ValueError: If ``algo_res`` does not unpack into seven elements.
            KeyError: If a room in ``room_heating`` has no configured
                temperature in the response.
        """
        (
            conf_temp_per_room,
            storage_params,
            ev_params,
            _next_temp_per_room,
            _next_charge_level_of_storage,
            _next_charge_level_of_ev_battery,
            _energy_from_power_grid,
        ) = algo_res

        self.energy_storage.set_params(storage_params)
        self.electric_vehicle.set_params(ev_params)
        for room, device in self.room_heating.items():
            device.set_params({"optimal_temp": conf_temp_per_room[room]})

    def run_algo(self, algo_input: AlgoParams) -> Any:
        """Run the decision algorithm locally or through Cognit.

        Args:
            algo_input: Dataclass instance whose fields are passed
                positionally to ``decision_algo``.

        Returns:
            The algorithm's result, or ``None`` when the offloaded call
            produced no result.
        """
        args = astuple(algo_input)
        if not self.use_cognit:
            return self.decision_algo(*args)

        offload_ctx = self.runtime.call_async(self.decision_algo, *args)
        if offload_ctx is None:
            return None
        res_ctx = self.runtime.wait(offload_ctx.exec_id, self.cognit_timeout)
        if res_ctx is None or res_ctx.res is None:
            return None
        return res_ctx.res.res

    def start(self):
        """Record the start time and launch the worker thread."""
        self.start_time = self._monotonic_now()
        self.app_thread.start()

    def destroy(self):
        """Stop the worker, delete the Cognit runtime and close the app log."""
        self.shutdown_flag = True
        with self.cond:
            self.cond.notify_all()
        # join() raises RuntimeError on a thread that was never started.
        if self.app_thread.is_alive():
            self.app_thread.join()
        if self.use_cognit:
            self.runtime.delete()
        # Release the log file opened in __init__.
        for handler in list(self.app_logger.handlers):
            self.app_logger.removeHandler(handler)
            handler.close()

    def offload(self):
        """Run one full cycle: gather input, run the algorithm, apply, log."""
        now = self._monotonic_now()
        algo_input = self.update_algo_input(now)
        algo_res = self.run_algo(algo_input)

        self._log_algo_input(algo_input)

        if algo_res is None:
            self.app_logger.warning("Decision algorithm call failed")
            return

        self.execute_algo_response(algo_res)
        self._log_algo_output(algo_input, algo_res)

    def _log_algo_input(self, algo_input: AlgoParams) -> None:
        """Write the INPUT section of the cycle report to the app log."""
        log = self.app_logger.info

        # ANSI "clear screen + cursor home" so a terminal following the log
        # shows only the latest report.
        log("\n\x1B[2J\x1B[H")
        log(f"{timedelta(seconds=self.metrology.get_uptime())}")

        log("\n\tINPUT")
        model_parameters = algo_input.model_parameters
        log(f"Step duration (s): {algo_input.step_timedelta_s}")
        log(
            f"Model parameters: \n\t- heat capacity (J/K): {model_parameters['heat_capacity']}, "
            f"\n\t- heating delta temperature (K): {model_parameters['heating_delta_temperature']}, "
            f"\n\t- heating coefficient: {model_parameters['heating_coefficient']}, "
            f"\n\t- heat loss coefficient (W/K): {model_parameters['heat_loss_coefficient']}, "
            f"\n\t- delta charging power (%): {model_parameters['delta_charging_power_perc']}, "
            f"\n\t- storage high SOC (%): {model_parameters['storage_high_charge_level']}"
        )

        storage_parameters = algo_input.storage_parameters
        log(
            f"Storage parameters: \n\t- max capacity (kWh): {storage_parameters['max_capacity']}, "
            f"\n\t- minimal SOC (%): {storage_parameters['min_charge_level']}, "
            f"\n\t- nominal power (kW): {storage_parameters['nominal_power']}, "
            f"\n\t- efficiency: {storage_parameters['efficiency']}"
        )

        ev_battery_parameters = algo_input.ev_battery_parameters
        time_until_ev_charged = ev_battery_parameters['time_until_charged']
        log(
            f"EV battery parameters: \n\t- max capacity (kWh): {ev_battery_parameters['max_capacity']}, "
            f"\n\t- is available: {ev_battery_parameters['is_available']}, "
            f"\n\t- departure SOC (%): {ev_battery_parameters['charged_level']}, "
            f"\n\t- time until charged (h): "
            f"{round(time_until_ev_charged / 3600, 2) if time_until_ev_charged > 0 else 'unknown'}, "
            f"\n\t- nominal power (kW): {ev_battery_parameters['nominal_power']}, "
            f"\n\t- efficiency: {ev_battery_parameters['efficiency']}"
        )

        # Only the first room is reported; skip the section when there is none.
        if algo_input.room_heating_params_list:
            heating_parameters = algo_input.room_heating_params_list[0]
            log(
                f"Heating parameters: \n\t- current temperature (°C): {round(heating_parameters['curr_temp'], 2)}, "
                f"\n\t- preferred temperature (°C): {heating_parameters['preferred_temp']}, "
                f"\n\t- powers of heating devices (kW): {heating_parameters['powers_of_heating_devices']}, "
                f"\n\t- status of heating devices switches: {heating_parameters['is_device_switch_on']}"
            )

        log(f"Energy A+ (kWh): {round(algo_input.energy_drawn_from_grid, 2)}")
        log(f"Energy A- (kWh): {round(algo_input.energy_returned_to_grid, 2)}")
        log(f"Energy PV produced (kWh): {round(algo_input.energy_pv_produced, 2)}")
        log(f"Outdoor temperature (°C): {algo_input.temp_outdoor}")
        log(f"Current storage SOC (%): {round(algo_input.charge_level_of_storage, 2)}")
        log(f"Previous storage SOC (%): {round(algo_input.prev_charge_level_of_storage, 2)}")
        log(f"Current EV battery SOC (%): {round(algo_input.charge_level_of_ev_battery, 2)}")
        log(f"Previous EV battery SOC (%): {round(algo_input.prev_charge_level_of_ev_battery, 2)}")

    def _log_algo_output(self, algo_input: AlgoParams, algo_res: Any) -> None:
        """Write the OUTPUT section of the cycle report to the app log."""
        log = self.app_logger.info
        rooms = algo_input.room_heating_params_list
        # Per-room lines refer to the first room only, as in the INPUT section.
        room_name = rooms[0]["name"] if rooms else None

        log("\n\tOUTPUT")
        if room_name is not None:
            log(f"Configuration of temperature (°C): {round(algo_res[0][room_name], 2)}")
        log(f"Configuration of storage: {algo_res[1]}")
        log(f"Configuration of EV battery: {algo_res[2]}")
        if room_name is not None:
            log(f"Predicted temperature (°C): {round(algo_res[3][room_name], 2)}")
        log(f"Predicted SOC of storage (%): {round(algo_res[4], 2)}")
        log(f"Predicted SOC of EV battery (%): {round(algo_res[5], 2)}")
        log(f"Predicted energy A+ (kWh): {round(algo_res[6], 2)}")

    def app_loop(self):
        """Worker body: run ``offload`` once per cycle until shutdown.

        Holds ``cond`` throughout; other threads get the lock only while this
        loop is waiting. Any ``notify_all`` wakes the wait early so that a
        changed ``cycle_time``/``speedup`` or a shutdown request takes effect
        immediately.
        """
        self.last_algo_run = self._monotonic_now()

        with self.cond:
            while not self.shutdown_flag:
                slept = False
                # "or not slept" forces at least one wait per cycle, even when
                # the deadline has already passed, so waiters can get the lock.
                while (
                    self._monotonic_now() < (self.cycle_time / self.speedup + self.last_algo_run)
                    and not self.shutdown_flag
                ) or not slept:
                    sleep_time = max(
                        self.cycle_time / self.speedup
                        + self.last_algo_run
                        - self._monotonic_now(),
                        0.001,  # Sleep 1 ms to allow other threads to acquire cond
                    )
                    self.cond.wait(sleep_time)
                    slept = True

                # Woken for shutdown: leave without running another cycle.
                if self.shutdown_flag:
                    break
                self.offload()