# This file is part of Bika LIMS
#
# Copyright 2011-2016 by its authors.
# Some rights reserved. See LICENSE.txt, AUTHORS.txt.
import os
import re
import tempfile
import traceback
from copy import copy
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from operator import itemgetter
from smtplib import SMTPAuthenticationError
from smtplib import SMTPRecipientsRefused, SMTPServerDisconnected
import App
import transaction
from DateTime import DateTime
from Products.Archetypes.interfaces import IDateTimeField, IFileField, \
ILinesField, IReferenceField, IStringField, ITextField
from Products.CMFCore.WorkflowCore import WorkflowException
from Products.CMFCore.utils import getToolByName
from Products.CMFPlone.utils import _createObjectByType, safe_unicode
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
from bika.lims import POINTS_OF_CAPTURE, bikaMessageFactory as _, t
from bika.lims import logger
from bika.lims.browser import BrowserView, ulocalized_time
from bika.lims.catalog.analysis_catalog import CATALOG_ANALYSIS_LISTING
from bika.lims.idserver import renameAfterCreation
from bika.lims.interfaces import IAnalysisRequest, IResultOutOfRange
from bika.lims.interfaces.field import IUIDReferenceField
from bika.lims.utils import attachPdf, createPdf, encode_header, \
format_supsub, \
isnumber
from bika.lims.utils import formatDecimalMark, to_utf8
from bika.lims.utils.analysis import format_uncertainty
from bika.lims.vocabularies import getARReportTemplates
from bika.lims.workflow import wasTransitionPerformed
from plone.api.portal import get_registry_record
from plone.api.portal import set_registry_record
from plone.app.blob.interfaces import IBlobField
from plone.registry import Record
from plone.registry import field
from plone.registry.interfaces import IRegistry
from plone.resource.utils import queryResourceDirectory
from zope.component import getAdapters, getUtility
class AnalysisRequestPublishView(BrowserView):
template = ViewPageTemplateFile("templates/analysisrequest_publish.pt")
_ars = []
_arsbyclient = []
_current_ar_index = 0
_current_arsbyclient_index = 0
_publish = False
def __init__(self, context, request, publish=False):
BrowserView.__init__(self, context, request)
self.context = context
self.request = request
self._publish = publish
self._ars = [self.context]
self._digester = AnalysisRequestDigester()
@property
def _DEFAULT_TEMPLATE(self):
registry = getUtility(IRegistry)
return registry.get(
'bika.lims.analysisrequest.default_arreport_template', 'default.pt')
def next_certificate_number(self):
"""Get a new certificate id. These are throwaway IDs, until the
publication is actually done. So each preview gives us a new ID.
"""
key = 'bika.lims.current_coa_number'
registry = getUtility(IRegistry)
if key not in registry:
registry.records[key] = \
Record(field.Int(title=u"Current COA number"), 0)
val = get_registry_record(key) + 1
set_registry_record(key, val)
return "%05d" % int(val)
def __call__(self):
if self.context.portal_type == 'AnalysisRequest':
self._ars = [self.context]
elif self.context.portal_type == 'AnalysisRequestsFolder' \
and self.request.get('items', ''):
uids = self.request.get('items').split(',')
uc = getToolByName(self.context, 'uid_catalog')
self._ars = [obj.getObject() for obj in uc(UID=uids)]
else:
# Do nothing
self.destination_url = self.request.get_header(
"referer", self.context.absolute_url())
# Group ARs by client
groups = {}
for ar in self._ars:
idclient = ar.aq_parent.id
if idclient not in groups:
groups[idclient] = [ar]
else:
groups[idclient].append(ar)
self._arsbyclient = [group for group in groups.values()]
# Report may want to print current date
self.current_date = self.ulocalized_time(DateTime(), long_format=True)
# Do publish?
if self.request.form.get('publish', '0') == '1':
self.publishFromPOST()
else:
return self.template()
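    # Request-driven usage sketch (URL and parameter values are hypothetical):
    # the view is normally traversed with query parameters selecting the ARs
    # and the report template, e.g. from the AnalysisRequestsFolder:
    #
    #   .../analysisrequests/publish?items=<uid1>,<uid2>&template=multi_default.pt
    #
    # The rendered form then posts back with publish=1, which routes through
    # publishFromPOST() instead of returning the preview template.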
def showOptions(self):
"""Returns true if the options top panel will be displayed
in the template
"""
return self.request.get('pub', '1') == '1'
def getAvailableFormats(self):
"""Returns the available formats found in templates/reports
"""
return getARReportTemplates()
def getAnalysisRequests(self):
"""Returns a dict with the analysis requests to manage
"""
return self._ars
def getAnalysisRequestsCount(self):
"""Returns the number of analysis requests to manage
"""
return len(self._ars)
def getGroupedAnalysisRequestsCount(self):
"""Returns the number of groups of analysis requests to manage when
a multi-ar template is selected. The ARs are grouped by client
"""
return len(self._arsbyclient)
def getAnalysisRequestObj(self):
"""Returns the analysis request objects to be managed
"""
return self._ars[self._current_ar_index]
def getAnalysisRequest(self, analysisrequest=None):
"""Returns the dict for the Analysis Request specified. If no AR set,
returns the current analysis request
"""
if analysisrequest:
return self._digester(analysisrequest)
else:
return self._digester(self._ars[self._current_ar_index])
def getAnalysisRequestGroup(self):
"""Returns the current analysis request group to be managed
"""
return self._arsbyclient[self._current_arsbyclient_index]
def getAnalysisRequestGroupData(self):
"""Returns an array that contains the dicts (ar_data) for each
analysis request from the current group
"""
return [self._digester(ar) for ar in self.getAnalysisRequestGroup()]
def _nextAnalysisRequest(self):
"""Move to the next analysis request
"""
if self._current_ar_index < len(self._ars):
self._current_ar_index += 1
def _nextAnalysisRequestGroup(self):
"""Move to the next analysis request group
"""
if self._current_arsbyclient_index < len(self._arsbyclient):
self._current_arsbyclient_index += 1
def _renderTemplate(self):
"""Returns the html template to be rendered in accordance with the
template specified in the request ('template' parameter)
"""
templates_dir = 'templates/reports'
embedt = self.request.form.get('template', self._DEFAULT_TEMPLATE)
if embedt.find(':') >= 0:
prefix, template = embedt.split(':')
templates_dir = queryResourceDirectory('reports', prefix).directory
embedt = template
embed = ViewPageTemplateFile(os.path.join(templates_dir, embedt))
return embedt, embed(self)
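    # Template resolution sketch (names are hypothetical): a plain value such
    # as 'default.pt' is looked up in templates/reports, while a prefixed value
    # like 'myaddon.reports:MyReport.pt' is resolved through the
    # plone.resource directory registered under 'reports' for that prefix:
    #
    #   self.request.form['template'] = 'myaddon.reports:MyReport.pt'
    #   embedt, html = self._renderTemplate()  # embedt == 'MyReport.pt'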
def getReportTemplate(self):
"""Returns the html template for the current ar and moves to
the next ar to be processed. Uses the selected template
specified in the request ('template' parameter)
"""
embedt = ""
try:
embedt, reptemplate = self._renderTemplate()
        except Exception:
tbex = traceback.format_exc()
arid = self._ars[self._current_ar_index].id
reptemplate = \
"<div class='error-report'>%s - %s '%s':<pre>%s</pre></div>" \
% (arid, _("Unable to load the template"), embedt, tbex)
self._nextAnalysisRequest()
return reptemplate
def getGroupedReportTemplate(self):
"""Returns the html template for the current group of ARs and moves to
the next group to be processed. Uses the selected template
specified in the request ('template' parameter)
"""
embedt = ""
try:
embedt, reptemplate = self._renderTemplate()
        except Exception:
tbex = traceback.format_exc()
reptemplate = \
"<div class='error-report'>%s '%s':<pre>%s</pre></div>" \
% (_("Unable to load the template"), embedt, tbex)
self._nextAnalysisRequestGroup()
return reptemplate
def getReportStyle(self):
"""Returns the css style to be used for the current template.
If the selected template is 'default.pt', this method will
        return the content from 'default.css'. If no css file is found
        for the current template, returns an empty string.
"""
template = self.request.form.get('template', self._DEFAULT_TEMPLATE)
content = ''
if template.find(':') >= 0:
prefix, template = template.split(':')
resource = queryResourceDirectory('reports', prefix)
css = '{0}.css'.format(template[:-3])
if css in resource.listDirectory():
content = resource.readFile(css)
else:
this_dir = os.path.dirname(os.path.abspath(__file__))
templates_dir = os.path.join(this_dir, 'templates/reports/')
path = '%s/%s.css' % (templates_dir, template[:-3])
with open(path, 'r') as content_file:
content = content_file.read()
return content
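    # CSS lookup sketch (names are hypothetical): the stylesheet is expected to
    # live next to the template and share its basename, so 'default.pt' maps to
    # 'templates/reports/default.css', and 'myaddon.reports:MyReport.pt' maps
    # to 'MyReport.css' inside the same resource directory; a missing file
    # yields an empty string.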
def isSingleARTemplate(self):
seltemplate = self.request.form.get('template', self._DEFAULT_TEMPLATE)
seltemplate = seltemplate.split(':')[-1].strip()
return not seltemplate.lower().startswith('multi')
def isQCAnalysesVisible(self):
"""Returns if the QC Analyses must be displayed
"""
return self.request.form.get('qcvisible', '0').lower() in ['true', '1']
def isHiddenAnalysesVisible(self):
"""Returns true if hidden analyses are visible
"""
return self.request.form.get('hvisible', '0').lower() in ['true', '1']
def isLandscape(self):
""" Returns if the layout is landscape
"""
return self.request.form.get('landscape', '0').lower() in ['true', '1']
def localise_images(self, htmlreport):
"""WeasyPrint will attempt to retrieve attachments directly from the URL
referenced in the HTML report, which may refer back to a single-threaded
(and currently occupied) zeoclient, hanging it. All "attachments"
using urls ending with at_download/AttachmentFile must be converted
to local files.
Returns a list of files which were created, and a modified copy
of htmlreport.
"""
cleanup = []
_htmltext = to_utf8(htmlreport)
# first regular image tags
for match in re.finditer(
"""http.*at_download/AttachmentFile""", _htmltext, re.I):
url = match.group()
att_path = url.replace(self.portal_url + "/", "")
attachment = self.portal.unrestrictedTraverse(att_path)
af = attachment.getAttachmentFile()
filename = af.filename
extension = "." + filename.split(".")[-1]
            fd, outfilename = tempfile.mkstemp(suffix=extension)
            os.close(fd)  # close the low-level descriptor from mkstemp
            with open(outfilename, 'wb') as outfile:
                outfile.write(str(af.data))
            # str.replace returns a new string; re-assign it, otherwise the
            # substitution is silently lost
            _htmltext = _htmltext.replace(url, outfilename)
cleanup.append(outfilename)
return cleanup, _htmltext
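    # Usage sketch (illustrative): callers are expected to delete the temporary
    # files once the PDF has been rendered from the localised HTML:
    #
    #   cleanup, localised = self.localise_images(results_html)
    #   pdf = createPdf(htmlreport=localised)
    #   for fn in cleanup:
    #       os.remove(fn)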
def publishFromPOST(self):
html = self.request.form.get('html')
style = self.request.form.get('style')
uids = self.request.form.get('uid').split(':')
reporthtml = "<html><head>%s</head><body><div " \
"id='report'>%s</body></html>" % (style, html)
publishedars = []
for uid in uids:
ars = self.publishFromHTML(
uid, safe_unicode(reporthtml).encode('utf-8'))
publishedars.extend(ars)
return publishedars
def publishFromHTML(self, aruid, results_html):
        # The AR can be published if and only if allowed
uc = getToolByName(self.context, 'uid_catalog')
ars = uc(UID=aruid)
if not ars or len(ars) != 1:
return []
ar = ars[0].getObject()
wf = getToolByName(self.context, 'portal_workflow')
allowed_states = ['verified', 'published']
# Publish/Republish allowed?
if wf.getInfoFor(ar, 'review_state') not in allowed_states:
# Pre-publish allowed?
if not ar.getAnalyses(review_state=allowed_states):
return []
# HTML written to debug file
debug_mode = App.config.getConfiguration().debug_mode
if debug_mode:
tmp_fn = tempfile.mktemp(suffix=".html")
logger.debug("Writing HTML for %s to %s" % (ar.Title(), tmp_fn))
open(tmp_fn, "wb").write(results_html)
# Create the pdf report (will always be attached to the AR)
# we must supply the file ourself so that createPdf leaves it alone.
pdf_fn = tempfile.mktemp(suffix=".pdf")
pdf_report = createPdf(htmlreport=results_html, outfile=pdf_fn)
# PDF written to debug file
if debug_mode:
logger.debug("Writing PDF for %s to %s" % (ar.Title(), pdf_fn))
else:
os.remove(pdf_fn)
recipients = []
contact = ar.getContact()
lab = ar.bika_setup.laboratory
if pdf_report:
if contact:
recipients = [{
'UID': contact.UID(),
'Username': to_utf8(contact.getUsername()),
'Fullname': to_utf8(contact.getFullname()),
'EmailAddress': to_utf8(contact.getEmailAddress()),
'PublicationModes': contact.getPublicationPreference()
}]
reportid = ar.generateUniqueId('ARReport')
report = _createObjectByType("ARReport", ar, reportid)
report.edit(
AnalysisRequest=ar.UID(),
Pdf=pdf_report,
Recipients=recipients
)
report.unmarkCreationFlag()
renameAfterCreation(report)
# Set status to prepublished/published/republished
status = wf.getInfoFor(ar, 'review_state')
transitions = {'verified': 'publish',
'published': 'republish'}
transition = transitions.get(status, 'prepublish')
try:
wf.doActionFor(ar, transition)
except WorkflowException:
pass
        # compose and send email.
        # The managers of the departments for which the current AR has
        # at least one AS must always receive the pdf report by email.
        # https://github.com/bikalabs/Bika-LIMS/issues/1028
mime_msg = MIMEMultipart('related')
mime_msg['Subject'] = self.get_mail_subject(ar)[0]
mime_msg['From'] = formataddr(
(encode_header(lab.getName()), lab.getEmailAddress()))
mime_msg.preamble = 'This is a multi-part MIME message.'
msg_txt = MIMEText(results_html, _subtype='html')
mime_msg.attach(msg_txt)
to = []
mngrs = ar.getResponsible()
for mngrid in mngrs['ids']:
name = mngrs['dict'][mngrid].get('name', '')
email = mngrs['dict'][mngrid].get('email', '')
if email:
to.append(formataddr((encode_header(name), email)))
if len(to) > 0:
# Send the email to the managers
mime_msg['To'] = ','.join(to)
attachPdf(mime_msg, pdf_report, ar.id)
try:
host = getToolByName(self.context, 'MailHost')
host.send(mime_msg.as_string(), immediate=True)
except SMTPServerDisconnected as msg:
logger.warn("SMTPServerDisconnected: %s." % msg)
except SMTPRecipientsRefused as msg:
raise WorkflowException(str(msg))
except SMTPAuthenticationError as msg:
logger.warn("SMTPAuthenticationFailed: %s." % msg)
# Send report to recipients
recips = self.get_recipients(ar)
for recip in recips:
if 'email' not in recip.get('pubpref', []) \
or not recip.get('email', ''):
continue
title = encode_header(recip.get('title', ''))
email = recip.get('email')
formatted = formataddr((title, email))
            # Create a new mime_msg object, because the previous one
            # already has the pdf attached
mime_msg = MIMEMultipart('related')
mime_msg['Subject'] = self.get_mail_subject(ar)[0]
mime_msg['From'] = formataddr(
(encode_header(lab.getName()), lab.getEmailAddress()))
mime_msg.preamble = 'This is a multi-part MIME message.'
msg_txt = MIMEText(results_html, _subtype='html')
mime_msg.attach(msg_txt)
mime_msg['To'] = formatted
# Attach the pdf to the email if requested
if pdf_report and 'pdf' in recip.get('pubpref'):
attachPdf(mime_msg, pdf_report, ar.id)
            # For now, simply skip sending mail when running under test.
if hasattr(self.portal, 'robotframework'):
continue
msg_string = mime_msg.as_string()
# content of outgoing email written to debug file
if debug_mode:
tmp_fn = tempfile.mktemp(suffix=".email")
logger.debug(
"Writing MIME message for %s to %s" % (ar.Title(), tmp_fn))
open(tmp_fn, "wb").write(msg_string)
try:
host = getToolByName(self.context, 'MailHost')
host.send(msg_string, immediate=True)
except SMTPServerDisconnected as msg:
logger.warn("SMTPServerDisconnected: %s." % msg)
except SMTPRecipientsRefused as msg:
raise WorkflowException(str(msg))
except SMTPAuthenticationError as msg:
logger.warn("SMTPAuthenticationFailed: %s." % msg)
return [ar]
def publish(self):
"""Publish the AR report/s. Generates a results pdf file associated
to each AR, sends an email with the report to the lab manager and
sends a notification (usually an email with the PDF attached) to the
AR's contact and CCs. Transitions each published AR to statuses
'published', 'prepublished' or 'republished'. Returns a list with the
AR identifiers that have been published/prepublished/republished
(only those 'verified', 'published' or at least have one 'verified'
result).
"""
if len(self._ars) > 1:
published_ars = []
for ar in self._ars:
arpub = AnalysisRequestPublishView(
ar, self.request, publish=True)
ar = arpub.publish()
published_ars.extend(ar)
published_ars = [par.id for par in published_ars]
return published_ars
        results_html = safe_unicode(self.template()).encode('utf-8')
        # publishFromHTML expects the AR UID as its first argument; in the
        # single-AR case the context AR is the only entry in self._ars
        return self.publishFromHTML(self._ars[0].UID(), results_html)
def get_recipients(self, ar):
"""Returns a list with the recipients and all its publication prefs
"""
recips = []
# Contact and CC's
contact = ar.getContact()
if contact:
recips.append({'title': to_utf8(contact.Title()),
'email': contact.getEmailAddress(),
'pubpref': contact.getPublicationPreference()})
for cc in ar.getCCContact():
recips.append({'title': to_utf8(cc.Title()),
'email': cc.getEmailAddress(),
'pubpref': cc.getPublicationPreference()})
# CC Emails
        # https://github.com/senaite/bika.lims/issues/361
plone_utils = getToolByName(self.context, "plone_utils")
ccemails = map(lambda x: x.strip(), ar.getCCEmails().split(","))
for ccemail in ccemails:
# Better do that with a field validator
if not plone_utils.validateSingleEmailAddress(ccemail):
logger.warn(
"Skipping invalid email address '{}'".format(ccemail))
continue
recips.append({
'title': ccemail,
'email': ccemail,
'pubpref': ('email', 'pdf',), })
return recips
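    # Result shape sketch (names and addresses are hypothetical):
    #
    #   [{'title': 'Rita Mohale', 'email': 'rita@example.com',
    #     'pubpref': ('email', 'pdf')},
    #    {'title': 'cc@example.com', 'email': 'cc@example.com',
    #     'pubpref': ('email', 'pdf')}]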
def get_mail_subject(self, ar):
"""Returns the email subject in accordance with the client
preferences
"""
client = ar.aq_parent
subject_items = client.getEmailSubject()
        ai = 'ar' in subject_items
        co = 'co' in subject_items
        cr = 'cr' in subject_items
        cs = 'cs' in subject_items
ais = []
cos = []
crs = []
css = []
blanks_found = False
if ai:
ais.append(ar.getId())
if co:
if ar.getClientOrderNumber():
if not ar.getClientOrderNumber() in cos:
cos.append(ar.getClientOrderNumber())
else:
blanks_found = True
if cr or cs:
sample = ar.getSample()
if cr:
if sample.getClientReference():
if not sample.getClientReference() in crs:
crs.append(sample.getClientReference())
else:
blanks_found = True
if cs:
if sample.getClientSampleID():
if not sample.getClientSampleID() in css:
css.append(sample.getClientSampleID())
else:
blanks_found = True
line_items = []
if ais:
ais.sort()
li = t(_('ARs: ${ars}', mapping={'ars': ', '.join(ais)}))
line_items.append(li)
if cos:
cos.sort()
li = t(_('Orders: ${orders}', mapping={'orders': ', '.join(cos)}))
line_items.append(li)
if crs:
crs.sort()
li = t(_(
'Refs: ${references}', mapping={'references': ', '.join(crs)}))
line_items.append(li)
if css:
css.sort()
li = t(_(
'Samples: ${samples}', mapping={'samples': ', '.join(css)}))
line_items.append(li)
tot_line = ' '.join(line_items)
if tot_line:
subject = t(_('Analysis results for ${subject_parts}',
mapping={'subject_parts': tot_line}))
if blanks_found:
subject += (' ' + t(_('and others')))
else:
subject = t(_('Analysis results'))
return subject, tot_line
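    # Example output (identifiers are hypothetical): with 'ar' and 'co' enabled
    # in the client's email subject preferences, the returned tuple could be
    #
    #   ('Analysis results for ARs: H2O-0001-R01 Orders: PO-2016-07',
    #    'ARs: H2O-0001-R01 Orders: PO-2016-07')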
def sorted_by_sort_key(self, category_keys):
"""Sort categories via catalog lookup on title. """
bsc = getToolByName(self.context, "bika_setup_catalog")
analysis_categories = bsc(
portal_type="AnalysisCategory", sort_on="sortable_title")
sort_keys = dict([(b.Title, "{:04}".format(a))
for a, b in enumerate(analysis_categories)])
return sorted(category_keys,
key=lambda title, sk=sort_keys: sk.get(title))
def getAnaysisBasedTransposedMatrix(self, ars):
"""Returns a dict with the following structure:
{'category_1_name':
{'service_1_title':
{'service_1_uid':
{'service': <AnalysisService-1>,
'ars': {'ar1_id': [<Analysis (for as-1)>,
<Analysis (for as-1)>],
'ar2_id': [<Analysis (for as-1)>]
},
},
},
{'service_2_title':
{'service_2_uid':
{'service': <AnalysisService-2>,
'ars': {'ar1_id': [<Analysis (for as-2)>,
<Analysis (for as-2)>],
'ar2_id': [<Analysis (for as-2)>]
},
},
},
...
},
}
"""
analyses = {}
for ar in ars:
ans = [an.getObject() for an in ar.getAnalyses()]
for an in ans:
cat = an.getCategoryTitle()
an_title = an.Title()
if cat not in analyses:
analyses[cat] = {
an_title: {
# The report should not mind receiving 'an'
# here - service fields are all inside!
'service': an,
'accredited': an.getAccredited(),
'ars': {ar.id: an.getFormattedResult()}
}
}
elif an_title not in analyses[cat]:
analyses[cat][an_title] = {
'service': an,
'accredited': an.getAccredited(),
'ars': {ar.id: an.getFormattedResult()}
}
else:
d = analyses[cat][an_title]
d['ars'][ar.id] = an.getFormattedResult()
analyses[cat][an_title] = d
return analyses
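    # Template usage sketch (illustrative): a multi-AR "transposed" report can
    # walk the matrix to build one row per service and one column per AR:
    #
    #   matrix = self.getAnaysisBasedTransposedMatrix(ars)
    #   for category, services in matrix.items():
    #       for title, info in services.items():
    #           row = [info['ars'].get(ar.id, '') for ar in ars]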
def _lab_address(self, lab):
lab_address = lab.getPostalAddress() \
or lab.getBillingAddress() \
or lab.getPhysicalAddress()
return _format_address(lab_address)
def explode_data(self, data, padding=''):
out = ''
for k, v in data.items():
if type(v) is dict:
pad = '%s ' % padding
exploded = self.explode_data(v, pad)
out = "%s<br/>%s'%s':{%s}" % (out, padding, str(k), exploded)
elif type(v) is list:
out = "%s<br/>%s'%s':[]" % (out, padding, str(k))
elif type(v) is str:
out = "%s<br/>%s'%s':''" % (out, padding, str(k))
return out
def currentDate(self):
"""
This method returns the current time. It is useful if you want to
get the current time in a report.
:return: DateTime()
"""
return DateTime()
class AnalysisRequestDigester:
"""Read AR data which could be useful during publication, into a data
dictionary. This class should be instantiated once, and the instance
called for all subsequent digestion. This allows the instance to cache
data for objects that may be read multiple times for different ARs.
Passing overwrite=True when calling the instance will cause the
ar.Digest field to be overwritten with a new digestion. This flag
is set True by default in the EndRequestHandler that is responsible for
automated re-building.
It should be run once when the AR is verified (or when a verified AR is
modified) to pre-digest the data so that AnalysisRequestPublishView will
run a little faster.
Note: ProxyFields are not included in the reading of the schema. If you
want to access sample fields in the report template, you must refer
directly to the correct field in the Sample data dictionary.
Note: ComputedFields are removed from the schema while creating the dict.
XXX: Add all metadata columns for the AR into the dict.
"""
def __init__(self):
# By default we don't care about these schema fields when creating
# dictionaries from the schemas of objects.
self.SKIP_FIELDNAMES = [
'allowDiscussion', 'subject', 'location', 'contributors',
'creators', 'effectiveDate', 'expirationDate', 'language', 'rights',
'relatedItems', 'modification_date', 'immediatelyAddableTypes',
'locallyAllowedTypes', 'nextPreviousEnabled', 'constrainTypesMode',
'RestrictedCategories', 'Digest',
]
def __call__(self, ar, overwrite=False):
# cheating
self.context = ar
self.request = ar.REQUEST
# if AR was previously digested, use existing data (if exists)
verified = wasTransitionPerformed(ar, 'verify')
if not overwrite and verified:
# Prevent any error related with digest
data = ar.getDigest() if hasattr(ar, 'getDigest') else {}
if data:
# Check if the department managers have changed since
# verification:
saved_managers = data.get('managers', {})
saved_managers_ids = set(saved_managers.get('ids', []))
current_managers = self.context.getManagers()
current_managers_ids = set([man.getId() for man in
current_managers])
# The symmetric difference of two sets A and B is the set of
# elements which are in either of the sets A or B but not
# in both.
are_different = saved_managers_ids.symmetric_difference(
current_managers_ids)
if len(are_different) == 0:
# Seems that sometimes the 'obj' is wrong in the saved
# data.
data['obj'] = ar
# Always set results interpretation
self._set_results_interpretation(ar, data)
return data
logger.info("=========== creating new data for %s" % ar)
# Set data to the AR schema field, and return it.
data = self._ar_data(ar)
if hasattr(ar, 'setDigest'):
ar.setDigest(data)
logger.info("=========== new data for %s created." % ar)
return data
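    # Usage sketch (illustrative): the digester is meant to be instantiated
    # once and reused; overwrite=True forces a rebuild even for verified ARs:
    #
    #   digester = AnalysisRequestDigester()
    #   data = digester(ar)                  # reuses the stored digest if any
    #   data = digester(ar, overwrite=True)  # always rebuilds the dict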
def _schema_dict(self, instance, skip_fields=None, recurse=True):
"""Return a dict of all mutated field values for all schema fields.
This isn't used, as right now the digester just uses old code directly
for BBB purposes. But I'm keeping it here for future use.
        :param instance: The item whose schema will be exploded into a dict.
:param skip_fields: A list of fieldnames which will not be rendered.
:param recurse: If true, reference values will be recursed into.
"""
data = {
'obj': instance,
}
fields = instance.Schema().fields()
for fld in fields:
fieldname = fld.getName()
if fieldname in self.SKIP_FIELDNAMES \
or (skip_fields and fieldname in skip_fields) \
or fld.type == 'computed':
continue
rawvalue = fld.get(instance)
if rawvalue is True or rawvalue is False:
# Booleans are special; we'll str and return them.
data[fieldname] = str(rawvalue)
            elif rawvalue == 0:
                # Zero is special: it's false-ish, but the value is important.
                # (Compare with ==; 'is 0' relies on CPython int caching.)
                data[fieldname] = 0
elif not rawvalue:
# Other falsy values can simply return an empty string.
data[fieldname] = ''
elif fld.type == 'analyses':
# AR.Analyses field is handled separately of course.
data[fieldname] = ''
elif IDateTimeField.providedBy(fld):
                # Date fields get rendered as RFC 2822 strings
data[fieldname] = rawvalue.rfc822() if rawvalue else ''
elif IReferenceField.providedBy(fld) \
or IUIDReferenceField.providedBy(fld):
# mutate all reference targets into dictionaries
# Assume here that allowed_types excludes incompatible types.
if recurse and fld.multiValued:
v = [self._schema_dict(x, recurse=False) for x in rawvalue]
elif recurse and not fld.multiValued:
v = self._schema_dict(rawvalue, recurse=False)
elif not recurse and fld.multiValued:
v = [val.Title() for val in rawvalue if val]
else:
v = rawvalue.Title() if rawvalue else ''
data[fieldname] = v
# Include a [fieldname]Title[s] field containing the title
# or titles of referenced objects.
if fld.multiValued:
data[fieldname + "Titles"] = [x.Title() for x in rawvalue]
else:
data[fieldname + "Title"] = rawvalue.Title()
# Text/String comes after UIDReferenceField.
elif ITextField.providedBy(fld) or IStringField.providedBy(fld):
rawvalue = str(rawvalue).strip()
data[fieldname] = rawvalue
# FileField comes after StringField.
elif IFileField.providedBy(fld) or IBlobField.providedBy(fld):
# We ignore file field values; we'll add the ones we want.
data[fieldname] = ''
elif ILinesField.providedBy(fld):
# LinesField turns into a single string of lines
data[fieldname] = "<br/>".join(rawvalue)
elif fld.type == 'record':
# Record returns a dictionary.
data[fieldname] = rawvalue
elif fld.type == 'records':
                # Records returns a list of dictionaries.
data[fieldname] = rawvalue
elif fld.type == 'address':
# This is just a Record field
data[fieldname + "_formatted"] = _format_address(rawvalue)
# Also include un-formatted address
data[fieldname] = rawvalue
elif fld.type == 'duration':
# Duration returns a formatted string like 1d 3h 1m.
data[fieldname + "_formatted"] = \
' '.join(["%s%s" % (rawvalue[key], key[0])
for key in ('days', 'hours', 'minutes')])
# Also include unformatted duration.
data[fieldname] = rawvalue
else:
data[fieldname] = rawvalue
return data
def getDimension(self):
""" Returns the dimension of the report
"""
return self.request.form.get("layout", "A4")
def isLandscape(self):
""" Returns if the layout is landscape
"""
return self.request.form.get('landscape', '0').lower() in ['true', '1']
def getDirection(self):
""" Return landscape or horizontal
"""
return self.isLandscape() and "landscape" or "horizontal"
def getLayout(self):
""" Returns the layout of the report
"""
mapping = {
"A4": (210, 297),
"letter": (216, 279)
}
dimension = self.getDimension()
layout = mapping.get(dimension, mapping.get("A4"))
if self.isLandscape():
layout = tuple(reversed(layout))
return layout
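    # Layout sketch (millimetres): 'A4' portrait yields (210, 297) and
    # landscape (297, 210); 'letter' yields (216, 279) / (279, 216).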
def _workflow_data(self, instance):
"""Add some workflow information for all actions performed against
this instance. Only values for the last action event for any
transition will be set here, previous transitions will be ignored.
        The default format for review_history is a list of dicts; this function
        instead returns a dictionary of dictionaries, keyed by action_id.
"""
workflow = getToolByName(self.context, 'portal_workflow')
history = copy(list(workflow.getInfoFor(instance, 'review_history')))
data = {e['action']: {
'actor': e['actor'],
'time': ulocalized_time(e['time'], long_format=True)
} for e in history if e['action']}
return data
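    # Result shape sketch (actor and time values are hypothetical):
    #
    #   {'receive': {'actor': 'labclerk',   'time': 'May 2, 2016 10:14 AM'},
    #    'verify':  {'actor': 'labmanager', 'time': 'May 3, 2016 09:30 AM'}}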
def _ar_data(self, ar, excludearuids=None):
""" Creates an ar dict, accessible from the view and from each
specific template.
"""
if not excludearuids:
excludearuids = []
bs = ar.bika_setup
data = {'obj': ar,
'id': ar.getId(),
'client_order_num': ar.getClientOrderNumber(),
'client_reference': ar.getClientReference(),
'client_sampleid': ar.getClientSampleID(),
'adhoc': ar.getAdHoc(),
'composite': ar.getComposite(),
'report_drymatter': ar.getReportDryMatter(),
'invoice_exclude': ar.getInvoiceExclude(),
'date_received': ulocalized_time(ar.getDateReceived(),
long_format=1),
'member_discount': ar.getMemberDiscount(),
'date_sampled': ulocalized_time(
ar.getDateSampled(), long_format=1),
'date_published': ulocalized_time(DateTime(), long_format=1),
'invoiced': ar.getInvoiced(),
'late': ar.getLate(),
'subtotal': ar.getSubtotal(),
'vat_amount': ar.getVATAmount(),
'totalprice': ar.getTotalPrice(),
'invalid': ar.isInvalid(),
'url': ar.absolute_url(),
'remarks': to_utf8(ar.getRemarks()),
'footer': to_utf8(bs.getResultFooter()),
'prepublish': False,
'child_analysisrequest': None,
'parent_analysisrequest': None,
                'resultsinterpretation': ar.getResultsInterpretation(),
'ar_attachments': self._get_ar_attachments(ar),
'an_attachments': self._get_an_attachments(ar),
}
# Sub-objects
excludearuids.append(ar.UID())
puid = ar.getRawParentAnalysisRequest()
if puid and puid not in excludearuids:
data['parent_analysisrequest'] = self._ar_data(
ar.getParentAnalysisRequest(), excludearuids)
cuid = ar.getRawChildAnalysisRequest()
if cuid and cuid not in excludearuids:
data['child_analysisrequest'] = self._ar_data(
ar.getChildAnalysisRequest(), excludearuids)
wf = ar.portal_workflow
allowed_states = ['verified', 'published']
data['prepublish'] = wf.getInfoFor(ar,
'review_state') not in allowed_states
data['contact'] = self._contact_data(ar)
data['client'] = self._client_data(ar)
data['sample'] = self._sample_data(ar)
data['batch'] = self._batch_data(ar)
data['specifications'] = self._specs_data(ar)
data['analyses'] = self._analyses_data(ar, ['verified', 'published'])
data['qcanalyses'] = self._qcanalyses_data(ar,
['verified', 'published'])
data['points_of_capture'] = sorted(
set([an['point_of_capture'] for an in data['analyses']]))
data['categories'] = sorted(
set([an['category'] for an in data['analyses']]))
data['haspreviousresults'] = len(
[an['previous_results'] for an in data['analyses'] if
an['previous_results']]) > 0
data['hasblanks'] = len([an['reftype'] for an in data['qcanalyses'] if
an['reftype'] == 'b']) > 0
data['hascontrols'] = len([an['reftype'] for an in data['qcanalyses'] if
an['reftype'] == 'c']) > 0
data['hasduplicates'] = len(
[an['reftype'] for an in data['qcanalyses'] if
an['reftype'] == 'd']) > 0
# Categorize analyses
data['categorized_analyses'] = {}
data['department_analyses'] = {}
for an in data['analyses']:
poc = an['point_of_capture']
cat = an['category']
pocdict = data['categorized_analyses'].get(poc, {})
catlist = pocdict.get(cat, [])
catlist.append(an)
pocdict[cat] = catlist
data['categorized_analyses'][poc] = pocdict
# Group by department too
anobj = an['obj']
dept = anobj.getDepartment()
if dept:
dept = dept.UID()