# index.py
# pylint: disable=too-many-lines
from __future__ import annotations
import logging
import os
import shutil
import subprocess
import tempfile
import time
from importlib.metadata import distribution
from pathlib import Path
import duckdb
import idc_index_data
import pandas as pd
import psutil
from packaging.version import Version
from tqdm import tqdm
logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.DEBUG)
aws_endpoint_url = "https://s3.amazonaws.com"
gcp_endpoint_url = "https://storage.googleapis.com"
class IDCClient:
DOWNLOAD_HIERARCHY_DEFAULT = (
"%collection_id/%PatientID/%StudyInstanceUID/%Modality_%SeriesInstanceUID"
)
def __init__(self):
file_path = idc_index_data.IDC_INDEX_PARQUET_FILEPATH
# Read index file
logger.debug(f"Reading index file v{idc_index_data.__version__}")
self.index = pd.read_parquet(file_path)
# self.index = self.index.astype(str).replace("nan", "")
self.index["series_size_MB"] = self.index["series_size_MB"].astype(float)
self.collection_summary = self.index.groupby("collection_id").agg(
{"Modality": pd.Series.unique, "series_size_MB": "sum"}
)
# Lookup s5cmd
self.s5cmdPath = shutil.which("s5cmd")
if self.s5cmdPath is None:
# Workaround to support environment without a properly setup PATH
# See https://github.com/Slicer/Slicer/pull/7587
logger.debug("Falling back to looking up s5cmd along side the package")
for script in distribution("s5cmd").files:
if str(script).startswith("s5cmd/bin/s5cmd"):
self.s5cmdPath = script.locate().resolve(strict=True)
break
if self.s5cmdPath is None:
raise FileNotFoundError(
"s5cmd executable not found. Please install s5cmd from https://github.com/peak/s5cmd#installation"
)
self.s5cmdPath = str(self.s5cmdPath)
logger.debug(f"Found s5cmd executable: {self.s5cmdPath}")
# ... and check it can be executed
subprocess.check_call([self.s5cmdPath, "--help"], stdout=subprocess.DEVNULL)
@staticmethod
def _filter_dataframe_by_id(key, dataframe, _id):
values = _id
if isinstance(_id, str):
values = [_id]
filtered_df = dataframe[dataframe[key].isin(values)].copy()
if filtered_df.empty:
error_message = f"No data found for the {key} with the values {values}."
raise ValueError(error_message)
return filtered_df
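# A minimal usage sketch of the helper above (toy DataFrame, hypothetical values):
# a single string id is wrapped into a list and matched with isin(); an id with no
# matching rows raises ValueError.
#
#   import pandas as pd
#   df = pd.DataFrame({"collection_id": ["nlst", "nlst", "tcga_luad"]})
#   subset = IDCClient._filter_dataframe_by_id("collection_id", df, "nlst")
#   # subset now holds the two "nlst" rows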
@staticmethod
def _filter_by_collection_id(df_index, collection_id):
return IDCClient._filter_dataframe_by_id(
"collection_id", df_index, collection_id
)
@staticmethod
def _filter_by_patient_id(df_index, patient_id):
return IDCClient._filter_dataframe_by_id("PatientID", df_index, patient_id)
@staticmethod
def _filter_by_dicom_study_uid(df_index, dicom_study_uid):
return IDCClient._filter_dataframe_by_id(
"StudyInstanceUID", df_index, dicom_study_uid
)
@staticmethod
def _filter_by_dicom_series_uid(df_index, dicom_series_uid):
return IDCClient._filter_dataframe_by_id(
"SeriesInstanceUID", df_index, dicom_series_uid
)
@staticmethod
def get_idc_version():
"""
Returns the version of IDC data used in idc-index
"""
idc_version = Version(idc_index_data.__version__).major
return f"v{idc_version}"
def get_collections(self):
"""
Returns the collections present in IDC
"""
unique_collections = self.index["collection_id"].unique()
return unique_collections.tolist()
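# Example usage (a sketch; actual output depends on the installed idc-index-data release):
#
#   client = IDCClient()
#   print(client.get_idc_version())        # e.g. "v17"
#   collections = client.get_collections()
#   print(len(collections), collections[:3])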
def get_series_size(self, seriesInstanceUID):
"""
Gets cumulative size (MB) of the DICOM instances in a given SeriesInstanceUID.
Args:
seriesInstanceUID (str): The DICOM SeriesInstanceUID.
Returns:
float: The cumulative size of the DICOM instances in the given SeriesInstanceUID rounded to two digits, in MB.
Raises:
ValueError: If the `seriesInstanceUID` does not exist.
"""
resp = self.index[self.index["SeriesInstanceUID"] == seriesInstanceUID][
"series_size_MB"
].iloc[0]
return resp
def get_patients(self, collection_id, outputFormat="dict"):
"""
Gets the patients in a collection.
Args:
collection_id (str or a list of str): The collection id or list of collection ids. This should be in lower case separated by underscores.
For example, 'pdmr_texture_analysis' or ['pdmr_texture_analysis', 'nlst'].
outputFormat (str, optional): The format in which to return the patient IDs. Available options are 'dict',
'df', and 'list'. Default is 'dict'.
Returns:
dict or pandas.DataFrame or list: Patient IDs in the requested output format. By default, it returns a dictionary.
Raises:
ValueError: If `outputFormat` is not one of 'dict', 'df', 'list'.
"""
if not isinstance(collection_id, str) and not isinstance(collection_id, list):
raise TypeError("collection_id must be a string or list of strings")
if outputFormat not in ["dict", "df", "list"]:
raise ValueError("outputFormat must be either 'dict', 'df', or 'list")
patient_df = self._filter_by_collection_id(self.index, collection_id)
if outputFormat == "list":
response = patient_df["PatientID"].unique().tolist()
else:
patient_df = patient_df.rename(columns={"collection_id": "Collection"})
patient_df = patient_df[["PatientID", "PatientSex", "PatientAge"]]
patient_df = (
patient_df.groupby("PatientID")
.agg(
{
"PatientSex": lambda x: ",".join(x[x != ""].unique()),
"PatientAge": lambda x: ",".join(x[x != ""].unique()),
}
)
.reset_index()
)
patient_df = patient_df.drop_duplicates().sort_values(by="PatientID")
# Convert DataFrame to a list of dictionaries for the API-like response
if outputFormat == "dict":
response = patient_df.to_dict(orient="records")
else:
response = patient_df
logger.debug("Get patient response: %s", str(response))
return response
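# Example usage of get_patients (sketch; 'nlst' is the collection used as an example
# in the docstring above):
#
#   client = IDCClient()
#   patient_ids = client.get_patients("nlst", outputFormat="list")   # list of PatientID values
#   patients_df = client.get_patients("nlst", outputFormat="df")     # DataFrame with PatientSex/PatientAge
#   records = client.get_patients("nlst")                            # default: list of dicts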
def get_dicom_studies(self, patientId, outputFormat="dict"):
"""
Returns Studies for a given patient or list of patients.
Args:
patientId (str or list of str): The patient Id or a list of patient Ids.
outputFormat (str, optional): The format in which to return the studies. Available options are 'dict',
'df', and 'list'. Default is 'dict'.
Returns:
dict or pandas.DataFrame or list: Studies in the requested output format. By default, it returns a dictionary.
Raises:
ValueError: If `outputFormat` is not one of 'dict', 'df', 'list'.
ValueError: If any of the `patientId` does not exist.
"""
if not isinstance(patientId, str) and not isinstance(patientId, list):
raise TypeError("patientId must be a string or list of strings")
if outputFormat not in ["dict", "df", "list"]:
raise ValueError("outputFormat must be either 'dict' or 'df' or 'list'")
studies_df = self._filter_by_patient_id(self.index, patientId)
if outputFormat == "list":
response = studies_df["StudyInstanceUID"].unique().tolist()
else:
studies_df["patient_study_size_MB"] = studies_df.groupby(
["PatientID", "StudyInstanceUID"]
)["series_size_MB"].transform("sum")
studies_df["patient_study_series_count"] = studies_df.groupby(
["PatientID", "StudyInstanceUID"]
)["SeriesInstanceUID"].transform("count")
studies_df["patient_study_instance_count"] = studies_df.groupby(
["PatientID", "StudyInstanceUID"]
)["instanceCount"].transform("count")
studies_df = studies_df.rename(
columns={
"collection_id": "Collection",
"patient_study_series_count": "SeriesCount",
}
)
# patient_study_df = patient_study_df[['PatientID', 'PatientSex', 'Collection', 'PatientAge', 'StudyInstanceUID', 'StudyDate', 'StudyDescription', 'patient_study_size_MB', 'SeriesCount', 'patient_study_instance_count']]
studies_df = studies_df[
["StudyInstanceUID", "StudyDate", "StudyDescription", "SeriesCount"]
]
# Group by 'StudyInstanceUID'
studies_df = (
studies_df.groupby("StudyInstanceUID")
.agg(
{
"StudyDate": lambda x: ",".join(x[x != ""].unique()),
"StudyDescription": lambda x: ",".join(x[x != ""].unique()),
"SeriesCount": lambda x: int(x[x != ""].iloc[0])
if len(x[x != ""]) > 0
else 0,
}
)
.reset_index()
)
studies_df = studies_df.drop_duplicates().sort_values(
by=["StudyDate", "StudyDescription", "SeriesCount"]
)
if outputFormat == "dict":
response = studies_df.to_dict(orient="records")
else:
response = studies_df
logger.debug("Get patient study response: %s", str(response))
return response
def get_dicom_series(self, studyInstanceUID, outputFormat="dict"):
"""
Returns Series for a given study or list of studies.
Args:
studyInstanceUID (str or list of str): The DICOM StudyInstanceUID or a list of StudyInstanceUIDs.
outputFormat (str, optional): The format in which to return the series. Available options are 'dict',
'df', and 'list'. Default is 'dict'.
Returns:
dict or pandas.DataFrame or list: Series in the requested output format. By default, it returns a dictionary.
Raises:
ValueError: If `outputFormat` is not one of 'dict', 'df', 'list'.
ValueError: If any of the `studyInstanceUID` does not exist.
"""
if not isinstance(studyInstanceUID, str) and not isinstance(
studyInstanceUID, list
):
raise TypeError("studyInstanceUID must be a string or list of strings")
if outputFormat not in ["dict", "df", "list"]:
raise ValueError("outputFormat must be either 'dict' or 'df' or 'list'")
series_df = self._filter_by_dicom_study_uid(self.index, studyInstanceUID)
if outputFormat == "list":
response = series_df["SeriesInstanceUID"].unique().tolist()
else:
series_df = series_df.rename(
columns={
"collection_id": "Collection",
"instanceCount": "instance_count",
}
)
series_df["ImageCount"] = 1
series_df = series_df[
[
"StudyInstanceUID",
"SeriesInstanceUID",
"Modality",
"SeriesDate",
"Collection",
"BodyPartExamined",
"SeriesDescription",
"Manufacturer",
"ManufacturerModelName",
"series_size_MB",
"SeriesNumber",
"instance_count",
"ImageCount",
]
]
series_df = series_df.drop_duplicates().sort_values(
by=[
"Modality",
"SeriesDate",
"SeriesDescription",
"BodyPartExamined",
"SeriesNumber",
]
)
# Convert DataFrame to a list of dictionaries for the API-like response
if outputFormat == "dict":
response = series_df.to_dict(orient="records")
else:
response = series_df
logger.debug("Get series response: %s", str(response))
return response
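# The three lookups above can be chained to drill down from a collection to its
# series (sketch; identifiers are illustrative only):
#
#   client = IDCClient()
#   patient_ids = client.get_patients("nlst", outputFormat="list")
#   study_uids = client.get_dicom_studies(patient_ids[0], outputFormat="list")
#   series_df = client.get_dicom_series(study_uids[0], outputFormat="df")
#   print(series_df[["SeriesInstanceUID", "Modality", "series_size_MB"]])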
def get_series_file_URLs(self, seriesInstanceUID):
"""
Get the URLs of the files corresponding to the DICOM instances in a given SeriesInstanceUID.
Args:
SeriesInstanceUID: string containing the value of DICOM SeriesInstanceUID to filter by
Returns:
list of strings containing the AWS S3 URLs of the files corresponding to the SeriesInstanceUID
"""
# Query to get the S3 URL
s3url_query = f"""
SELECT
series_aws_url
FROM
index
WHERE
SeriesInstanceUID='{seriesInstanceUID}'
"""
s3url_query_df = self.sql_query(s3url_query)
s3_url = s3url_query_df.series_aws_url[0]
# Remove the trailing wildcard character ('*') from the S3 URL
s3_url = s3_url[:-1]
# Run the s5cmd ls command and capture its output
result = subprocess.run(
[self.s5cmdPath, "--no-sign-request", "ls", s3_url],
stdout=subprocess.PIPE,
check=False,
)
output = result.stdout.decode("utf-8")
# Parse the output to get the file names
lines = output.split("\n")
file_names = [
s3_url + line.split()[-1]
for line in lines
if line and line.split()[-1].endswith(".dcm")
]
return file_names
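# Example usage (sketch; the UID below is a placeholder and must be replaced with a
# SeriesInstanceUID that exists in the index):
#
#   client = IDCClient()
#   uid = "1.2.840.placeholder"
#   print(client.get_series_size(uid))          # cumulative series size in MB
#   for url in client.get_series_file_URLs(uid):
#       print(url)                              # s3://... paths of the individual .dcm files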
def get_viewer_URL(
self, seriesInstanceUID=None, studyInstanceUID=None, viewer_selector=None
):
"""
Get the URL of the IDC viewer for the given series or study in IDC based on
the provided SeriesInstanceUID or StudyInstanceUID. If StudyInstanceUID is not provided,
it will be automatically deduced. If viewer_selector is not provided, default viewers
will be used (OHIF v2 or v3 for radiology modalities, and Slim for SM).
This function will validate the provided SeriesInstanceUID or StudyInstanceUID against IDC
index to ensure that the series or study is available in IDC.
Args:
SeriesInstanceUID: string containing the value of DICOM SeriesInstanceUID for a series
available in IDC
StudyInstanceUID: string containing the value of DICOM StudyInstanceUID for a study
available in IDC
viewer_selector: string containing the name of the viewer to use. Must be one of the following:
ohif_v2, ohif_v3, or slim. If not provided, default viewers will be used.
Returns:
string containing the IDC viewer URL for the given SeriesInstanceUID
"""
if seriesInstanceUID is None and studyInstanceUID is None:
raise ValueError(
"Either SeriesInstanceUID or StudyInstanceUID, or both, must be provided."
)
if (
seriesInstanceUID is not None
and seriesInstanceUID not in self.index["SeriesInstanceUID"].values
):
raise ValueError("SeriesInstanceUID not found in IDC index.")
if (
studyInstanceUID is not None
and studyInstanceUID not in self.index["StudyInstanceUID"].values
):
raise ValueError("StudyInstanceUID not found in IDC index.")
if viewer_selector is not None and viewer_selector not in [
"ohif_v2",
"ohif_v3",
"slim",
]:
raise ValueError(
"viewer_selector must be one of 'ohif_v2', 'ohif_v3', or 'slim'."
)
modality = None
if studyInstanceUID is None:
query = f"""
SELECT
DISTINCT(StudyInstanceUID),
Modality
FROM
index
WHERE
SeriesInstanceUID='{seriesInstanceUID}'
"""
query_result = self.sql_query(query)
studyInstanceUID = query_result.StudyInstanceUID[0]
modality = query_result.Modality[0]
else:
query = f"""
SELECT
DISTINCT(Modality)
FROM
index
WHERE
StudyInstanceUID='{studyInstanceUID}'
"""
query_result = self.sql_query(query)
modality = query_result.Modality[0]
if viewer_selector is None:
if "SM" in modality:
viewer_selector = "slim"
else:
viewer_selector = "ohif_v2"
if viewer_selector == "ohif_v2":
if seriesInstanceUID is None:
viewer_url = f"https://viewer.imaging.datacommons.cancer.gov/viewer/{studyInstanceUID}"
else:
viewer_url = f"https://viewer.imaging.datacommons.cancer.gov/viewer/{studyInstanceUID}?SeriesInstanceUID={seriesInstanceUID}"
elif viewer_selector == "ohif_v3":
if seriesInstanceUID is None:
viewer_url = f"https://viewer.imaging.datacommons.cancer.gov/v3/viewer/?StudyInstanceUIDs={studyInstanceUID}"
else:
viewer_url = f"https://viewer.imaging.datacommons.cancer.gov/v3/viewer/?StudyInstanceUIDs={studyInstanceUID}&SeriesInstanceUID={seriesInstanceUID}"
elif viewer_selector == "volview":
# TODO! Not implemented yet
pass
elif viewer_selector == "slim":
if seriesInstanceUID is None:
viewer_url = f"https://viewer.imaging.datacommons.cancer.gov/slim/studies/{studyInstanceUID}"
else:
viewer_url = f"https://viewer.imaging.datacommons.cancer.gov/slim/studies/{studyInstanceUID}/series/{seriesInstanceUID}"
return viewer_url
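# Examples of the URL patterns produced above (sketch; the UID is a placeholder and
# must correspond to a study present in the IDC index):
#
#   client = IDCClient()
#   client.get_viewer_URL(studyInstanceUID="1.2.840.placeholder")
#   # -> https://viewer.imaging.datacommons.cancer.gov/viewer/1.2.840.placeholder   (OHIF v2 default)
#   client.get_viewer_URL(studyInstanceUID="1.2.840.placeholder", viewer_selector="slim")
#   # -> https://viewer.imaging.datacommons.cancer.gov/slim/studies/1.2.840.placeholder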
def _validate_update_manifest_and_get_download_size(
self,
manifestFile,
downloadDir,
validate_manifest,
use_s5cmd_sync,
dirTemplate,
) -> tuple[float, str, Path, list[str]]:
"""
Validates the manifest file by checking the URLs in the manifest
Args:
manifestFile (str): The path to the manifest file.
downloadDir (str): The path to the download directory.
validate_manifest (bool, optional): If True, validates the manifest for any errors. Defaults to True.
use_s5cmd_sync (bool, optional): If True, will use s5cmd sync operation instead of cp when downloadDirectory is not empty; this can significantly improve the download speed if the content is partially downloaded
dirTemplate (str): A template string for the directory path. Must start with %. Defaults to index.DOWNLOAD_HIERARCHY_DEFAULT. It can contain attributes (PatientID, collection_id, Modality, StudyInstanceUID, SeriesInstanceUID) wrapped in '%'. Special characters can be used as connectors: '-' (hyphen), '/' (slash for subdirectories), '_' (underscore). Can be disabled by None.
Returns:
total_size (float): The total size of all series in the manifest file.
endpoint_to_use (str): The endpoint URL to use (either AWS or GCP).
temp_manifest_file(Path): Path to the temporary manifest file for downstream steps
list_of_directories(list): List of directories the files will be downloaded into (used for tracking download progress)
Raises:
ValueError: If the manifest file does not exist, if any URL in the manifest file is invalid, or if any URL is inaccessible in both AWS and GCP.
Exception: If the manifest contains URLs from both AWS and GCP.
"""
logger.debug("manifest validation is requested: " + str(validate_manifest))
logger.debug("Parsing the manifest. Please wait..")
# Read the manifest as a csv file
manifest_df = pd.read_csv(
manifestFile, comment="#", skip_blank_lines=True, header=None
)
# Rename the column
manifest_df.columns = ["manifest_cp_cmd"]
# create a copy of the index
index_df_copy = self.index
# Extract s3 url and crdc_instance_uuid from the manifest copy commands
# Next, extract crdc_instance_uuid from aws_series_url in the index and
# try to verify if every series in the manifest is present in the index
# TODO: need to remove the assumption that manifest commands will have 'cp'
# and need to parse S3 URL directly
# ruff: noqa
sql = """
PRAGMA disable_progress_bar;
WITH
index_temp AS (
SELECT
seriesInstanceUID,
series_aws_url,
series_size_MB,
REGEXP_EXTRACT(series_aws_url, '(?:.*?\\/){3}([^\\/?#]+)', 1) index_crdc_series_uuid
FROM
index_df_copy),
manifest_temp AS (
SELECT
manifest_cp_cmd,
REGEXP_EXTRACT(manifest_cp_cmd, '(?:.*?\\/){3}([^\\/?#]+)', 1) AS manifest_crdc_series_uuid,
REGEXP_REPLACE(regexp_replace(manifest_cp_cmd, 'cp ', ''), '\\s[^\\s]*$', '') AS s3_url,
FROM
manifest_df )
SELECT
seriesInstanceuid,
s3_url,
series_size_MB,
index_crdc_series_uuid==manifest_crdc_series_uuid AS crdc_series_uuid_match,
s3_url==series_aws_url AS s3_url_match,
CASE
WHEN s3_url==series_aws_url THEN 'aws'
ELSE
'unknown'
END
AS endpoint
FROM
manifest_temp
LEFT JOIN
index_temp
ON
index_temp.index_crdc_series_uuid = manifest_temp.manifest_crdc_series_uuid
"""
# ruff: noqa: end
merged_df = duckdb.query(sql).df()
if validate_manifest:
# Check if crdc_instance_uuid is found in the index
if not all(merged_df["crdc_series_uuid_match"]):
missing_manifest_cp_cmds = merged_df.loc[
~merged_df["crdc_series_uuid_match"], "manifest_cp_cmd"
]
missing_manifest_cp_cmds_str = f"The following manifest copy commands do not have any associated series in the index: {missing_manifest_cp_cmds.tolist()}"
raise ValueError(missing_manifest_cp_cmds_str)
# Check if there are more than one endpoints
if len(merged_df["endpoint"].unique()) > 1:
raise ValueError(
"Either GCS bucket path is invalid or manifest has a mix of GCS and AWS urls. If so, please use urls from one provider only"
)
if (
len(merged_df["endpoint"].unique()) == 1
and merged_df["endpoint"].values[0] == "aws"
):
endpoint_to_use = aws_endpoint_url
if (
len(merged_df["endpoint"].unique()) == 1
and merged_df["endpoint"].values[0] == "unknown"
):
cmd = [
self.s5cmdPath,
"--no-sign-request",
"--endpoint-url",
gcp_endpoint_url,
"ls",
merged_df.s3_url.values[0],
]
process = subprocess.run(
cmd, capture_output=True, text=True, check=False
)
if process.stderr and process.stdout.startswith("ERROR"):
logger.debug(
"Folder not available in GCP. Manifest appears to be invalid."
)
if validate_manifest:
raise ValueError
else:
endpoint_to_use = gcp_endpoint_url
elif merged_df["endpoint"].values[0] == "aws":
endpoint_to_use = aws_endpoint_url
else:
# TODO: here we assume that the endpoint is GCP; we could check at least the first URL to be sure,
# but we can take care of this in a more principled way by including GCP bucket directly
# in the future, see https://github.com/ImagingDataCommons/idc-index/pull/56#discussion_r1582157048
endpoint_to_use = gcp_endpoint_url
# Calculate total size
total_size = merged_df["series_size_MB"].sum()
total_size = round(total_size, 2)
if dirTemplate is not None:
hierarchy = self._generate_sql_concat_for_building_directory(
dirTemplate=dirTemplate, downloadDir=downloadDir
)
sql = f"""
WITH temp as
(
SELECT
seriesInstanceUID,
s3_url
FROM
merged_df
)
SELECT
s3_url,
{hierarchy} as path
FROM
temp
JOIN
index using (seriesInstanceUID)
"""
logger.debug(f"About to run this query:\n{sql}")
merged_df = self.sql_query(sql)
# Write a temporary manifest file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_manifest_file:
if use_s5cmd_sync and len(os.listdir(downloadDir)) != 0:
if dirTemplate is not None:
merged_df["s5cmd_cmd"] = (
"sync " + merged_df["s3_url"] + " " + merged_df["path"]
)
else:
merged_df["s5cmd_cmd"] = (
"sync " + merged_df["s3_url"] + " " + downloadDir
)
elif dirTemplate is not None:
merged_df["s5cmd_cmd"] = (
"cp " + merged_df["s3_url"] + " " + merged_df["path"]
)
else:
merged_df["s5cmd_cmd"] = "cp " + merged_df["s3_url"] + " " + downloadDir
merged_df["s5cmd_cmd"].to_csv(temp_manifest_file, header=False, index=False)
logger.info("Parsing the manifest is finished. Download will begin soon")
if dirTemplate is not None:
list_of_directories = merged_df.path.to_list()
else:
list_of_directories = [downloadDir]
return (
total_size,
endpoint_to_use,
Path(temp_manifest_file.name),
list_of_directories,
)
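# The REGEXP_EXTRACT pattern used in the validation query above pulls the fourth
# path component (the crdc series uuid) out of an S3 URL. A standalone sketch of the
# same extraction in DuckDB (hypothetical URL):
#
#   import duckdb
#   duckdb.query(
#       "SELECT REGEXP_EXTRACT('s3://idc-open-data/0000-1111/*', '(?:.*?/){3}([^/?#]+)', 1) AS uuid"
#   ).df()
#   # uuid == '0000-1111'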
@staticmethod
def _generate_sql_concat_for_building_directory(dirTemplate, downloadDir):
# For now, we limit the allowed columns to this list to make sure that all
# values are guaranteed to be non-empty and do not contain any special characters.
# In the future, we should consider including more attributes.
# Also, if we allow any column, we should decide what to do if the value is NULL.
valid_attributes = [
"PatientID",
"collection_id",
"Modality",
"StudyInstanceUID",
"SeriesInstanceUID",
]
valid_separators = ["_", "-", "/"]
updated_template = dirTemplate
# validate input template by removing all valid attributes and separators
for attr in valid_attributes:
updated_template = updated_template.replace("%" + attr, "")
for sep in valid_separators:
updated_template = updated_template.replace(sep, "")
if updated_template != "":
logger.error("Invalid download hierarchy template:" + updated_template)
logger.error(
"Make sure your template uses only valid attributes and separators"
)
logger.error("Valid attributes: " + str(valid_attributes))
logger.error("Valid separators: " + str(valid_separators))
raise ValueError
concat_command = dirTemplate
for attr in valid_attributes:
concat_command = concat_command.replace("%" + attr, f"', {attr},'")
# CONCAT command may contain empty strings, and they are not harmless -
# duckdb does not like them!
# NB: double-quotes are not allowed by duckdb!
concat_command = f"CONCAT('{downloadDir}/','" + concat_command + "')"
concat_command = concat_command.replace(",''", "")
concat_command = concat_command.replace("'',", "")
concat_command = concat_command.replace(",'',", "")
return concat_command
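# For the default template
# "%collection_id/%PatientID/%StudyInstanceUID/%Modality_%SeriesInstanceUID"
# and a hypothetical downloadDir of "/tmp/idc", the expression returned above is:
#
#   CONCAT('/tmp/idc/', collection_id,'/', PatientID,'/', StudyInstanceUID,'/', Modality,'_', SeriesInstanceUID)
#
# which DuckDB evaluates per row to build the destination path for each series.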
@staticmethod
def _track_download_progress(
size_MB: int,
downloadDir: str,
process: subprocess.Popen,
show_progress_bar: bool = True,
list_of_directories=None,
):
logger.info("Inputs received for tracking download:")
logger.info(f"size_MB: {size_MB}")
logger.info(f"downloadDir: {downloadDir}")
logger.info(f"show_progress_bar: {show_progress_bar}")
runtime_errors = []
if show_progress_bar:
total_size_bytes = size_MB * 10**6 # Convert MB to bytes
# Temporary placeholder; the accurate initial size is calculated in the next step
initial_size_bytes = 0
# Calculate the initial size of the directory
for directory in list_of_directories:
path = Path(directory)
if path.exists() and path.is_dir():
initial_size_bytes += sum(
f.stat().st_size for f in path.iterdir() if f.is_file()
)
logger.info("Initial size of the directory: %s bytes", initial_size_bytes)
logger.info(
"Approx. Size of the files need to be downloaded: %s bytes",
total_size_bytes,
)
pbar = tqdm(
total=total_size_bytes,
unit="B",
unit_scale=True,
desc="Downloading data",
)
while True:
downloaded_bytes = 0
for directory in list_of_directories:
path = Path(directory)
if path.exists() and path.is_dir():
downloaded_bytes += sum(
f.stat().st_size for f in path.iterdir() if f.is_file()
)
downloaded_bytes -= initial_size_bytes
pbar.n = min(
downloaded_bytes, total_size_bytes
) # Prevent the progress bar from exceeding 100%
pbar.refresh()
if process.poll() is not None:
break
time.sleep(0.5)
# Wait for the process to finish
_, stderr = process.communicate()
pbar.close()
else:
while process.poll() is None:
time.sleep(0.5)
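# The progress loop above repeatedly sums the file sizes in the destination
# directories and advances a byte-based tqdm bar. A minimal standalone sketch of
# that pattern (hypothetical sizes):
#
#   from tqdm import tqdm
#   pbar = tqdm(total=100 * 10**6, unit="B", unit_scale=True, desc="Downloading data")
#   downloaded_bytes = 42 * 10**6          # e.g. bytes currently found on disk
#   pbar.n = min(downloaded_bytes, pbar.total)
#   pbar.refresh()
#   pbar.close()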
def _parse_s5cmd_sync_output_and_generate_synced_manifest(
self, stdout, downloadDir, dirTemplate
) -> tuple[Path, float, list[str]]:
"""
Parse the output of s5cmd sync --dry-run to extract distinct folders and generate a synced manifest.
Args:
stdout (str): The output of the s5cmd sync --dry-run command.
downloadDir (str): The directory to download the files to.
dirTemplate (str): Download directory hierarchy template.
Returns:
Path: The path to the generated synced manifest file.
float: Download size in MB
list: List of directories the files will be downloaded into
"""
logger.info("Parsing the s5cmd sync dry run output...")
stdout_df = pd.DataFrame(stdout.splitlines(), columns=["s5cmd_output"])
# create a copy of the index
index_df_copy = self.index
# TODO: need to remove the assumption that manifest commands will have 'cp'
# ruff: noqa
sql = """
PRAGMA disable_progress_bar;
WITH
index_temp AS (
SELECT
*,
REGEXP_EXTRACT(series_aws_url, '(?:.*?\\/){3}([^\\/?#]+)', 1) index_crdc_series_uuid
FROM
index_df_copy),
sync_temp AS (
SELECT
DISTINCT CONCAT(REGEXP_EXTRACT(s5cmd_output, 'cp (s3://[^/]+/[^/]+)/.*', 1), '/*') AS s3_url,
REGEXP_EXTRACT(CONCAT(REGEXP_EXTRACT(s5cmd_output, 'cp (s3://[^/]+/[^/]+)/.*', 1), '/*'),'(?:.*?\\/){3}([^\\/?#]+)',1) AS sync_crdc_instance_uuid
FROM
stdout_df )
SELECT
DISTINCT seriesInstanceUID,
series_size_MB,
s3_url
FROM
sync_temp
LEFT JOIN
index_temp
ON
index_temp.index_crdc_series_uuid = sync_temp.sync_crdc_instance_uuid
"""
# ruff: noqa: end
merged_df = duckdb.query(sql).df()
sync_size = merged_df["series_size_MB"].sum()
sync_size_rounded = round(sync_size, 2)
logger.info(f"sync_size_rounded: {sync_size_rounded}")
if dirTemplate is not None:
hierarchy = self._generate_sql_concat_for_building_directory(
dirTemplate=dirTemplate, downloadDir=downloadDir
)
sql = f"""
WITH temp as
(
SELECT
seriesInstanceUID
FROM
merged_df
)
SELECT
series_aws_url,
{hierarchy} as path
FROM
temp
JOIN
index using (seriesInstanceUID)
"""
synced_df = self.sql_query(sql)
# Write a temporary manifest file
with tempfile.NamedTemporaryFile(mode="w", delete=False) as synced_manifest:
if dirTemplate is not None:
synced_df["s5cmd_cmd"] = (
"sync " + synced_df["series_aws_url"] + " " + synced_df["path"]
)
list_of_directories = synced_df.path.to_list()
else:
synced_df = merged_df
synced_df["s5cmd_cmd"] = (
"sync " + synced_df["s3_url"] + " " + downloadDir
)
list_of_directories = [downloadDir]
synced_df["s5cmd_cmd"].to_csv(synced_manifest, header=False, index=False)
logger.info("Parsing the s5cmd sync dry run output finished")
return Path(synced_manifest.name), sync_size_rounded, list_of_directories
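# Each line of the s5cmd dry-run output has the form "cp s3://<bucket>/<series uuid>/<file> <local path>";
# the SQL above collapses those lines to the distinct series-level URLs before joining
# back against the index. A sketch of the same extraction in plain Python (hypothetical line):
#
#   import re
#   line = "cp s3://idc-open-data/0000-1111/abc.dcm /tmp/idc/abc.dcm"
#   series_url = re.search(r"cp (s3://[^/]+/[^/]+)/.*", line).group(1) + "/*"
#   # series_url == "s3://idc-open-data/0000-1111/*"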
def _s5cmd_run(
self,
endpoint_to_use,
manifest_file,
total_size,
downloadDir,
quiet,
show_progress_bar,
use_s5cmd_sync,
dirTemplate,
list_of_directories,
):
"""
Executes the s5cmd command to sync files from a given endpoint to a local directory.
This function first performs a dry run of the s5cmd command to check which files need to be downloaded.
If there are files to be downloaded, it generates a new manifest file with the files to be synced and
runs the s5cmd command again to download the files. The progress of the download is tracked and printed
to the console.
Args:
endpoint_to_use (str): The endpoint URL to download the files from.
manifest_file (str): The path to the manifest file listing the files to be downloaded.
total_size (float): The total size of the files to be downloaded in MB.
downloadDir (str): The local directory where the files will be downloaded.
quiet (bool, optional): If True, suppresses the stdout and stderr of the s5cmd command.
show_progress_bar (bool, optional): If True, tracks the progress of download
use_s5cmd_sync (bool, optional): If True, will use s5cmd sync operation instead of cp when downloadDirectory is not empty; this can significantly improve the download speed if the content is partially downloaded
dirTemplate (str): Download directory hierarchy template.
Raises:
subprocess.CalledProcessError: If the s5cmd command fails.
Returns:
None
"""
logger.debug("running self._s5cmd_run. Inputs received:")
logger.debug(f"endpoint_to_use: {endpoint_to_use}")
logger.debug(f"manifest_file: {manifest_file}")
logger.debug(f"total_size: {total_size}")
logger.debug(f"downloadDir: {downloadDir}")
logger.debug(f"quiet: {quiet}")
logger.debug(f"show_progress_bar: {show_progress_bar}")
logger.debug(f"use_s5cmd_sync: {use_s5cmd_sync}")
logger.debug(f"dirTemplate: {dirTemplate}")
if quiet:
stdout = subprocess.DEVNULL
stderr = subprocess.DEVNULL
else:
stdout = None
stderr = None
if use_s5cmd_sync and len(os.listdir(downloadDir)) != 0:
logger.debug(
"Requested progress bar along with s5cmd sync dry run.\
Using s5cmd sync dry run as the destination folder is not empty"
)
dry_run_cmd = [
self.s5cmdPath,
"--no-sign-request",
"--dry-run",
"--endpoint-url",
endpoint_to_use,
"run",
manifest_file,
]
process = subprocess.run(
dry_run_cmd, stdout=subprocess.PIPE, text=True, check=False
)
if process.stdout:
# Some files need to be downloaded
logger.info(
"""
stdout from the s5cmd sync dry run is not empty. Parsing the output to
evaluate what to download and the corresponding size, with series-level precision only
"""
)
(
synced_manifest,
sync_size,
list_of_directories,
) = self._parse_s5cmd_sync_output_and_generate_synced_manifest(
stdout=process.stdout,
downloadDir=downloadDir,
dirTemplate=dirTemplate,
)
logger.info(f"sync_size (MB): {sync_size}")
cmd = [
self.s5cmdPath,
"--no-sign-request",
"--endpoint-url",
endpoint_to_use,
"run",
synced_manifest,
]
with subprocess.Popen(
cmd, stdout=stdout, stderr=stderr, universal_newlines=True
) as process:
if sync_size < total_size:
logger.info(
"""
Destination folder is not empty and sync size is less than total size. Displaying a warning
"""
)
existing_data_size = round(total_size - sync_size, 2)
logger.warning(
f"Requested total download size is {total_size} MB, \
however at least {existing_data_size} MB is already present,\
so downloading only the remaining up to {sync_size} MB\n\
Please note that disk sizes are calculated at the series level, \
so if individual files are missing, the displayed progress bar may\
not be accurate."
)
self._track_download_progress(
sync_size, downloadDir, process, show_progress_bar