-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapi.py
266 lines (236 loc) · 8.19 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""SODAR Taskflow API for Django apps"""
import json
import logging
# Landingzones dependency
from landingzones.constants import (
ZONE_STATUS_NOT_CREATED,
ZONE_STATUS_CREATING,
ZONE_STATUS_FAILED,
)
from landingzones.models import LandingZone
# Projectroles dependency
from projectroles.models import SODAR_CONSTANTS
from projectroles.plugins import get_backend_api
from taskflowbackend import flows
from taskflowbackend.lock_api import ProjectLockAPI
from taskflowbackend.tasks_celery import submit_flow_task
# Module-level project lock API instance, used by TaskflowAPI.run_flow() to get
# the lock coordinator and to acquire/release the project lock
lock_api = ProjectLockAPI()
logger = logging.getLogger(__name__)
# SODAR constants
PROJECT_TYPE_PROJECT = SODAR_CONSTANTS['PROJECT_TYPE_PROJECT']
# Local constants
# Error message used when running a flow yields no result without raising
UNKNOWN_RUN_ERROR = 'Running flow failed: unknown error, see server log'
# Message prefix for failures to retrieve the lock coordinator or acquire lock
LOCK_FAIL_MSG = 'Unable to acquire project lock'
class TaskflowAPI:
    """SODAR Taskflow API to be used by Django apps"""

    class FlowSubmitException(Exception):
        """SODAR Taskflow submission exception"""

    @classmethod
    def _raise_flow_exception(cls, ex_msg, tl_event=None, zone=None):
        """
        Handle and raise exception with flow building or execution. Updates the
        status of timeline event and/or landing zone if provided.

        This method never returns normally: it always raises.

        :param ex_msg: Exception message (string)
        :param tl_event: Timeline event or None
        :param zone: LandingZone object or None
        :raise: FlowSubmitException
        """
        if tl_event:
            # NOTE(review): The landing zone "failed" constant is used as the
            #               timeline event status here; presumably the timeline
            #               status string is identical -- confirm
            tl_event.set_status(ZONE_STATUS_FAILED, ex_msg)
        # Update landing zone
        if zone:
            # A zone which was still being created is marked as not created,
            # any other zone is marked as failed
            status = (
                ZONE_STATUS_NOT_CREATED
                if zone.status == ZONE_STATUS_CREATING
                else ZONE_STATUS_FAILED
            )
            zone.set_status(status, ex_msg)
        # TODO: Create app alert for failure if async (see #1499)
        raise cls.FlowSubmitException(ex_msg)

    @classmethod
    def get_flow(
        cls,
        irods_backend,
        project,
        flow_name,
        flow_data,
        async_mode=False,
        tl_event=None,
    ):
        """
        Get and create a taskflow.

        :param irods_backend: IrodsbackendAPI instance
        :param project: Project object
        :param flow_name: Name of flow (string)
        :param flow_data: Flow parameters (dict)
        :param async_mode: Set up flow asynchronously if True (boolean)
        :param tl_event: ProjectEvent object for timeline updating or None
        :return: Validated flow object
        :raise: ValueError if no flow class is found for flow_name
        :raise: TypeError if raised by flow validation
        """
        flow_cls = flows.get_flow(flow_name)
        if not flow_cls:
            raise ValueError('Flow "{}" not supported'.format(flow_name))
        flow = flow_cls(
            irods_backend=irods_backend,
            project=project,
            flow_name=flow_name,
            flow_data=flow_data,
            async_mode=async_mode,
            tl_event=tl_event,
        )
        try:
            flow.validate()
        except TypeError as ex:
            msg = 'Error validating flow: {}'.format(ex)
            logger.error(msg)
            # Bare raise re-raises the active exception with its traceback
            raise
        return flow

    @classmethod
    def run_flow(
        cls,
        flow,
        project,
        force_fail=False,
        async_mode=False,
        tl_event=None,
    ):
        """
        Run a flow, either synchronously or asynchronously.

        If the flow requires a lock, the project lock is acquired before
        building the flow. The lock is released and the lock coordinator
        stopped once the flow has finished, whether it succeeded or not.

        :param flow: Flow object
        :param project: Project object
        :param force_fail: Force failure (boolean, for testing)
        :param async_mode: Submit in async mode (boolean, default=False)
        :param tl_event: Timeline ProjectEvent object or None. On success the
                         event status is set to OK only in async mode. On
                         failure the event status is always updated
        :return: Result returned by flow.run() (a falsy result is treated as
                 failure)
        :raise: FlowSubmitException if locking, building or running fails
        """
        flow_result = None
        ex_msg = None
        coordinator = None
        lock = None

        # Get zone if present in flow
        zone = None
        if flow.flow_data.get('zone_uuid'):
            zone = LandingZone.objects.filter(
                sodar_uuid=flow.flow_data['zone_uuid']
            ).first()

        # Acquire lock if needed
        if flow.require_lock:
            coordinator = lock_api.get_coordinator()
            if not coordinator:
                cls._raise_flow_exception(
                    LOCK_FAIL_MSG + ': Failed to retrieve lock coordinator',
                    tl_event,
                    zone,
                )
            lock_id = str(project.sodar_uuid)
            lock = coordinator.get_lock(lock_id)
            try:
                lock_api.acquire(lock)
            except Exception as ex:
                # The lock was not acquired so there is nothing to release,
                # but the coordinator is stopped just like after a finished
                # flow so it is not left running. Statuses are updated first.
                try:
                    cls._raise_flow_exception(
                        LOCK_FAIL_MSG + ': {}'.format(ex),
                        tl_event,
                        zone,
                    )
                finally:
                    coordinator.stop()
        else:
            logger.info('Lock not required (flow.require_lock=False)')

        # From this point on the lock may be held, so everything is wrapped in
        # try/finally to guarantee the release even on unexpected errors
        try:
            # Build flow
            logger.info('Building flow "{}"..'.format(flow.flow_name))
            try:
                flow.build(force_fail)
            except Exception as ex:
                ex_msg = 'Error building flow: {}'.format(ex)

            # Run flow
            if not ex_msg:
                logger.info('Building flow OK')
                try:
                    flow_result = flow.run()
                except Exception as ex:
                    ex_msg = 'Error running flow: {}'.format(ex)

            # Flow completion
            if flow_result and tl_event and async_mode:
                tl_event.set_status('OK', 'Async submit OK')
            # Exception/failure
            elif not flow_result and not ex_msg:
                ex_msg = UNKNOWN_RUN_ERROR
        finally:
            # Release lock if acquired
            if flow.require_lock and lock:
                try:
                    lock_api.release(lock)
                finally:
                    # Stop the coordinator even if releasing the lock fails
                    coordinator.stop()

        # Raise exception if failed, otherwise return result
        if ex_msg:
            logger.error(ex_msg)  # TODO: Isn't this redundant?
            # NOTE: Not providing zone here since it's handled by flow
            cls._raise_flow_exception(ex_msg, tl_event, None)
        return flow_result

    def submit(
        self,
        project,
        flow_name,
        flow_data,
        async_mode=False,
        tl_event=None,
        force_fail=False,
    ):
        """
        Submit taskflow for SODAR project data modification.

        :param project: Project object
        :param flow_name: Name of flow to be executed (string)
        :param flow_data: Input data for flow execution (dict, must be JSON
                          serializable)
        :param async_mode: Run flow asynchronously (boolean, default False)
        :param tl_event: Corresponding timeline ProjectEvent (optional)
        :param force_fail: Make flow fail on purpose (boolean, default False)
        :return: None in async mode, otherwise the result of run_flow()
        :raise: FlowSubmitException if submission fails
        :raise: Exception if the irods backend is not enabled
        :raise: TypeError, OverflowError or ValueError if flow_data is not JSON
                serializable
        """
        irods_backend = get_backend_api('omics_irods')
        if not irods_backend:
            raise Exception('Irodsbackend not enabled')

        # Serialize only to verify flow_data, the output is discarded.
        # ValueError is raised by json.dumps() for circular references.
        try:
            json.dumps(flow_data)
        except (TypeError, OverflowError, ValueError) as ex:
            logger.error(
                'Argument flow_data is not JSON serializable: {}'.format(ex)
            )
            raise

        # Launch async submit task if async mode is set
        if async_mode:
            project_uuid = project.sodar_uuid
            tl_uuid = tl_event.sodar_uuid if tl_event else None
            # NOTE(review): force_fail is not passed on to the async task
            submit_flow_task.delay(
                project_uuid,
                flow_name,
                flow_data,
                tl_uuid,
            )
            return None

        # Else run flow synchronously
        flow = self.get_flow(
            irods_backend,
            project,
            flow_name,
            flow_data,
            async_mode,
            tl_event,
        )
        return self.run_flow(
            flow=flow,
            project=project,
            force_fail=force_fail,
            async_mode=False,
            tl_event=tl_event,
        )

    @classmethod
    def get_error_msg(cls, flow_name, submit_info):
        """
        Return a printable version of a SODAR Taskflow error message.

        :param flow_name: Name of submitted flow
        :param submit_info: Returned information from SODAR Taskflow (string,
                            or any object which can be converted into one,
                            e.g. an exception)
        :return: String, with submit_info truncated to 256 characters
        """
        # Convert to string first so non-string input (e.g. an exception
        # object or None) does not fail on slicing
        return 'Taskflow "{}" failed! Reason: "{}"'.format(
            flow_name, str(submit_info)[:256]
        )