Browse Source

initial commit

Michael Kistler 1 month ago
parent
commit
7d29060d89
1 changed files with 204 additions and 0 deletions
  1. 204 0
      trim_edf.py

+ 204 - 0
trim_edf.py

@@ -0,0 +1,204 @@
1
+#!/usr/bin/env python
2
+#from __future__ import division, print_function, absolute_import
3
+
4
+from pathlib import Path
5
+
6
+#import mne
7
+#import matplotlib.pyplot as plt
8
+import numpy as np
9
+#import scipy as sp
10
+import os
11
+import sys
12
+#import edfrd
13
+import getopt
14
+import os
15
+
16
+from datetime import datetime
17
+from datetime import timedelta
18
+
19
+import pyedflib
20
+
21
+
22
+def main(argv):
23
+    '''
24
+
25
+    :param argv: list of command line arguments
26
+    :return:
27
+    '''
28
+
29
+    try:
30
+        opts, args = getopt.getopt(argv, "hi:s:e:d:", ["--input_file=", "--start_time=", "--end_time="])
31
+    except getopt.GetoptError:
32
+        print('split-edf2.py -i <inputfile>')
33
+        sys.exit(2)
34
+
35
+    # default values for optional parameters
36
+    interval_in_days = 7
37
+    start_time = 0
38
+    end_time = None
39
+
40
+    for opt, arg in opts:
41
+
42
+
43
+
44
+        if opt == '-h':
45
+            print(
46
+'split-edf2.py\n\
47
+-i, --input_file:           absolute path to the edf file\n\
48
+optional parameters:\n\
49
+-d, --interval_in_days:     how long is a segment in days \n\
50
+-s, --start_time:           start time as \'hh:mm:ss\'\n\
51
+-e, --end_time:             end_time as \'hh:mm:ss\''
52
+                    )
53
+            sys.exit()
54
+        elif opt in ("-i", "--input_file"):
55
+            fp = Path(arg)
56
+        elif opt in ("-s", "--start_time"):
57
+            start_time = str(arg)
58
+        elif opt in ("-e", "--end_time"):
59
+            end_time = str(arg)
60
+
61
+
62
+    if isinstance(start_time, str):
63
+        hrs, mins, secs = start_time.split(":")
64
+        start_time_td = timedelta(hours=int(hrs), minutes=int(mins), seconds=int(secs))
65
+    else:
66
+        start_time_td = timedelta(seconds=0)
67
+
68
+    if isinstance(end_time, str):
69
+        hrs, mins, secs = end_time.split(":")
70
+        end_time_td = timedelta(hours=int(hrs), minutes=int(mins), seconds=int(secs))
71
+    else:
72
+        end_time_td = end_time
73
+
74
+
75
+    #fp = Path("edf","10-54-26_part_1.EDF")
76
+    print("Input file is", os.fspath(fp))
77
+
78
+    if not fp.is_file():
79
+        print(f'File:{str(fp)} not found, please check your input')
80
+        exit()
81
+
82
+    trim_edf(fp,start_time_td=start_time_td, end_time_td=end_time_td)
83
+
84
+def _annotations(annotations, slice=None):
85
+    '''
86
+    this function converts a pyedflib annotations list into list of aggregated values that can be easily writen as single annotation to an EDF file
87
+    If slice is set, only the annotations within the range of the slice is extracted
88
+
89
+    :param annotations: (list), edflib annotations of an edf file
90
+    :param slice: (list), len of 2, with a start time and end timestamp in seconds
91
+    :return: (list) annotations_list, set of annotations with timestamp in seconds, value and description text
92
+    '''
93
+    ## annotations
94
+    ## timestamp, value, default -1, name/description
95
+    annotations_list = list()
96
+    for i in range(0, len(annotations[0])):
97
+        if slice:
98
+            if annotations[0][i]-slice[0] >=0 and slice[1]-annotations[0][i] >=0:
99
+            #if in range(slice[0],slice[-1]+1):
100
+                annotation = list([annotations[0][i]-slice[0], annotations[1][i], annotations[2][i]])
101
+                annotations_list.append(annotation)
102
+        else:
103
+            annotation = list([annotations[0][i], annotations[1][i], annotations[2][i]])
104
+            annotations_list.append(annotation)
105
+
106
+    return annotations_list
107
+
108
+
109
+def trim_edf(fp, start_time_td, end_time_td=None):
110
+    '''
111
+    split an EDF file into files of n days
112
+    :param fp: (Path) EDF file to split
113
+    :param start_time_td: (timedelta) start point from where to cut the file
114
+    :param end_time_td: (timedelta) until where to cut the file
115
+    :return: (Path) path to the directory where the parts are stored to
116
+    '''
117
+
118
+    f_in = pyedflib.EdfReader(os.fspath(fp))
119
+    file_duration= f_in.getFileDuration()
120
+    file_duration_td = timedelta(seconds=file_duration)
121
+    file_duration_s = int(file_duration_td.total_seconds())
122
+    start_time_s = int(start_time_td.total_seconds())
123
+
124
+    if end_time_td:
125
+        end_time_s = int(end_time_td.total_seconds())
126
+    else:
127
+        end_time_s = end_time_td
128
+
129
+    if end_time_s:
130
+        file_duration_stripped_s = end_time_s-start_time_s
131
+    else:
132
+        file_duration_stripped_s = file_duration_s-start_time_s
133
+
134
+
135
+    print(file_duration_stripped_s)
136
+
137
+    if file_duration_stripped_s < file_duration_s:
138
+
139
+        file_type = pyedflib.FILETYPE_EDFPLUS
140
+        start_date = f_in.getStartdatetime()
141
+        print(start_date)
142
+        sample_frequencies = f_in.getSampleFrequencies()
143
+        signal_headers = f_in.getSignalHeaders()
144
+        channel_info = f_in.getSignalHeaders()
145
+        channels = f_in.signals_in_file
146
+        annotations = f_in.readAnnotations()
147
+
148
+        #duration = end_time_s-start_time_s
149
+        recording_start_date = start_date + timedelta(seconds=start_time_s)
150
+        print(recording_start_date)
151
+        anno = _annotations(annotations, slice=list([start_time_s, end_time_s]))
152
+
153
+        f_out = fp.with_name(fp.stem + "_" + "trim" + "_" + str(start_time_s) + "_" + str(end_time_s)).with_suffix(
154
+            fp.suffix)
155
+
156
+        fo = pyedflib.EdfWriter(os.fspath(f_out), channels, file_type=file_type)
157
+        fo.path = f_out.name  # f.file_name
158
+        fo.file_type = file_type
159
+        fo.patient_name = f_in.getPatientName()
160
+        fo.patient_code = f_in.getPatientCode()
161
+        fo.technician = f_in.getTechnician()
162
+        fo.equipment = f_in.getEquipment()
163
+        fo.recording_additional = f_in.getRecordingAdditional()
164
+        fo.patient_additional = f_in.getPatientAdditional()
165
+        fo.admincode = f_in.getAdmincode()
166
+        fo.gender = f_in.getGender()
167
+        fo.recording_start_time = recording_start_date
168
+        fo.birthdate = f_in.getBirthdate()
169
+        fo.duration = 1
170
+        fo.number_of_annotations = 1 if file_type in [pyedflib.FILETYPE_EDFPLUS, pyedflib.FILETYPE_BDFPLUS] else 0
171
+        fo.n_channels = f_in.signals_in_file
172
+        fo.channels = channel_info
173
+        data_list = list()
174
+        for channel in range(0,channels):
175
+
176
+            sample_frequency = f_in.getSampleFrequency(channel)
177
+            signal_header = f_in.getSignalHeader(channel)
178
+            #fo.setSignalHeader(channel, signal_header)
179
+            data_list.append(f_in.readSignal(channel, start=start_time_s * sample_frequency, n=file_duration_stripped_s * sample_frequency))
180
+
181
+        #read_samples_slice(fp,)
182
+        # adapt header files with correct numbers (date)
183
+
184
+        #fo.sample_buffer = []
185
+        #fo.setHeader(f_in.getHeader())
186
+        fo.setSignalHeaders(channel_info)
187
+
188
+        anno = _annotations(annotations, slice=list([start_time_s, end_time_s]))
189
+        for a in anno:
190
+            fo.writeAnnotation(a[0], a[1], a[2])
191
+
192
+        fo.writeSamples(data_list)
193
+        print("written file", f_out.name)
194
+        del data_list
195
+        del fo
196
+
197
+    # for signal_header in signal_headers:
198
+    #     print(signal_header)
199
+
200
+if __name__ == "__main__":
201
+    main(sys.argv[1:])
202
+
203
+
204
+#split_edf(fp,interval_in_seconds_td,start_time_td=start_time_td, end_time_td=end_time_td)